Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanp413 authored and filiptronicek committed Oct 4, 2023
1 parent 7fff3f6 commit 0d3157b
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions src/vs/base/parts/ipc/node/ipc.net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ interface ISocketTracer {
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
}

interface FrameOptions {
compressed: boolean;
opcode: number;
}

/**
* See https://tools.ietf.org/html/rfc6455#section-5.2
*/
Expand All @@ -219,7 +224,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
fin: 0,
compressed: false,
firstFrameOfMessage: true,
mask: 0
mask: 0,
opcode: 0
};

public get permessageDeflate(): boolean {
Expand Down Expand Up @@ -256,7 +262,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
inflateBytes,
recordInflateBytes,
this._onData,
(data, compressed) => this._write(data, compressed)
(data, options) => this._write(data, options)
));
this._register(this._flowManager.onError((err) => {
// zlib errors are fatal, since we have no idea how to recover
Expand Down Expand Up @@ -319,12 +325,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT

let start = 0;
while (start < buffer.byteLength) {
this._flowManager.writeMessage(buffer.slice(start, Math.min(start + Constants.MaxWebSocketMessageLength, buffer.byteLength)));
this._flowManager.writeMessage(buffer.slice(start, Math.min(start + Constants.MaxWebSocketMessageLength, buffer.byteLength)), { compressed: true, opcode: 0x02 /* Binary frame */ });
start += Constants.MaxWebSocketMessageLength;
}
}

private _write(buffer: VSBuffer, compressed: boolean): void {
private _write(buffer: VSBuffer, { compressed, opcode }: FrameOptions): void {
if (this._isEnded) {
// Avoid ERR_STREAM_WRITE_AFTER_END
return;
Expand All @@ -341,12 +347,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
}
const header = VSBuffer.alloc(headerLen);

if (compressed) {
// The RSV1 bit indicates a compressed frame
header.writeUInt8(0b11000010, 0);
} else {
header.writeUInt8(0b10000010, 0);
}
// The RSV1 bit indicates a compressed frame
const compressedFlag = compressed ? 0b01000000 : 0;
const opcodeFlag = opcode & 0b00001111;
header.writeUInt8(0b10000000 | compressedFlag | opcodeFlag, 0);
if (buffer.byteLength < 126) {
header.writeUInt8(buffer.byteLength, 1);
} else if (buffer.byteLength < 2 ** 16) {
Expand Down Expand Up @@ -390,6 +394,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
const firstByte = peekHeader.readUInt8(0);
const finBit = (firstByte & 0b10000000) >>> 7;
const rsv1Bit = (firstByte & 0b01000000) >>> 6;
const opcode = (firstByte & 0b00001111);

const secondByte = peekHeader.readUInt8(1);
const hasMask = (secondByte & 0b10000000) >>> 7;
const len = (secondByte & 0b01111111);
Expand All @@ -403,8 +409,9 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
}
this._state.firstFrameOfMessage = Boolean(finBit);
this._state.mask = 0;
this._state.opcode = opcode;

this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin });
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, opcode: this._state.opcode });

} else if (this._state.state === ReadState.ReadHeader) {
// read entire header
Expand Down Expand Up @@ -446,7 +453,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
this._state.readLen = len;
this._state.mask = mask;

this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask });
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask, opcode: this._state.opcode });

} else if (this._state.state === ReadState.ReadBody) {
// read body
Expand All @@ -461,7 +468,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
this._state.readLen = Constants.MinHeaderByteSize;
this._state.mask = 0;

this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin);
if (this._state.opcode <= 0x02 /* Continuation frame or Text frame or binary frame */) {
this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin);
} else if (this._state.opcode === 0x09 /* Ping frame */) {
// Ping frames could be send by some browsers e.g. Firefox
this._flowManager.writeMessage(body, { compressed: false, opcode: 0x0A /* Pong frame */ });
}
}
}
}
Expand All @@ -483,7 +495,7 @@ class WebSocketFlowManager extends Disposable {

private readonly _zlibInflateStream: ZlibInflateStream | null;
private readonly _zlibDeflateStream: ZlibDeflateStream | null;
private readonly _writeQueue: VSBuffer[] = [];
private readonly _writeQueue: { data: VSBuffer; options: FrameOptions }[] = [];
private readonly _readQueue: { data: VSBuffer; isCompressed: boolean; isLastFrameOfMessage: boolean }[] = [];

private readonly _onDidFinishProcessingReadQueue = this._register(new Emitter<void>());
Expand All @@ -509,7 +521,7 @@ class WebSocketFlowManager extends Disposable {
inflateBytes: VSBuffer | null,
recordInflateBytes: boolean,
private readonly _onData: Emitter<VSBuffer>,
private readonly _writeFn: (data: VSBuffer, compressed: boolean) => void
private readonly _writeFn: (data: VSBuffer, options: FrameOptions) => void
) {
super();
if (permessageDeflate) {
Expand All @@ -526,8 +538,8 @@ class WebSocketFlowManager extends Disposable {
}
}

public writeMessage(message: VSBuffer): void {
this._writeQueue.push(message);
public writeMessage(data: VSBuffer, options: FrameOptions): void {
this._writeQueue.push({ data, options });
this._processWriteQueue();
}

Expand All @@ -538,12 +550,12 @@ class WebSocketFlowManager extends Disposable {
}
this._isProcessingWriteQueue = true;
while (this._writeQueue.length > 0) {
const message = this._writeQueue.shift()!;
if (this._zlibDeflateStream) {
const data = await this._deflateMessage(this._zlibDeflateStream, message);
this._writeFn(data, true);
const { data, options } = this._writeQueue.shift()!;
if (this._zlibDeflateStream && options.compressed) {
const compressedData = await this._deflateMessage(this._zlibDeflateStream, data);
this._writeFn(compressedData, options);
} else {
this._writeFn(message, false);
this._writeFn(data, { ...options, compressed: false });
}
}
this._isProcessingWriteQueue = false;
Expand Down

0 comments on commit 0d3157b

Please sign in to comment.