-
Notifications
You must be signed in to change notification settings - Fork 523
/
client-stream.js
133 lines (108 loc) · 2.85 KB
/
client-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
'use strict'
const { finished } = require('stream')
const {
InvalidArgumentError,
InvalidReturnValueError
} = require('./errors')
const util = require('./util')
const { AsyncResource } = require('async_hooks')
/**
 * Dispatch handler that forwards a response body into a Writable obtained
 * from a user-supplied factory, then reports completion through a callback.
 *
 * Lifecycle hooks (`onConnect`, `onHeaders`, `onData`, `onComplete`,
 * `onError`) are invoked by whatever object the handler is dispatched on.
 * User-supplied functions (`factory`, `callback`) run inside this
 * AsyncResource's scope.
 */
class StreamHandler extends AsyncResource {
  /**
   * @param {object} opts - Request options. `opts.opaque` (or `null` when
   *   falsy) is handed back to `factory` and `callback`.
   * @param {Function} factory - Called once with
   *   `{ statusCode, headers, opaque }`; must return an object exposing
   *   `write`, `end` and `on`.
   * @param {Function} callback - Called with `(err, { opaque, trailers })`
   *   when the destination finishes, or with `(err, { opaque })` when an
   *   error arrives before a destination exists.
   * @throws {InvalidArgumentError} If `opts` is not an object, `factory` is
   *   not a function, or `opts.method` is `'CONNECT'`.
   */
  constructor (opts, factory, callback) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    if (typeof factory !== 'function') {
      throw new InvalidArgumentError('invalid factory')
    }

    if (opts.method === 'CONNECT') {
      throw new InvalidArgumentError('invalid method')
    }

    super('UNDICI_STREAM')

    this.opaque = opts.opaque || null
    this.factory = factory
    this.callback = callback
    this.res = null
    this.resume = null
    this.trailers = null
  }

  /**
   * Stores the `resume` function supplied by the dispatcher. It is later used
   * as the destination's 'drain' listener and is called with the error when
   * the destination fails.
   *
   * @param {Function} resume
   */
  onConnect (resume) {
    this.resume = resume
  }

  /**
   * Creates the destination by calling the factory and wires up its
   * completion handling. Status codes below 200 are ignored.
   *
   * @param {number} statusCode
   * @param {*} headers - Raw headers; passed through `util.parseHeaders`.
   * @throws {InvalidReturnValueError} If the factory's return value lacks
   *   `write`, `end` or `on`.
   */
  onHeaders (statusCode, headers) {
    const { factory, opaque } = this

    if (statusCode < 200) {
      return
    }

    // The factory is single-use; drop the reference before invoking it.
    this.factory = null
    const res = this.runInAsyncScope(factory, null, {
      statusCode,
      headers: util.parseHeaders(headers),
      opaque
    })

    if (
      !res ||
      typeof res.write !== 'function' ||
      typeof res.end !== 'function' ||
      typeof res.on !== 'function'
    ) {
      throw new InvalidReturnValueError('expected Writable')
    }

    res.on('drain', this.resume)
    // TODO: Avoid finished. It registers an unnecessary amount of listeners.
    finished(res, { readable: false }, (err) => {
      // Note: `res` here is re-read from `this` and shadows the outer `res`;
      // it may already be null if onError() ran first.
      const { callback, res, opaque, trailers, resume } = this

      this.res = null
      if (err || !res.readable) {
        util.destroy(res, err)
      }

      if (err) {
        this.resume = null
        resume(err)
      }

      this.callback = null
      this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
    })

    this.res = res
  }

  /**
   * Writes a body chunk to the destination.
   *
   * @param {*} chunk
   * @returns {boolean} The result of `res.write(chunk)`, or `true` when the
   *   destination has already been released (finished or errored), in which
   *   case the chunk is discarded instead of throwing on a null reference.
   */
  onData (chunk) {
    const { res } = this

    return res ? res.write(chunk) : true
  }

  /**
   * Records the parsed trailers and ends the destination. Does nothing when
   * the destination has already been released, since the callback has
   * already been (or is about to be) invoked by that point.
   *
   * @param {*} trailers - Raw trailers, or a falsy value for none.
   */
  onComplete (trailers) {
    const { res } = this

    if (!res) {
      return
    }

    this.trailers = trailers ? util.parseHeaders(trailers) : {}

    res.end()
  }

  /**
   * Handles a dispatch error. With a live destination, the destination is
   * destroyed with the error (the `finished` listener then invokes the
   * callback); otherwise the callback, if still pending, is invoked directly.
   *
   * @param {Error} err
   */
  onError (err) {
    const { res, callback, opaque } = this

    this.factory = null

    if (res) {
      this.res = null
      util.destroy(res, err)
    } else if (callback) {
      this.callback = null
      this.runInAsyncScope(callback, null, err, { opaque })
    }
  }
}
/**
 * Dispatches a request on `client` whose response body is written into the
 * Writable returned by `factory`.
 *
 * @param {object} client - Object exposing `dispatch(opts, handler)`.
 * @param {object} opts - Request options, forwarded to `client.dispatch` and
 *   to the StreamHandler constructor.
 * @param {Function} factory - Forwarded to the StreamHandler constructor.
 * @param {Function} [callback] - Node-style `(err, data)` callback. When
 *   omitted (`undefined`), a Promise is returned instead.
 * @returns {Promise|undefined} A Promise when no callback is given.
 * @throws {InvalidArgumentError} If `callback` is defined but not a function.
 */
function stream (client, opts, factory, callback) {
  if (callback !== undefined) {
    if (typeof callback !== 'function') {
      throw new InvalidArgumentError('invalid callback')
    }

    try {
      client.dispatch(opts, new StreamHandler(opts, factory, callback))
    } catch (failure) {
      // Errors raised synchronously (handler construction or dispatch) are
      // still delivered asynchronously through the callback.
      process.nextTick(callback, failure, null)
    }

    return
  }

  // Promise form: re-enter with a callback that settles the promise.
  return new Promise((resolve, reject) => {
    const settle = (err, data) => {
      if (err) {
        reject(err)
      } else {
        resolve(data)
      }
    }

    stream(client, opts, factory, settle)
  })
}
// Exports: the `stream` entry point and the StreamHandler class it dispatches with.
module.exports = {
stream,
StreamHandler
}