Skip to content

Commit

Permalink
Implement fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed May 27, 2018
1 parent ef00cf3 commit a831d1e
Show file tree
Hide file tree
Showing 17 changed files with 330 additions and 10 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
TS_FILES = \
dispatch.ts \
fetch.ts \
globals.ts \
main.ts \
msg.pb.d.ts \
Expand All @@ -18,8 +19,9 @@ GO_FILES = \
assets.go \
deno_dir.go \
deno_dir_test.go \
echo.go \
dispatch.go \
echo.go \
fetch.go \
main.go \
msg.pb.go \
os.go \
Expand Down
8 changes: 8 additions & 0 deletions dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func Sub(channel string, cb Subscriber) {
}

func Pub(channel string, payload []byte) {
wg.Add(1)
resChan <- &BaseMsg{
Channel: channel,
Payload: payload,
Expand All @@ -79,11 +80,14 @@ func PubMsg(channel string, msg *Msg) {
}

func DispatchLoop() {
// runtime.LockOSThread()
wg.Add(1)
first := true

// In a goroutine, we wait on for all goroutines to complete (for example
// timers). We use this to signal to the main thread to exit.
// wg.Add(1) basically translates to uv_ref, if this was Node.
// wg.Done() basically translates to uv_unref
go func() {
wg.Wait()
doneChan <- true
Expand All @@ -92,7 +96,11 @@ func DispatchLoop() {
for {
select {
case msg := <-resChan:
wg.Done()
out, err := proto.Marshal(msg)
if err != nil {
panic(err)
}
err = worker.SendBytes(out)
stats.v8workerSend++
stats.v8workerBytesSent += len(out)
Expand Down
14 changes: 14 additions & 0 deletions dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { _global } from "./globals";
import { main as pb } from "./msg.pb";

type MessageCallback = (msg: Uint8Array) => void;
//type MessageStructCallback = (msg: pb.IMsg) => void;

const send = V8Worker2.send;
const channels = new Map<string, MessageCallback[]>();
Expand All @@ -16,6 +17,19 @@ export function sub(channel: string, cb: MessageCallback): void {
subscribers.push(cb);
}

/*
export function subMsg(channel: string, cb: MessageStructCallback): void {
sub(channel, (payload: Uint8Array) => {
const msg = pb.Msg.decode(payload);
if (msg.error != null) {
f.onError(new Error(msg.error));
} else {
cb(msg);
}
});
}
*/

export function pub(channel: string, payload: Uint8Array): null | ArrayBuffer {
const msg = pb.BaseMsg.fromObject({ channel, payload });
const ui8 = pb.BaseMsg.encode(msg).finish();
Expand Down
64 changes: 64 additions & 0 deletions fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"github.com/golang/protobuf/proto"
"io/ioutil"
"net/http"
)

func InitFetch() {
Sub("fetch", func(buf []byte) []byte {
msg := &Msg{}
check(proto.Unmarshal(buf, msg))
switch msg.Command {
case Msg_FETCH_REQ:
return Fetch(
msg.FetchReqId,
msg.FetchReqUrl)
default:
panic("[fetch] Unexpected message " + string(buf))
}
})
}

func Fetch(id int32, targetUrl string) []byte {
logDebug("Fetch %d %s", id, targetUrl)
async(func() {
resMsg := &Msg{
Command: Msg_FETCH_RES,
FetchResId: id,
}

resp, err := http.Get(targetUrl)
if err != nil {
resMsg.Error = err.Error()
PubMsg("fetch", resMsg)
return
}
if resp == nil {
resMsg.Error = "resp is nil "
PubMsg("fetch", resMsg)
return
}

resMsg.FetchResStatus = int32(resp.StatusCode)
logDebug("fetch success %d %s", resMsg.FetchResStatus, targetUrl)
PubMsg("fetch", resMsg)

// Now we read the body and send another message0

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if resp == nil {
resMsg.Error = "resp is nil "
PubMsg("fetch", resMsg)
return
}

resMsg.FetchResBody = body
PubMsg("fetch", resMsg)

// TODO streaming.
})
return nil
}
151 changes: 151 additions & 0 deletions fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { assert, log, createResolvable, Resolvable } from "./util";
import * as util from "./util";
import * as dispatch from "./dispatch";
import { main as pb } from "./msg.pb";
import { TextDecoder } from "text-encoding";

export function initFetch() {
dispatch.sub("fetch", (payload: Uint8Array) => {
const msg = pb.Msg.decode(payload);
assert(msg.command === pb.Msg.Command.FETCH_RES);
const id = msg.fetchResId;
const f = fetchRequests.get(id);
assert(f != null, `Couldn't find FetchRequest id ${id}`);

f.onMsg(msg);
});
}

const fetchRequests = new Map<number, FetchRequest>();

class FetchResponse implements Response {
readonly url: string;
body: null;
bodyUsed = false; // TODO
status: number;
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
headers: null; // TODO
//private bodyChunks: Uint8Array[] = [];
private first = true;

constructor(readonly req: FetchRequest) {
this.url = req.url;
}

bodyWaiter: Resolvable<ArrayBuffer>;
arrayBuffer(): Promise<ArrayBuffer> {
this.bodyWaiter = createResolvable();
return this.bodyWaiter;
}

blob(): Promise<Blob> {
throw Error("not implemented");
}

formData(): Promise<FormData> {
throw Error("not implemented");
}

async json(): Promise<object> {
const text = await this.text();
return JSON.parse(text);
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const enc = new TextDecoder("utf-8");
// Maybe new Uint8Array(ab)
return enc.decode(ab);
}

get ok(): boolean {
return 200 <= this.status && this.status < 300;
}

clone(): Response {
throw Error("not implemented");
}

onHeader: (res: Response) => void;
onError: (error: Error) => void;

onMsg(msg: pb.Msg) {
if (msg.error !== null && msg.error !== "") {
//throw new Error(msg.error)
this.onError(new Error(msg.error));
return;
}

if (this.first) {
this.first = false;
this.status = msg.fetchResStatus;
this.onHeader(this);
} else {
// Body message. Assuming it all comes in one message now.
const ab = util.typedArrayToArrayBuffer(msg.fetchResBody);
this.bodyWaiter.resolve(ab);
}
}
}

let nextFetchId = 0;
//TODO implements Request
class FetchRequest {
private readonly id: number;
response: FetchResponse;
constructor(readonly url: string) {
this.id = nextFetchId++;
fetchRequests.set(this.id, this);
this.response = new FetchResponse(this);
}

onMsg(msg: pb.Msg) {
this.response.onMsg(msg);
}

destroy() {
fetchRequests.delete(this.id);
}

start() {
log("dispatch FETCH_REQ", this.id, this.url);
const res = dispatch.sendMsg("fetch", {
command: pb.Msg.Command.FETCH_REQ,
fetchReqId: this.id,
fetchReqUrl: this.url
});
assert(res == null);
}
}

export function fetch(
input?: Request | string,
init?: RequestInit
): Promise<Response> {
const fetchReq = new FetchRequest(input as string);
const response = fetchReq.response;
return new Promise((resolve, reject) => {
// tslint:disable-next-line:no-any
response.onHeader = (response: any) => {
log("onHeader");
resolve(response);
};
response.onError = (error: Error) => {
log("onError", error);
reject(error);
};
fetchReq.start();
});
}

/*
fetch('http:https://example.com/movies.json')
.then(function(response) {
return response.json();
})
.then(function(myJson) {
console.log(myJson);
});
*/
7 changes: 7 additions & 0 deletions globals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ function stringifyArgs(args: any[]): string {
}
return out.join(" ");
}

import { fetch } from "./fetch";
_global["fetch"] = fetch;

import { TextEncoder, TextDecoder } from "text-encoding";
_global["TextEncoder"] = TextEncoder;
_global["TextDecoder"] = TextDecoder;
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func FlagsParse() []string {
if *flagV8Options {
args = append(args, "--help")
}
args = append(args, "--abort-on-uncaught-exception")
args = v8worker2.SetFlags(args)

return args
Expand All @@ -49,6 +50,7 @@ func main() {
InitOS()
InitEcho()
InitTimers()
InitFetch()

main_js := stringAsset("main.js")
err := worker.Load("/main.js", main_js)
Expand Down
3 changes: 2 additions & 1 deletion main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { main as pb } from "./msg.pb";
import * as runtime from "./runtime";
import * as util from "./util";

// These have top-level functions that need to execute.
import { initTimers } from "./timers";
import { initFetch } from "./fetch";

// To control internal logging output
// Set with the -debug command-line flag.
Expand All @@ -32,6 +32,7 @@ dispatch.sub("start", (payload: Uint8Array) => {
util.log("start", { cwd, argv, debugFlag });

initTimers();
initFetch();
runtime.setup(mainJs, mainMap);

const inputFn = argv[0];
Expand Down
20 changes: 17 additions & 3 deletions msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ message BaseMsg {
}

message Msg {
string error = 1;

enum Command {
ERROR = 0;
START = 1;
Expand All @@ -19,15 +17,20 @@ message Msg {
TIMER_START = 6;
TIMER_READY = 7;
TIMER_CLEAR = 8;
FETCH_REQ = 9;
FETCH_RES = 10;
}
Command command = 2;
Command command = 1;

// We avoid creating a message for each command (and use oneof or any types)
// In order to reduce code in the size of the generated javascript
// "msg.pb.js". It seems that each new message adds 20k and we want to
// potentially add many hundreds of commands. Therefore we just prefix command
// arguments by their name.

// ERROR
string error = 2;

// START
string start_cwd = 10;
repeated string start_argv = 11;
Expand Down Expand Up @@ -67,4 +70,15 @@ message Msg {

// TIMER_CLEAR
int32 timer_clear_id = 80;

// FETCH_REQ
int32 fetch_req_id = 90;
string fetch_req_url = 91;
// repeated string fetch_req_header_line = 91

// FETCH_RES
int32 fetch_res_id = 100;
int32 fetch_res_status = 101;
repeated string fetch_res_header_line = 102;
bytes fetch_res_body = 103;
}
Loading

0 comments on commit a831d1e

Please sign in to comment.