Skip to content

Commit

Permalink
Merge pull request microsoft#194436 from jeanp413/fix-194284
Browse files Browse the repository at this point in the history
Fixes websocket doesn't handle ping frames correctly
  • Loading branch information
alexdima authored Sep 29, 2023
2 parents 187f0ff + 9f2d7b6 commit d79823e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 22 deletions.
56 changes: 34 additions & 22 deletions src/vs/base/parts/ipc/node/ipc.net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ interface ISocketTracer {
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
}

interface FrameOptions {
compressed: boolean;
opcode: number;
}

/**
* See https://tools.ietf.org/html/rfc6455#section-5.2
*/
Expand All @@ -219,7 +224,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
fin: 0,
compressed: false,
firstFrameOfMessage: true,
mask: 0
mask: 0,
opcode: 0
};

public get permessageDeflate(): boolean {
Expand Down Expand Up @@ -256,7 +262,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
inflateBytes,
recordInflateBytes,
this._onData,
(data, compressed) => this._write(data, compressed)
(data, options) => this._write(data, options)
));
this._register(this._flowManager.onError((err) => {
// zlib errors are fatal, since we have no idea how to recover
Expand Down Expand Up @@ -319,12 +325,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT

let start = 0;
while (start < buffer.byteLength) {
this._flowManager.writeMessage(buffer.slice(start, Math.min(start + Constants.MaxWebSocketMessageLength, buffer.byteLength)));
this._flowManager.writeMessage(buffer.slice(start, Math.min(start + Constants.MaxWebSocketMessageLength, buffer.byteLength)), { compressed: true, opcode: 0x02 /* Binary frame */ });
start += Constants.MaxWebSocketMessageLength;
}
}

private _write(buffer: VSBuffer, compressed: boolean): void {
private _write(buffer: VSBuffer, { compressed, opcode }: FrameOptions): void {
if (this._isEnded) {
// Avoid ERR_STREAM_WRITE_AFTER_END
return;
Expand All @@ -341,12 +347,10 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
}
const header = VSBuffer.alloc(headerLen);

if (compressed) {
// The RSV1 bit indicates a compressed frame
header.writeUInt8(0b11000010, 0);
} else {
header.writeUInt8(0b10000010, 0);
}
// The RSV1 bit indicates a compressed frame
const compressedFlag = compressed ? 0b01000000 : 0;
const opcodeFlag = opcode & 0b00001111;
header.writeUInt8(0b10000000 | compressedFlag | opcodeFlag, 0);
if (buffer.byteLength < 126) {
header.writeUInt8(buffer.byteLength, 1);
} else if (buffer.byteLength < 2 ** 16) {
Expand Down Expand Up @@ -390,6 +394,8 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
const firstByte = peekHeader.readUInt8(0);
const finBit = (firstByte & 0b10000000) >>> 7;
const rsv1Bit = (firstByte & 0b01000000) >>> 6;
const opcode = (firstByte & 0b00001111);

const secondByte = peekHeader.readUInt8(1);
const hasMask = (secondByte & 0b10000000) >>> 7;
const len = (secondByte & 0b01111111);
Expand All @@ -403,8 +409,9 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
}
this._state.firstFrameOfMessage = Boolean(finBit);
this._state.mask = 0;
this._state.opcode = opcode;

this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin });
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, opcode: this._state.opcode });

} else if (this._state.state === ReadState.ReadHeader) {
// read entire header
Expand Down Expand Up @@ -446,7 +453,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
this._state.readLen = len;
this._state.mask = mask;

this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask });
this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask, opcode: this._state.opcode });

} else if (this._state.state === ReadState.ReadBody) {
// read body
Expand All @@ -461,7 +468,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketT
this._state.readLen = Constants.MinHeaderByteSize;
this._state.mask = 0;

this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin);
if (this._state.opcode <= 0x02 /* Continuation frame or Text frame or binary frame */) {
this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin);
} else if (this._state.opcode === 0x09 /* Ping frame */) {
// Ping frames could be send by some browsers e.g. Firefox
this._flowManager.writeMessage(body, { compressed: false, opcode: 0x0A /* Pong frame */ });
}
}
}
}
Expand All @@ -483,7 +495,7 @@ class WebSocketFlowManager extends Disposable {

private readonly _zlibInflateStream: ZlibInflateStream | null;
private readonly _zlibDeflateStream: ZlibDeflateStream | null;
private readonly _writeQueue: VSBuffer[] = [];
private readonly _writeQueue: { data: VSBuffer; options: FrameOptions }[] = [];
private readonly _readQueue: { data: VSBuffer; isCompressed: boolean; isLastFrameOfMessage: boolean }[] = [];

private readonly _onDidFinishProcessingReadQueue = this._register(new Emitter<void>());
Expand All @@ -509,7 +521,7 @@ class WebSocketFlowManager extends Disposable {
inflateBytes: VSBuffer | null,
recordInflateBytes: boolean,
private readonly _onData: Emitter<VSBuffer>,
private readonly _writeFn: (data: VSBuffer, compressed: boolean) => void
private readonly _writeFn: (data: VSBuffer, options: FrameOptions) => void
) {
super();
if (permessageDeflate) {
Expand All @@ -526,8 +538,8 @@ class WebSocketFlowManager extends Disposable {
}
}

public writeMessage(message: VSBuffer): void {
this._writeQueue.push(message);
public writeMessage(data: VSBuffer, options: FrameOptions): void {
this._writeQueue.push({ data, options });
this._processWriteQueue();
}

Expand All @@ -538,12 +550,12 @@ class WebSocketFlowManager extends Disposable {
}
this._isProcessingWriteQueue = true;
while (this._writeQueue.length > 0) {
const message = this._writeQueue.shift()!;
if (this._zlibDeflateStream) {
const data = await this._deflateMessage(this._zlibDeflateStream, message);
this._writeFn(data, true);
const { data, options } = this._writeQueue.shift()!;
if (this._zlibDeflateStream && options.compressed) {
const compressedData = await this._deflateMessage(this._zlibDeflateStream, data);
this._writeFn(compressedData, options);
} else {
this._writeFn(message, false);
this._writeFn(data, { ...options, compressed: false });
}
}
this._isProcessingWriteQueue = false;
Expand Down
40 changes: 40 additions & 0 deletions src/vs/base/parts/ipc/test/node/ipc.net.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,19 @@ suite('WebSocketNodeSocket', () => {
private readonly _onClose = new Emitter<SocketCloseEvent>();
public readonly onClose = this._onClose.event;

public writtenData: VSBuffer[] = [];

public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
}

constructor() {
super();
}

public write(data: VSBuffer): void {
this.writtenData.push(data);
}

public fireData(data: number[]): void {
this._onData.fire(VSBuffer.wrap(toUint8Array(data)));
}
Expand Down Expand Up @@ -746,6 +752,40 @@ suite('WebSocketNodeSocket', () => {
assert.strictEqual(receivingSideOnDataCallCount, 4);
});

test('issue #194284: ping/pong opcodes are supported', async () => {

const disposables = new DisposableStore();
const socket = new FakeNodeSocket();
const webSocket = disposables.add(new WebSocketNodeSocket(<any>socket, false, null, false));

let receivedData: string = '';
disposables.add(webSocket.onData((buff) => {
receivedData += fromCharCodeArray(fromUint8Array(buff.buffer));
}));

// A single-frame non-compressed text message that contains "Hello"
socket.fireData([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);

// A ping message that contains "data"
socket.fireData([0x89, 0x04, 0x64, 0x61, 0x74, 0x61]);

// Another single-frame non-compressed text message that contains "Hello"
socket.fireData([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);

assert.strictEqual(receivedData, 'HelloHello');
assert.deepStrictEqual(
socket.writtenData.map(x => fromUint8Array(x.buffer)),
[
// A pong message that contains "data"
[0x8A, 0x04, 0x64, 0x61, 0x74, 0x61]
]
);

disposables.dispose();

return receivedData;
});

function generateRandomBuffer(size: number): VSBuffer {
const buff = VSBuffer.alloc(size);
for (let i = 0; i < size; i++) {
Expand Down

0 comments on commit d79823e

Please sign in to comment.