Skip to content

Commit

Permalink
Support streaming response bodies from fetch()
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Oct 26, 2018
1 parent 1f45407 commit 8e8e852
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 72 deletions.
6 changes: 3 additions & 3 deletions js/dom_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ interface AbortSignal extends EventTarget {
): void;
}

interface ReadableStream {
export interface ReadableStream {
readonly locked: boolean;
cancel(): Promise<void>;
getReader(): ReadableStreamReader;
Expand All @@ -235,7 +235,7 @@ interface EventListenerObject {
handleEvent(evt: Event): void;
}

interface ReadableStreamReader {
export interface ReadableStreamReader {
cancel(): Promise<void>;
// tslint:disable-next-line:no-any
read(): Promise<any>;
Expand Down Expand Up @@ -274,7 +274,7 @@ export interface Blob {
slice(start?: number, end?: number, contentType?: string): Blob;
}

interface Body {
export interface Body {
/** A simple getter used to expose a `ReadableStream` of the body contents. */
readonly body: ReadableStream | null;
/** Stores a `Boolean` that declares whether the body has been used in a
Expand Down
154 changes: 102 additions & 52 deletions js/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
log,
createResolvable,
Resolvable,
typedArrayToArrayBuffer,
concatUint8Array,
notImplemented
} from "./util";
import * as flatbuffers from "./flatbuffers";
Expand All @@ -14,6 +14,8 @@ import * as domTypes from "./dom_types";
import { TextDecoder } from "./text_encoding";
import { DenoBlob } from "./blob";
import { DomIterableMixin } from "./mixins/dom_iterable";
import { read } from "./files";
import * as io from "./io";

// tslint:disable-next-line:no-any
function isHeaders(value: any): value is domTypes.Headers {
Expand Down Expand Up @@ -97,59 +99,127 @@ export const Headers = DomIterableMixin<string, string, typeof HeadersBase>(
headerMap
);

class FetchResponse implements domTypes.Response {
class Body implements domTypes.Body, domTypes.ReadableStream, io.Reader {
bodyUsed = false;
readonly body: domTypes.ReadableStream | null = null; // TODO
private _buffering = false;
private _bodyWaiter: Resolvable<ArrayBuffer>;
private _chunks: Uint8Array[] = [];
private _data: Uint8Array | null = null;
readonly locked: boolean = false; // TODO

constructor(readonly rid: number, readonly contentType: string) {
this._bodyWaiter = createResolvable();
}

private async _bufferBody(): Promise<void> {
if (!this._buffering) {
this._buffering = true;
try {
while (true) {
const b = new Uint8Array(64 * 1024);
const r = await read(this.rid, b);
if (r.nread > 0) {
this._chunks.push(b.subarray(0, r.nread));
}
if (r.eof) {
break;
}
}
this._data = concatUint8Array(this._chunks);
this._chunks = [];
this._bodyWaiter.resolve(this._data!.buffer as ArrayBuffer);
} catch (e) {
this._bodyWaiter.reject(e);
} finally {
this._buffering = false;
}
}
}

async arrayBuffer(): Promise<ArrayBuffer> {
this._bufferBody();
return this._bodyWaiter;
}

async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], {
type: this.contentType
});
}

async formData(): Promise<domTypes.FormData> {
notImplemented();
return {} as domTypes.FormData;
}

// tslint:disable-next-line:no-any
async json(): Promise<any> {
const text = await this.text();
return JSON.parse(text);
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const decoder = new TextDecoder("utf-8");
return decoder.decode(ab);
}

read(p: ArrayBufferView): Promise<io.ReadResult> {
return read(this.rid, p);
}

async cancel(): Promise<void> {
notImplemented();
}

getReader(): domTypes.ReadableStreamReader {
notImplemented();
return {} as domTypes.ReadableStreamReader;
}
}

class Response implements domTypes.Response {
readonly url: string = "";
body: null;
body: Body;
bodyUsed = false; // TODO
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
headers: domTypes.Headers;
readonly trailer: Promise<domTypes.Headers>;
//private bodyChunks: Uint8Array[] = [];
private first = true;
private bodyData: ArrayBuffer;
private bodyWaiter: Resolvable<ArrayBuffer>;

constructor(
readonly status: number,
readonly body_: ArrayBuffer,
headersList: Array<[string, string]>
headersList: Array<[string, string]>,
private bodyRid: number
) {
this.bodyWaiter = createResolvable();
this.trailer = createResolvable();
this.headers = new Headers(headersList);
this.bodyData = body_;
setTimeout(() => {
this.bodyWaiter.resolve(body_);
}, 0);
const contentType = this.headers.get("content-type") || "";
this.body = new Body(bodyRid, contentType);
}

arrayBuffer(): Promise<ArrayBuffer> {
return this.bodyWaiter;
async arrayBuffer(): Promise<ArrayBuffer> {
return this.body.arrayBuffer();
}

async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], {
type: this.headers.get("content-type") || ""
});
return this.body.blob();
}

async formData(): Promise<domTypes.FormData> {
notImplemented();
return {} as domTypes.FormData;
return this.body.formData();
}

async json(): Promise<object> {
const text = await this.text();
return JSON.parse(text);
// tslint:disable-next-line:no-any
async json(): Promise<any> {
return this.body.json();
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const decoder = new TextDecoder("utf-8");
return decoder.decode(ab);
return this.body.text();
}

get ok(): boolean {
Expand All @@ -169,33 +239,15 @@ class FetchResponse implements domTypes.Response {
headersList.push(header);
}

return new FetchResponse(this.status, this.bodyData.slice(0), headersList);
}

onHeader?: (res: FetchResponse) => void;
onError?: (error: Error) => void;

onMsg(base: msg.Base) {
/*
const error = base.error();
if (error != null) {
assert(this.onError != null);
this.onError!(new Error(error));
return;
}
*/

if (this.first) {
this.first = false;
}
return new Response(this.status, headersList, this.bodyRid);
}
}

/** Fetch a resource from the network. */
export async function fetch(
input?: domTypes.Request | string,
init?: domTypes.RequestInit
): Promise<domTypes.Response> {
): Promise<Response> {
const url = input as string;
log("dispatch FETCH_REQ", url);

Expand All @@ -216,9 +268,7 @@ export async function fetch(
assert(resBase.inner(inner) != null);

const status = inner.status();
const bodyArray = inner.bodyArray();
assert(bodyArray != null);
const body = typedArrayToArrayBuffer(bodyArray!);
const bodyRid = inner.bodyRid();

const headersList: Array<[string, string]> = [];
const len = inner.headerKeyLength();
Expand All @@ -228,6 +278,6 @@ export async function fetch(
headersList.push([key, value]);
}

const response = new FetchResponse(status, body, headersList);
const response = new Response(status, headersList, bodyRid);
return response;
}
6 changes: 6 additions & 0 deletions js/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,9 @@ test(function headerSymbolIteratorSuccess() {
assertEqual(value, headers.get(key));
}
});

testPerm({ net: true }, async function fetchStreamBody() {
const res = await fetch("http:https://localhost:4545/tests/hello.txt");
const bytesWritten = await deno.copy(deno.stdout, res.body);
assertEqual(bytesWritten, 6);
});
7 changes: 2 additions & 5 deletions js/files_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ test(function filesStdioFileDescriptors() {
});

test(async function filesCopyToStdout() {
const filename = "package.json";
const file = await deno.open(filename);
const file = await deno.open("tests/hello.txt");
assert(file.rid > 2);
const bytesWritten = await deno.copy(deno.stdout, file);
const fileSize = deno.statSync(filename).len;
assertEqual(bytesWritten, fileSize);
console.log("bytes written", bytesWritten);
assertEqual(bytesWritten, 6);
});
4 changes: 2 additions & 2 deletions src/http_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use tokio::io::AsyncRead;

/// Wraps hyper::Body so that it can be exposed as an AsyncRead and integrated
/// into resources more easily.
struct HttpBody {
pub struct HttpBody {
body: Body,
chunk: Option<Chunk>,
pos: usize,
}

impl HttpBody {
fn from(body: Body) -> HttpBody {
pub fn from(body: Body) -> HttpBody {
HttpBody {
body,
chunk: None,
Expand Down
2 changes: 1 addition & 1 deletion src/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ table FetchRes {
status: int;
header_key: [string];
header_value: [string];
body: [ubyte];
body_rid: int;
}

table MakeTempDir {
Expand Down
15 changes: 6 additions & 9 deletions src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures;
use futures::future::poll_fn;
use futures::Poll;
use hyper;
use hyper::rt::{Future, Stream};
use hyper::rt::Future;
use remove_dir_all::remove_dir_all;
use std;
use std::fs;
Expand Down Expand Up @@ -423,20 +423,17 @@ fn op_fetch_req(
(keys, values)
};

// TODO Handle streaming body.
res
.into_body()
.concat2()
.map(move |body| (status, body, headers))
let body = res.into_body();
let body_resource = resources::add_hyper_body(body);
Ok((status, headers, body_resource))
});

let future = future.map_err(|err| -> DenoError { err.into() }).and_then(
move |(status, body, headers)| {
move |(status, headers, body_resource)| {
debug!("fetch body ");
let builder = &mut FlatBufferBuilder::new();
// Send the first message without a body. This is just to indicate
// what status code.
let body_off = builder.create_vector(body.as_ref());
let header_keys: Vec<&str> = headers.0.iter().map(|s| &**s).collect();
let header_keys_off =
builder.create_vector_of_strings(header_keys.as_slice());
Expand All @@ -449,7 +446,7 @@ fn op_fetch_req(
&msg::FetchResArgs {
id,
status,
body: Some(body_off),
body_rid: body_resource.rid,
header_key: Some(header_keys_off),
header_value: Some(header_values_off),
..Default::default()
Expand Down
Loading

0 comments on commit 8e8e852

Please sign in to comment.