Skip to content

Commit

Permalink
Add dispatch pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed May 22, 2018
1 parent 9a66216 commit 08307fb
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 124 deletions.
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
TS_FILES = \
tsconfig.json \
dispatch.ts \
main.ts \
msg.pb.d.ts \
msg.pb.js \
Expand All @@ -10,7 +11,17 @@ TS_FILES = \
util.ts \
v8_source_maps.ts

deno: assets.go msg.pb.go main.go
GO_FILES = \
assets.go \
deno_dir.go \
dispatch.go \
handlers.go \
main.go \
main_test.go \
msg.pb.go \
util.go

deno: $(GO_FILES)
go build -o deno

assets.go: dist/main.js
Expand Down
90 changes: 90 additions & 0 deletions dispatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"github.com/golang/protobuf/proto"
"github.com/ry/v8worker2"
"sync"
)

// There is a single global worker for this process.
// This file should be the only part of deno that directly access it, so that
// all interaction with V8 can go through a single point.
var worker *v8worker2.Worker

var channels = make(map[string][]Subscriber)

type Subscriber func(payload []byte) []byte

func createWorker() {
worker = v8worker2.New(recv)
}

func recv(buf []byte) (response []byte) {
msg := &BaseMsg{}
check(proto.Unmarshal(buf, msg))
assert(len(msg.Payload) > 0, "BaseMsg has empty payload.")
subscribers, ok := channels[msg.Channel]
if !ok {
panic("No subscribers for channel " + msg.Channel)
}
for i := 0; i < len(subscribers); i++ {
s := subscribers[i]
r := s(msg.Payload)
if r != nil {
response = r
}
}
return response
}

func Sub(channel string, cb Subscriber) {
subscribers, ok := channels[channel]
if !ok {
subscribers = make([]Subscriber, 0)
}
subscribers = append(subscribers, cb)
channels[channel] = subscribers
}

func Pub(channel string, payload []byte) {
resChan <- &BaseMsg{
Channel: channel,
Payload: payload,
}
}

var resChan = make(chan *BaseMsg, 10)
var doneChan = make(chan bool)
var wg sync.WaitGroup

func DispatchLoop() {
wg.Add(1)
first := true

// In a goroutine, we wait on for all goroutines to complete (for example
// timers). We use this to signal to the main thread to exit.
go func() {
wg.Wait()
doneChan <- true
}()

for {
select {
case msg := <-resChan:
out, err := proto.Marshal(msg)
err = worker.SendBytes(out)
check(err)
case <-doneChan:
// All goroutines have completed. Now we can exit main().
return
}

// We don't want to exit until we've received at least one message.
// This is so the program doesn't exit after sending the "start"
// message.
if first {
wg.Done()
}
first = false
}
}
61 changes: 61 additions & 0 deletions dispatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { typedArrayToArrayBuffer } from "./util";
import { _global } from "./globals";
import { main as pb } from "./msg.pb";

type MessageCallback = (msg: Uint8Array) => void;

const send = V8Worker2.send;
const channels = new Map<string, MessageCallback[]>();

export function sub(channel: string, cb: MessageCallback): void {
let subscribers = channels.get(channel);
if (!subscribers) {
subscribers = [];
channels.set(channel, subscribers);
}
subscribers.push(cb);
}

export function pub(channel: string, payload: Uint8Array): null | ArrayBuffer {
const msg = pb.BaseMsg.fromObject({ channel, payload });
const ui8 = pb.BaseMsg.encode(msg).finish();
const ab = typedArrayToArrayBuffer(ui8);
return send(ab);
}

// Internal version of "pub".
// TODO add internal version of "sub"
// TODO rename to pubInternal()
export function sendMsgFromObject(
channel: string,
obj: pb.IMsg
): null | pb.Msg {
const msg = pb.Msg.fromObject(obj);
const ui8 = pb.Msg.encode(msg).finish();
const resBuf = pub(channel, ui8);
if (resBuf != null && resBuf.byteLength > 0) {
const res = pb.Msg.decode(new Uint8Array(resBuf));
if (res != null && res.error != null && res.error.length > 0) {
throw Error(res.error);
}
return res;
} else {
return null;
}
}

V8Worker2.recv((ab: ArrayBuffer) => {
const msg = pb.BaseMsg.decode(new Uint8Array(ab));
const subscribers = channels.get(msg.channel);
if (subscribers == null) {
throw Error(`No subscribers for channel "${msg.channel}".`);
}

for (const subscriber of subscribers) {
subscriber(msg.payload);
}
});

// Delete the V8Worker2 from the global object, so that no one else can receive
// messages.
_global["V8Worker2"] = null;
59 changes: 35 additions & 24 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,38 @@ import (

const assetPrefix string = "/$asset$/"

func recv(buf []byte) []byte {
msg := &Msg{}
err := proto.Unmarshal(buf, msg)
check(err)
switch msg.Payload.(type) {
case *Msg_Exit:
payload := msg.GetExit()
os.Exit(int(payload.Code))
case *Msg_SourceCodeFetch:
payload := msg.GetSourceCodeFetch()
return HandleSourceCodeFetch(payload.ModuleSpecifier, payload.ContainingFile)
case *Msg_SourceCodeCache:
payload := msg.GetSourceCodeCache()
return HandleSourceCodeCache(payload.Filename, payload.SourceCode,
payload.OutputCode)
case *Msg_TimerStart:
payload := msg.GetTimerStart()
return HandleTimerStart(payload.Id, payload.Interval, payload.Duration)
default:
panic("Unexpected message")
}
func InitHandlers() {
Sub("os", func(buf []byte) []byte {
msg := &Msg{}
check(proto.Unmarshal(buf, msg))
switch msg.Payload.(type) {
case *Msg_Exit:
payload := msg.GetExit()
os.Exit(int(payload.Code))
case *Msg_SourceCodeFetch:
payload := msg.GetSourceCodeFetch()
return HandleSourceCodeFetch(payload.ModuleSpecifier, payload.ContainingFile)
case *Msg_SourceCodeCache:
payload := msg.GetSourceCodeCache()
return HandleSourceCodeCache(payload.Filename, payload.SourceCode,
payload.OutputCode)
default:
panic("[os] Unexpected message " + string(buf))
}
return nil
})

return nil
Sub("timers", func(buf []byte) []byte {
msg := &Msg{}
check(proto.Unmarshal(buf, msg))
switch msg.Payload.(type) {
case *Msg_TimerStart:
payload := msg.GetTimerStart()
return HandleTimerStart(payload.Id, payload.Interval, payload.Duration)
default:
panic("[timers] Unexpected message " + string(buf))
}
})
}

func HandleSourceCodeFetch(moduleSpecifier string, containingFile string) (out []byte) {
Expand Down Expand Up @@ -107,13 +116,15 @@ func HandleTimerStart(id int32, interval bool, duration int32) []byte {
go func() {
defer wg.Done()
time.Sleep(time.Duration(duration) * time.Millisecond)
resChan <- &Msg{
payload, err := proto.Marshal(&Msg{
Payload: &Msg_TimerReady{
TimerReady: &TimerReadyMsg{
Id: id,
},
},
}
})
check(err)
Pub("timers", payload)
}()
return nil
}
35 changes: 4 additions & 31 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/url"
"os"
"path"
"sync"
)

var flagReload = flag.Bool("reload", false, "Reload cached remote source code.")
Expand All @@ -19,9 +18,6 @@ var DenoDir string
var CompileDir string
var SrcDir string

var wg sync.WaitGroup
var resChan chan *Msg

func ResolveModule(moduleSpecifier string, containingFile string) (
moduleName string, filename string, err error) {
moduleUrl, err := url.Parse(moduleSpecifier)
Expand Down Expand Up @@ -58,7 +54,8 @@ func main() {
args = v8worker2.SetFlags(args)

createDirs()
worker := v8worker2.New(recv)
createWorker()
InitHandlers()

main_js := stringAsset("main.js")
check(worker.Load("/main.js", main_js))
Expand All @@ -67,9 +64,6 @@ func main() {
cwd, err := os.Getwd()
check(err)

resChan = make(chan *Msg)
doneChan := make(chan bool)

out, err := proto.Marshal(&Msg{
Payload: &Msg_Start{
Start: &StartMsg{
Expand All @@ -82,28 +76,7 @@ func main() {
},
})
check(err)
err = worker.SendBytes(out)
if err != nil {
os.Stderr.WriteString(err.Error())
os.Exit(1)
}

// In a goroutine, we wait on for all goroutines to complete (for example
// timers). We use this to signal to the main thread to exit.
go func() {
wg.Wait()
doneChan <- true
}()
Pub("start", out)

for {
select {
case msg := <-resChan:
out, err := proto.Marshal(msg)
err = worker.SendBytes(out)
check(err)
case <-doneChan:
// All goroutines have completed. Now we can exit main().
return
}
}
DispatchLoop()
}
46 changes: 16 additions & 30 deletions main.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,33 @@
import * as dispatch from "./dispatch";
import { main as pb } from "./msg.pb";
import "./util";

import * as runtime from "./runtime";
import * as timers from "./timers";
import * as util from "./util";

// These have top-level functions that need to execute.
import { initTimers } from "./timers";

// To control internal logging output
// Set with the -debug command-line flag.
export let debug = false;
let startCalled = false;

dispatch.sub("start", (payload: Uint8Array) => {
if (startCalled) {
throw Error("start message received more than once!");
}
startCalled = true;

const msg = pb.Msg.decode(payload);
const { cwd, argv, debugFlag, mainJs, mainMap } = msg.start;

function start(
cwd: string,
argv: string[],
debugFlag: boolean,
mainJs: string,
mainMap: string
): void {
debug = debugFlag;
util.log("start", { cwd, argv, debugFlag });

initTimers();
runtime.setup(mainJs, mainMap);

const inputFn = argv[0];
const mod = runtime.resolveModule(inputFn, cwd + "/");
mod.compileAndRun();
}

V8Worker2.recv((ab: ArrayBuffer) => {
const msg = pb.Msg.decode(new Uint8Array(ab));
switch (msg.payload) {
case "start":
start(
msg.start.cwd,
msg.start.argv,
msg.start.debugFlag,
msg.start.mainJs,
msg.start.mainMap
);
break;
case "timerReady":
timers.timerReady(msg.timerReady.id, msg.timerReady.done);
break;
default:
console.log("Unknown message", msg);
break;
}
});
Loading

0 comments on commit 08307fb

Please sign in to comment.