Skip to content

Commit

Permalink
Failed attempt at getting camera to work on worker
Browse files Browse the repository at this point in the history
  • Loading branch information
jackcannon committed Mar 22, 2020
1 parent a7ebcf9 commit e181847
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 85 deletions.
31 changes: 31 additions & 0 deletions src/cameraHelper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import raspberryPiCamera from "raspberry-pi-camera-native";
import { BehaviorSubject } from "rxjs";
import { first } from "rxjs/operators";
import { IFacePoint } from "./interfaces";
import { cameraOptions } from "./config";

let framesSubject: BehaviorSubject<Buffer> = new BehaviorSubject<Buffer>(null);

export const getFrames = async (): Promise<BehaviorSubject<Buffer>> => {
await startCamera();
await runCamera();
return framesSubject;
};

const startCamera = (): Promise<any> => {
console.log("cameraHelper - startCamera - A");
return new Promise(resolve => {
console.log("cameraHelper - startCamera - B");
raspberryPiCamera.start(cameraOptions, resolve);
});
};

const runCamera = (): Promise<any> => {
console.log("cameraHelper - runCamera - A");
raspberryPiCamera.on("frame", (buffer: Buffer) => {
console.log("cameraHelper - runCamera - B");
framesSubject.next(buffer);
});
console.log("cameraHelper - runCamera - C");
return framesSubject.pipe(first(frame => !!frame)).toPromise();
};
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// ----------------------------

export const showDashboard = true;
export const showDashboard = false;

// ----------------------------
//
Expand Down
58 changes: 8 additions & 50 deletions src/camera.ts → src/detection.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,50 @@
import raspberryPiCamera from "raspberry-pi-camera-native";
import { createTimer, toFixed } from "./utils";
import { BehaviorSubject } from "rxjs";
import { filter, first, delay } from "rxjs/operators";
import { filter } from "rxjs/operators";
import { IFacePoint } from "./interfaces";
import { Worker, isMainThread, parentPort, workerData } from "worker_threads";
import { cameraOptions } from "./config";
import { log, addFaceDetectionTime, updateFaces } from "./dashboard";

let pointsSubject: BehaviorSubject<IFacePoint[]> = new BehaviorSubject<
IFacePoint[]
>(null);
let framesSubject: BehaviorSubject<Buffer> = new BehaviorSubject<Buffer>(null);

let worker: Worker;
let workerMsgs: BehaviorSubject<any> = new BehaviorSubject<any>(null);

// const workerPath = "./dist/worker-faceapi.js";
const workerPath = "./dist/worker-opencv.js";

const sentTimes = [];

export const setup = async (): Promise<BehaviorSubject<IFacePoint[]>> => {
await createWorker();
await startCamera();
await runCamera();
startListening();
runProcess();

return pointsSubject;
};

const createWorker = (): Promise<any> => {
console.log("detection - createWorker - A");
return new Promise(resolve => {
console.log("detection - createWorker - B");
worker = new Worker(workerPath, {});
worker.on("message", data => {
console.log("detection - createWorker - C");
workerMsgs.next(data);
if (data && data.type && data.type === "init") {
// log.log(worker);
console.log("detection - createWorker - D");
resolve();
}
});
});
};

const startCamera = (): Promise<any> => {
return new Promise(resolve => {
raspberryPiCamera.start(cameraOptions, resolve);
});
};

const runCamera = (): Promise<any> => {
// const timer = createTimer("frame");
raspberryPiCamera.on("frame", (buffer: Buffer) => {
// const timeTaken = timer();
// log.log("frame " + timeTaken);

framesSubject.next(buffer);
// runProcess();
});
return framesSubject.pipe(first(frame => !!frame)).toPromise();
};

let sendCount = 0;

const runProcess = () => {
const msg = {
type: "detect",
buffer: framesSubject.value,
count: sendCount++
};
// log.log("sending " + sendCount);
sentTimes[sendCount] = Date.now();
worker.postMessage(msg);
};
const startListening = () => {
console.log("detection - startListening - A");
const timer = createTimer("points");
workerMsgs
.pipe(filter(({ type }) => type === "points"))
.subscribe(({ points, count }) => {
console.log("detection - startListening - B");
log.log("receiving " + count);
pointsSubject.next(points);

Expand All @@ -90,15 +57,6 @@ const startListening = () => {
score: point.score
}));
updateFaces(displayPoints, delta);

runProcess();
});

workerMsgs
.pipe(filter(({ type }) => type === "receipt"))
.subscribe(({ count }) => {
if (sentTimes[count]) {
log.log("receipt", count, Date.now() - sentTimes[count]);
}
console.log("detection - startListening - C");
});
};
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Board } from "johnny-five";
import { RaspiIO } from "raspi-io";
import * as movement from "./movement";
import * as camera from "./camera";
import * as detection from "./detection";
import * as eyes from "./eyes";
import * as behaviour from "./behaviour";
import * as dashboard from "./dashboard";
Expand All @@ -24,7 +24,7 @@ board.on("ready", async () => {
await eyes.setup(board);
eyes.start();
movement.setup();
const faceSubject = await camera.setup();
const faceSubject = await detection.setup();
behaviour.setup(faceSubject);

board.on("exit", function() {
Expand Down
71 changes: 49 additions & 22 deletions src/worker-opencv.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,76 @@
import cv from "opencv";
import { parentPort, isMainThread, threadId } from "worker_threads";
import { BehaviorSubject } from "rxjs";

import { IFacePoint } from "./interfaces";
import { toFixed } from "./utils";
import { parentPort, isMainThread, threadId } from "worker_threads";
import { cameraOptions, savePhotoOnDetection } from "./config";
import { getFrames } from "./cameraHelper";

const dataPath = "./src/opencv/haarcascade_frontalface_alt.xml";

let framesSubject: BehaviorSubject<Buffer> = null;
let detectCount: number = 0;

interface ICVBox {
x: number;
y: number;
width: number;
height: number;
}

parentPort.on("message", msg => {
switch (msg.type) {
case "detect":
handleDetectMsg(msg);
break;
}
});
// parentPort.on("message", msg => {
// switch (msg.type) {
// case "detect":
// handleDetectMsg(msg);
// break;
// }
// });

const setup = async () => {
console.log("worker-opencv - setup - A");
framesSubject = await getFrames();
console.log("worker-opencv - setup - B");
await framesSubject.toPromise();
console.log("worker-opencv - setup - C");
parentPort.postMessage({ type: "init" });
console.log("worker-opencv - setup - D");
};

const handleDetectMsg = async (msg: {
type: string;
buffer: Uint8Array;
count: number;
}) => {
const { count } = msg;
parentPort.postMessage({
type: "receipt",
count: msg.count
});
const buffer = Buffer.from(msg.buffer);
const points = await detect(buffer, msg.count);
const startProcessing = async () => {
console.log("worker-opencv - startProcessing - A");
const points = await detect(framesSubject.value);
console.log("worker-opencv - startProcessing - B");
detectCount++;
console.log("worker-opencv - startProcessing - C");
parentPort.postMessage({
type: "points",
points,
count
count: detectCount
});
console.log("worker-opencv - startProcessing - D");
startProcessing();
};

// const handleDetectMsg = async (msg: {
// type: string;
// buffer: Uint8Array;
// count: number;
// }) => {
// const { count } = msg;
// parentPort.postMessage({
// type: "receipt",
// count: msg.count
// });
// const buffer = Buffer.from(msg.buffer);
// const points = await detect(buffer, msg.count);
// parentPort.postMessage({
// type: "points",
// points,
// count
// });
// };

const cvReadImage = imgBuffer =>
new Promise((resolve, reject) => {
cv.readImage(imgBuffer, (err, im) => (err ? reject(err) : resolve(im)));
Expand All @@ -56,7 +83,7 @@ const cvDetectFaces = (im): Promise<ICVBox[]> =>
);
});

const detect = async (imgBuffer, count: number): Promise<IFacePoint[]> => {
const detect = async (imgBuffer): Promise<IFacePoint[]> => {
const im = await cvReadImage(imgBuffer);
const boxes = await cvDetectFaces(im);
const faces = boxes.map(boxToPoint);
Expand Down
23 changes: 16 additions & 7 deletions testscripts/workers-child.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
const { workerData, parentPort, isMainThread } = require("worker_threads");
// import { getFrames } from "../src/cameraHelper";
import { Board } from "johnny-five";
import { RaspiIO } from "raspi-io";

console.log("start child");

Expand All @@ -11,13 +14,19 @@ parentPort.on("message", ({ type, num }) => {
});

const start = num => {
parentPort.postMessage({ type: "starting", num });
for (var i = 0; i < 1000000; i++) {
if (i % 100000 === 0) {
console.log("child", num, i);
}
}
parentPort.postMessage({ type: "finished", num });
// getFrames();

const board: any = new Board({
io: new (RaspiIO as any)()
});

// parentPort.postMessage({ type: "starting", num });
// for (var i = 0; i < 1000000; i++) {
// if (i % 100000 === 0) {
// console.log("child", num, i);
// }
// }
// parentPort.postMessage({ type: "finished", num });
};

// parentPort.postMessage({ start: workerData, isMainThread });
3 changes: 0 additions & 3 deletions testscripts/workers-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,3 @@ const start = num => {
};

setup(1);
setup(2);
setup(3);
setup(4);

0 comments on commit e181847

Please sign in to comment.