Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at streaming bodies from fetch() #1102

Merged
merged 2 commits into from
Nov 14, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Support streaming response bodies from fetch()
Also Buffer.readFrom in fetch() to buffer response.
  • Loading branch information
ry committed Nov 14, 2018
commit f4048796b2db96d40173a3aecf790265dddac404
2 changes: 1 addition & 1 deletion js/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export {
Writer,
Closer,
Seeker,
ReaderCloser,
ReadCloser,
WriteCloser,
ReadSeeker,
WriteSeeker,
Expand Down
6 changes: 3 additions & 3 deletions js/dom_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ interface AbortSignal extends EventTarget {
): void;
}

interface ReadableStream {
export interface ReadableStream {
readonly locked: boolean;
cancel(): Promise<void>;
getReader(): ReadableStreamReader;
Expand All @@ -235,7 +235,7 @@ interface EventListenerObject {
handleEvent(evt: Event): void;
}

interface ReadableStreamReader {
export interface ReadableStreamReader {
cancel(): Promise<void>;
// tslint:disable-next-line:no-any
read(): Promise<any>;
Expand Down Expand Up @@ -270,7 +270,7 @@ export interface Blob {
slice(start?: number, end?: number, contentType?: string): Blob;
}

interface Body {
export interface Body {
/** A simple getter used to expose a `ReadableStream` of the body contents. */
readonly body: ReadableStream | null;
/** Stores a `Boolean` that declares whether the body has been used in a
Expand Down
171 changes: 111 additions & 60 deletions js/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,144 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
import {
assert,
log,
createResolvable,
Resolvable,
typedArrayToArrayBuffer,
notImplemented
} from "./util";
import { assert, log, createResolvable, notImplemented } from "./util";
import * as flatbuffers from "./flatbuffers";
import { sendAsync } from "./dispatch";
import * as msg from "gen/msg_generated";
import * as domTypes from "./dom_types";
import { TextDecoder } from "./text_encoding";
import { DenoBlob } from "./blob";
import { Headers } from "./headers";
import * as io from "./io";
import { read, close } from "./files";
import { Buffer } from "./buffer";

class Body implements domTypes.Body, domTypes.ReadableStream, io.ReadCloser {
bodyUsed = false;
private _bodyPromise: null | Promise<ArrayBuffer> = null;
private _data: ArrayBuffer | null = null;
readonly locked: boolean = false; // TODO
readonly body: null | Body = this;

constructor(private rid: number, readonly contentType: string) {}

private async _bodyBuffer(): Promise<ArrayBuffer> {
assert(this._bodyPromise == null);
const buf = new Buffer();
try {
const nread = await buf.readFrom(this);
const ui8 = buf.bytes();
assert(ui8.byteLength === nread);
this._data = ui8.buffer.slice(
ui8.byteOffset,
ui8.byteOffset + nread
) as ArrayBuffer;
assert(this._data.byteLength === nread);
} finally {
this.close();
}

return this._data;
}

async arrayBuffer(): Promise<ArrayBuffer> {
// If we've already bufferred the response, just return it.
if (this._data != null) {
return this._data;
}

// If there is no _bodyPromise yet, start it.
if (this._bodyPromise == null) {
this._bodyPromise = this._bodyBuffer();
}

class FetchResponse implements domTypes.Response {
return this._bodyPromise;
}

async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], {
type: this.contentType
});
}

async formData(): Promise<domTypes.FormData> {
return notImplemented();
}

// tslint:disable-next-line:no-any
async json(): Promise<any> {
ry marked this conversation as resolved.
Show resolved Hide resolved
const text = await this.text();
return JSON.parse(text);
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const decoder = new TextDecoder("utf-8");
return decoder.decode(ab);
}

read(p: Uint8Array): Promise<io.ReadResult> {
return read(this.rid, p);
}

close(): void {
close(this.rid);
}

async cancel(): Promise<void> {
return notImplemented();
}

getReader(): domTypes.ReadableStreamReader {
return notImplemented();
}
}

class Response implements domTypes.Response {
readonly url: string = "";
body: null;
bodyUsed = false; // TODO
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
headers: domTypes.Headers;
readonly trailer: Promise<domTypes.Headers>;
//private bodyChunks: Uint8Array[] = [];
private first = true;
private bodyData: ArrayBuffer;
private bodyWaiter: Resolvable<ArrayBuffer>;
bodyUsed = false;
readonly body: Body;

constructor(
readonly status: number,
readonly body_: ArrayBuffer,
headersList: Array<[string, string]>
headersList: Array<[string, string]>,
rid: number,
body_: null | Body = null
) {
this.bodyWaiter = createResolvable();
this.trailer = createResolvable();
this.headers = new Headers(headersList);
this.bodyData = body_;
setTimeout(() => {
this.bodyWaiter.resolve(body_);
}, 0);
const contentType = this.headers.get("content-type") || "";

if (body_ == null) {
this.body = new Body(rid, contentType);
} else {
this.body = body_;
}
}

arrayBuffer(): Promise<ArrayBuffer> {
return this.bodyWaiter;
async arrayBuffer(): Promise<ArrayBuffer> {
return this.body.arrayBuffer();
}

async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], {
type: this.headers.get("content-type") || ""
});
return this.body.blob();
}

async formData(): Promise<domTypes.FormData> {
notImplemented();
return {} as domTypes.FormData;
return this.body.formData();
}

async json(): Promise<object> {
const text = await this.text();
return JSON.parse(text);
// tslint:disable-next-line:no-any
async json(): Promise<any> {
ry marked this conversation as resolved.
Show resolved Hide resolved
return this.body.json();
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const decoder = new TextDecoder("utf-8");
return decoder.decode(ab);
return this.body.text();
}

get ok(): boolean {
Expand All @@ -87,33 +158,15 @@ class FetchResponse implements domTypes.Response {
headersList.push(header);
}

return new FetchResponse(this.status, this.bodyData.slice(0), headersList);
}

onHeader?: (res: FetchResponse) => void;
onError?: (error: Error) => void;

onMsg(base: msg.Base) {
/*
const error = base.error();
if (error != null) {
assert(this.onError != null);
this.onError!(new Error(error));
return;
}
*/

if (this.first) {
this.first = false;
}
return new Response(this.status, headersList, -1, this.body);
}
}

/** Fetch a resource from the network. */
export async function fetch(
input?: domTypes.Request | string,
init?: domTypes.RequestInit
): Promise<domTypes.Response> {
): Promise<Response> {
const url = input as string;
log("dispatch FETCH_REQ", url);

Expand All @@ -134,9 +187,7 @@ export async function fetch(
assert(resBase.inner(inner) != null);

const status = inner.status();
const bodyArray = inner.bodyArray();
assert(bodyArray != null);
const body = typedArrayToArrayBuffer(bodyArray!);
const bodyRid = inner.bodyRid();

const headersList: Array<[string, string]> = [];
const len = inner.headerKeyLength();
Expand All @@ -146,6 +197,6 @@ export async function fetch(
headersList.push([key, value]);
}

const response = new FetchResponse(status, body, headersList);
const response = new Response(status, headersList, bodyRid);
return response;
}
2 changes: 1 addition & 1 deletion js/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export interface Seeker {
}

// https://golang.org/pkg/io/#ReadCloser
export interface ReaderCloser extends Reader, Closer {}
export interface ReadCloser extends Reader, Closer {}

// https://golang.org/pkg/io/#WriteCloser
export interface WriteCloser extends Writer, Closer {}
Expand Down
2 changes: 1 addition & 1 deletion src/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ table FetchRes {
status: int;
header_key: [string];
header_value: [string];
body: [ubyte];
body_rid: uint32;
}

table MakeTempDir {
Expand Down
15 changes: 6 additions & 9 deletions src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures;
use futures::future::poll_fn;
use futures::Poll;
use hyper;
use hyper::rt::{Future, Stream};
use hyper::rt::Future;
use remove_dir_all::remove_dir_all;
use repl;
use resources::table_entries;
Expand Down Expand Up @@ -417,20 +417,17 @@ fn op_fetch(
(keys, values)
};

// TODO Handle streaming body.
res
.into_body()
.concat2()
.map(move |body| (status, body, headers))
let body = res.into_body();
let body_resource = resources::add_hyper_body(body);
Ok((status, headers, body_resource))
});

let future = future.map_err(|err| -> DenoError { err.into() }).and_then(
move |(status, body, headers)| {
move |(status, headers, body_resource)| {
debug!("fetch body ");
let builder = &mut FlatBufferBuilder::new();
// Send the first message without a body. This is just to indicate
// what status code.
let body_off = builder.create_vector(body.as_ref());
let header_keys: Vec<&str> = headers.0.iter().map(|s| &**s).collect();
let header_keys_off =
builder.create_vector_of_strings(header_keys.as_slice());
Expand All @@ -443,7 +440,7 @@ fn op_fetch(
&msg::FetchResArgs {
id,
status,
body: Some(body_off),
body_rid: body_resource.rid,
header_key: Some(header_keys_off),
header_value: Some(header_values_off),
},
Expand Down
15 changes: 15 additions & 0 deletions src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use eager_unix as eager;
use errors::bad_resource;
use errors::DenoError;
use errors::DenoResult;
use http_body::HttpBody;
use repl::Repl;
use tokio_util;
use tokio_write;

use futures;
use futures::future::{Either, FutureResult};
use futures::Poll;
use hyper;
use std;
use std::collections::HashMap;
use std::io::{Error, Read, Write};
Expand Down Expand Up @@ -59,6 +61,7 @@ enum Repr {
FsFile(tokio::fs::File),
TcpListener(tokio::net::TcpListener),
TcpStream(tokio::net::TcpStream),
HttpBody(HttpBody),
Repl(Repl),
}

Expand Down Expand Up @@ -89,6 +92,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::FsFile(_) => "fsFile",
Repr::TcpListener(_) => "tcpListener",
Repr::TcpStream(_) => "tcpStream",
Repr::HttpBody(_) => "httpBody",
Repr::Repl(_) => "repl",
};

Expand Down Expand Up @@ -155,6 +159,7 @@ impl AsyncRead for Resource {
Repr::FsFile(ref mut f) => f.poll_read(buf),
Repr::Stdin(ref mut f) => f.poll_read(buf),
Repr::TcpStream(ref mut f) => f.poll_read(buf),
Repr::HttpBody(ref mut f) => f.poll_read(buf),
_ => panic!("Cannot read"),
},
}
Expand Down Expand Up @@ -222,6 +227,15 @@ pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource {
Resource { rid }
}

pub fn add_hyper_body(body: hyper::Body) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
let body = HttpBody::from(body);
let r = tg.insert(rid, Repr::HttpBody(body));
ry marked this conversation as resolved.
Show resolved Hide resolved
assert!(r.is_none());
Resource { rid }
}

pub fn add_repl(repl: Repl) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
Expand All @@ -243,6 +257,7 @@ pub fn readline(rid: ResourceId, prompt: &str) -> DenoResult<String> {
}

pub fn lookup(rid: ResourceId) -> Option<Resource> {
debug!("resource lookup {}", rid);
let table = RESOURCE_TABLE.lock().unwrap();
table.get(&rid).map(|_| Resource { rid })
}
Expand Down