Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at streaming bodies from fetch() #1102

Merged
merged 2 commits into from
Nov 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion js/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export {
Writer,
Closer,
Seeker,
ReaderCloser,
ReadCloser,
WriteCloser,
ReadSeeker,
WriteSeeker,
Expand Down
6 changes: 3 additions & 3 deletions js/dom_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ interface AbortSignal extends EventTarget {
): void;
}

interface ReadableStream {
export interface ReadableStream {
readonly locked: boolean;
cancel(): Promise<void>;
getReader(): ReadableStreamReader;
Expand All @@ -235,7 +235,7 @@ interface EventListenerObject {
handleEvent(evt: Event): void;
}

interface ReadableStreamReader {
export interface ReadableStreamReader {
cancel(): Promise<void>;
// tslint:disable-next-line:no-any
read(): Promise<any>;
Expand Down Expand Up @@ -270,7 +270,7 @@ export interface Blob {
slice(start?: number, end?: number, contentType?: string): Blob;
}

interface Body {
export interface Body {
/** A simple getter used to expose a `ReadableStream` of the body contents. */
readonly body: ReadableStream | null;
/** Stores a `Boolean` that declares whether the body has been used in a
Expand Down
171 changes: 111 additions & 60 deletions js/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,144 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
import {
assert,
log,
createResolvable,
Resolvable,
typedArrayToArrayBuffer,
notImplemented
} from "./util";
import { assert, log, createResolvable, notImplemented } from "./util";
import * as flatbuffers from "./flatbuffers";
import { sendAsync } from "./dispatch";
import * as msg from "gen/msg_generated";
import * as domTypes from "./dom_types";
import { TextDecoder } from "./text_encoding";
import { DenoBlob } from "./blob";
import { Headers } from "./headers";
import * as io from "./io";
import { read, close } from "./files";
import { Buffer } from "./buffer";

class Body implements domTypes.Body, domTypes.ReadableStream, io.ReadCloser {
bodyUsed = false;
private _bodyPromise: null | Promise<ArrayBuffer> = null;
private _data: ArrayBuffer | null = null;
readonly locked: boolean = false; // TODO
readonly body: null | Body = this;

constructor(private rid: number, readonly contentType: string) {}

private async _bodyBuffer(): Promise<ArrayBuffer> {
assert(this._bodyPromise == null);
const buf = new Buffer();
try {
const nread = await buf.readFrom(this);
const ui8 = buf.bytes();
assert(ui8.byteLength === nread);
this._data = ui8.buffer.slice(
ui8.byteOffset,
ui8.byteOffset + nread
) as ArrayBuffer;
assert(this._data.byteLength === nread);
} finally {
this.close();
}

return this._data;
}

async arrayBuffer(): Promise<ArrayBuffer> {
// If we've already bufferred the response, just return it.
if (this._data != null) {
return this._data;
}

// If there is no _bodyPromise yet, start it.
if (this._bodyPromise == null) {
this._bodyPromise = this._bodyBuffer();
}

class FetchResponse implements domTypes.Response {
return this._bodyPromise;
}

async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], {
type: this.contentType
});
}

async formData(): Promise<domTypes.FormData> {
return notImplemented();
}

// tslint:disable-next-line:no-any
async json(): Promise<any> {
ry marked this conversation as resolved.
Show resolved Hide resolved
const text = await this.text();
return JSON.parse(text);
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const decoder = new TextDecoder("utf-8");
return decoder.decode(ab);
}

read(p: Uint8Array): Promise<io.ReadResult> {
return read(this.rid, p);
}

close(): void {
close(this.rid);
}

async cancel(): Promise<void> {
return notImplemented();
}

getReader(): domTypes.ReadableStreamReader {
return notImplemented();
}
}

class Response implements domTypes.Response {
readonly url: string = "";
body: null;
bodyUsed = false; // TODO
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
headers: domTypes.Headers;
readonly trailer: Promise<domTypes.Headers>;
//private bodyChunks: Uint8Array[] = [];
private first = true;
private bodyData: ArrayBuffer;
private bodyWaiter: Resolvable<ArrayBuffer>;
bodyUsed = false;
readonly body: Body;

constructor(
readonly status: number,
readonly body_: ArrayBuffer,
headersList: Array<[string, string]>
headersList: Array<[string, string]>,
rid: number,
body_: null | Body = null
) {
this.bodyWaiter = createResolvable();
this.trailer = createResolvable();
this.headers = new Headers(headersList);
this.bodyData = body_;
setTimeout(() => {
this.bodyWaiter.resolve(body_);
}, 0);
const contentType = this.headers.get("content-type") || "";

if (body_ == null) {
this.body = new Body(rid, contentType);
} else {
this.body = body_;
}
}

arrayBuffer(): Promise<ArrayBuffer> {
return this.bodyWaiter;
async arrayBuffer(): Promise<ArrayBuffer> {
return this.body.arrayBuffer();
}

async blob(): Promise<domTypes.Blob> {
const arrayBuffer = await this.arrayBuffer();
return new DenoBlob([arrayBuffer], {
type: this.headers.get("content-type") || ""
});
return this.body.blob();
}

async formData(): Promise<domTypes.FormData> {
notImplemented();
return {} as domTypes.FormData;
return this.body.formData();
}

async json(): Promise<object> {
const text = await this.text();
return JSON.parse(text);
// tslint:disable-next-line:no-any
async json(): Promise<any> {
ry marked this conversation as resolved.
Show resolved Hide resolved
return this.body.json();
}

async text(): Promise<string> {
const ab = await this.arrayBuffer();
const decoder = new TextDecoder("utf-8");
return decoder.decode(ab);
return this.body.text();
}

get ok(): boolean {
Expand All @@ -87,33 +158,15 @@ class FetchResponse implements domTypes.Response {
headersList.push(header);
}

return new FetchResponse(this.status, this.bodyData.slice(0), headersList);
}

onHeader?: (res: FetchResponse) => void;
onError?: (error: Error) => void;

onMsg(base: msg.Base) {
/*
const error = base.error();
if (error != null) {
assert(this.onError != null);
this.onError!(new Error(error));
return;
}
*/

if (this.first) {
this.first = false;
}
return new Response(this.status, headersList, -1, this.body);
}
}

/** Fetch a resource from the network. */
export async function fetch(
input?: domTypes.Request | string,
init?: domTypes.RequestInit
): Promise<domTypes.Response> {
): Promise<Response> {
const url = input as string;
log("dispatch FETCH_REQ", url);

Expand All @@ -134,9 +187,7 @@ export async function fetch(
assert(resBase.inner(inner) != null);

const status = inner.status();
const bodyArray = inner.bodyArray();
assert(bodyArray != null);
const body = typedArrayToArrayBuffer(bodyArray!);
const bodyRid = inner.bodyRid();

const headersList: Array<[string, string]> = [];
const len = inner.headerKeyLength();
Expand All @@ -146,6 +197,6 @@ export async function fetch(
headersList.push([key, value]);
}

const response = new FetchResponse(status, body, headersList);
const response = new Response(status, headersList, bodyRid);
return response;
}
2 changes: 1 addition & 1 deletion js/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export interface Seeker {
}

// https://golang.org/pkg/io/#ReadCloser
export interface ReaderCloser extends Reader, Closer {}
export interface ReadCloser extends Reader, Closer {}

// https://golang.org/pkg/io/#WriteCloser
export interface WriteCloser extends Writer, Closer {}
Expand Down
115 changes: 115 additions & 0 deletions src/http_body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.

use futures::Async;
use futures::Poll;
use hyper::body::Payload;
use hyper::Body;
use hyper::Chunk;
use std::cmp::min;
use std::io;
use std::io::Read;
use tokio::io::AsyncRead;

/// Wraps hyper::Body so that it can be exposed as an AsyncRead and integrated
/// into resources more easily.
pub struct HttpBody {
body: Body,
chunk: Option<Chunk>,
pos: usize,
}

impl HttpBody {
pub fn from(body: Body) -> HttpBody {
HttpBody {
body,
chunk: None,
pos: 0,
}
}
}

impl Read for HttpBody {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
unimplemented!();
}
}

impl AsyncRead for HttpBody {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
match self.chunk.take() {
Some(chunk) => {
debug!(
ry marked this conversation as resolved.
Show resolved Hide resolved
"HttpBody Fake Read buf {} chunk {} pos {}",
buf.len(),
chunk.len(),
self.pos
);
let n = min(buf.len(), chunk.len() - self.pos);
{
let rest = &chunk[self.pos..];
buf[..n].clone_from_slice(&rest[..n]);
}
self.pos += n;
if self.pos == chunk.len() {
self.pos = 0;
} else {
self.chunk = Some(chunk);
}
return Ok(Async::Ready(n));
}
None => {
assert_eq!(self.pos, 0);
}
}

let p = self.body.poll_data();
match p {
Err(e) => Err(
// TODO Need to map hyper::Error into std::io::Error.
io::Error::new(io::ErrorKind::Other, e),
),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(maybe_chunk)) => match maybe_chunk {
None => Ok(Async::Ready(0)),
Some(chunk) => {
debug!(
"HttpBody Real Read buf {} chunk {} pos {}",
buf.len(),
chunk.len(),
self.pos
);
let n = min(buf.len(), chunk.len());
buf[..n].clone_from_slice(&chunk[..n]);
if buf.len() < chunk.len() {
self.pos = n;
self.chunk = Some(chunk);
}
Ok(Async::Ready(n))
}
},
}
}
}

#[test]
fn test_body_async_read() {
use std::str::from_utf8;
let body = Body::from("hello world");
let mut body = HttpBody::from(body);

let buf = &mut [0, 0, 0, 0, 0];
let r = body.poll_read(buf);
assert!(r.is_ok());
assert_eq!(r.unwrap(), Async::Ready(5));
assert_eq!(from_utf8(buf).unwrap(), "hello");

let r = body.poll_read(buf);
assert!(r.is_ok());
assert_eq!(r.unwrap(), Async::Ready(5));
assert_eq!(from_utf8(buf).unwrap(), " worl");

let r = body.poll_read(buf);
assert!(r.is_ok());
assert_eq!(r.unwrap(), Async::Ready(1));
assert_eq!(from_utf8(&buf[0..1]).unwrap(), "d");
}
Loading