Skip to content

Commit

Permalink
[FLINK-19323][network] Small optimization of RecordWriter#serializeRe…
Browse files Browse the repository at this point in the history
…cord

Currently, when serializing a record, the serializer will first skip 4 bytes for length filed and serialize the record. Then it gets the serialized record length and skips back to position 0 to write the length field. After that, it skip again to the tail of the serialized data. This patch avoids the last two skips by writing length field to position 0 directly.
  • Loading branch information
wsry authored and AHeise committed Sep 24, 2020
1 parent 8ec4f1d commit aa62e64
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ public void writeInt(int v) throws IOException {
this.position += 4;
}

public void writeIntUnsafe(int v, int pos) throws IOException {
if (LITTLE_ENDIAN) {
v = Integer.reverseBytes(v);
}
UNSAFE.putInt(this.buffer, BASE_OFFSET + pos, v);
}

@SuppressWarnings("restriction")
@Override
public void writeLong(long v) throws IOException {
Expand Down Expand Up @@ -345,6 +352,10 @@ public void setPosition(int position) {
this.position = position;
}

public void setPositionUnsafe(int position) {
this.position = position;
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,14 @@ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws
public static ByteBuffer serializeRecord(
DataOutputSerializer serializer,
IOReadableWritable record) throws IOException {
serializer.clear();

// the initial capacity should be no less than 4 bytes
serializer.skipBytesToWrite(4);
serializer.setPositionUnsafe(4);

// write data
record.write(serializer);

// write length
int len = serializer.length() - 4;
serializer.setPosition(0);
serializer.writeInt(len);
serializer.skipBytesToWrite(len);
serializer.writeIntUnsafe(serializer.length() - 4, 0);

return serializer.wrapAsByteBuffer();
}
Expand Down

0 comments on commit aa62e64

Please sign in to comment.