
Commit

[FLINK-17331][runtime] Explicitly get the ByteBuf length of all classes written to NettyMessage

Currently, the lengths of some header fields in NettyMessage are hardcoded: InputChannelID, ExecutionAttemptID, etc.
As a result, if one of these fields changes, it is easy to miss that the corresponding length in the related NettyMessage components also needs to change.
To avoid this, this PR explicitly obtains the ByteBuf length from every class that is written to a NettyMessage.
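To make the failure mode concrete, here is a hypothetical sketch (the extra field is invented for illustration and is not part of this commit):

// Hypothetical: suppose InputChannelID later gains an extra int field that
// writeTo also serializes, so it occupies 16 + 4 = 20 bytes on the wire.
//
// With a hardcoded length, callers keep allocating the stale 16 bytes:
writeToChannel(out, promise, allocator, receiverId::writeTo, ID, 16);
//
// With the explicit length, only InputChannelID's BYTEBUF_LEN needs updating,
// and every caller picks up the new size automatically:
writeToChannel(
        out, promise, allocator, receiverId::writeTo, ID,
        InputChannelID.getByteBufLength());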
KarmaGYZ authored and pnowojski committed Dec 29, 2020
1 parent a7551f6 commit 96d8c0d
Showing 7 changed files with 203 additions and 6 deletions.
@@ -30,6 +30,9 @@
public class ExecutionAttemptID implements java.io.Serializable {

private static final long serialVersionUID = -1169683445778281344L;
// Represents the number of bytes occupied when an ExecutionAttemptID is written to a ByteBuf.
// It is the sum of two longs (the lowerPart and upperPart of the executionAttemptId).
private static final int BYTEBUF_LEN = 16;

private final AbstractID executionAttemptId;

@@ -55,6 +58,10 @@ public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
return new ExecutionAttemptID(new AbstractID(buf.readLong(), buf.readLong()));
}

public static int getByteBufLength() {
return BYTEBUF_LEN;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
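The 16 bytes come from the two longs that make up the underlying AbstractID. The writeTo body below is an assumption for illustration only; it is not part of this diff:

// Assumed shape of ExecutionAttemptID.writeTo, shown only to make the
// BYTEBUF_LEN arithmetic explicit: 2 * Long.BYTES = 16.
public void writeTo(ByteBuf buf) {
    buf.writeLong(executionAttemptId.getLowerPart());
    buf.writeLong(executionAttemptId.getUpperPart());
}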
@@ -257,7 +257,13 @@ static class BufferResponse extends NettyMessage {

// receiver ID (16), sequence number (4), backlog (4), dataType (1), isCompressed (1),
// buffer size (4)
static final int MESSAGE_HEADER_LENGTH = 16 + 4 + 4 + 1 + 1 + 4;
static final int MESSAGE_HEADER_LENGTH =
InputChannelID.getByteBufLength()
+ Integer.BYTES
+ Integer.BYTES
+ Byte.BYTES
+ Byte.BYTES
+ Integer.BYTES;

final Buffer buffer;

@@ -518,7 +524,17 @@ void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator
bb.writeInt(credit);
};

writeToChannel(out, promise, allocator, consumer, ID, 20 + 16 + 4 + 16 + 4);
writeToChannel(
out,
promise,
allocator,
consumer,
ID,
IntermediateResultPartitionID.getByteBufLength()
+ ExecutionAttemptID.getByteBufLength()
+ Integer.BYTES
+ InputChannelID.getByteBufLength()
+ Integer.BYTES);
}

static PartitionRequest readFrom(ByteBuf buffer) {
@@ -578,7 +594,11 @@ void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator
allocator,
consumer,
ID,
4 + serializedEvent.remaining() + 20 + 16 + 16);
Integer.BYTES
+ serializedEvent.remaining()
+ IntermediateResultPartitionID.getByteBufLength()
+ ExecutionAttemptID.getByteBufLength()
+ InputChannelID.getByteBufLength());
}

static TaskEventRequest readFrom(ByteBuf buffer, ClassLoader classLoader)
@@ -623,7 +643,13 @@ static class CancelPartitionRequest extends NettyMessage {
@Override
void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator)
throws IOException {
writeToChannel(out, promise, allocator, receiverId::writeTo, ID, 16);
writeToChannel(
out,
promise,
allocator,
receiverId::writeTo,
ID,
InputChannelID.getByteBufLength());
}

static CancelPartitionRequest readFrom(ByteBuf buffer) throws Exception {
@@ -669,7 +695,9 @@ void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator
ByteBuf result = null;

try {
result = allocateBuffer(allocator, ID, 4 + 16);
result =
allocateBuffer(
allocator, ID, Integer.BYTES + InputChannelID.getByteBufLength());
result.writeInt(credit);
receiverId.writeTo(result);

@@ -706,7 +734,13 @@ static class ResumeConsumption extends NettyMessage {
@Override
void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator)
throws IOException {
writeToChannel(out, promise, allocator, receiverId::writeTo, ID, 16);
writeToChannel(
out,
promise,
allocator,
receiverId::writeTo,
ID,
InputChannelID.getByteBufLength());
}

static ResumeConsumption readFrom(ByteBuf buffer) {
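These sums only stay correct because they mirror, field by field, what the corresponding write path emits. A simplified sketch of the BufferResponse header write (method and field names are illustrative, not the literal code from this file) makes the correspondence explicit:

// Simplified sketch: each term of MESSAGE_HEADER_LENGTH pairs with one write.
void writeMessageHeaderTo(ByteBuf buf) {
    receiverId.writeTo(buf);             // InputChannelID.getByteBufLength()
    buf.writeInt(sequenceNumber);        // Integer.BYTES
    buf.writeInt(backlog);               // Integer.BYTES
    buf.writeByte(dataType.ordinal());   // Byte.BYTES
    buf.writeBoolean(isCompressed);      // Byte.BYTES
    buf.writeInt(bufferSize);            // Integer.BYTES
}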
@@ -26,6 +26,9 @@
public class InputChannelID extends AbstractID {

private static final long serialVersionUID = 1L;
// Represents the number of bytes occupied when an InputChannelID is written to a ByteBuf.
// It is the sum of two longs (lowerPart and upperPart).
private static final int BYTEBUF_LEN = 16;

public InputChannelID() {
super();
@@ -49,4 +52,8 @@ public static InputChannelID fromByteBuf(ByteBuf buf) {
long upper = buf.readLong();
return new InputChannelID(lower, upper);
}

public static int getByteBufLength() {
return BYTEBUF_LEN;
}
}
@@ -28,6 +28,11 @@
public class IntermediateResultPartitionID implements ResultID {

private static final long serialVersionUID = 1L;
// Represents the number of bytes occupied when an IntermediateResultPartitionID is written to
// a ByteBuf.
// It is the sum of two longs (the lowerPart and upperPart of the intermediateDataSetID) and one
// int (the partitionNum).
private static final int BYTEBUF_LEN = 20;

private final IntermediateDataSetID intermediateDataSetID;
private final int partitionNum;
@@ -60,6 +65,10 @@ public static IntermediateResultPartitionID fromByteBuf(ByteBuf buf) {
return new IntermediateResultPartitionID(intermediateDataSetID, partitionNum);
}

public static int getByteBufLength() {
return BYTEBUF_LEN;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
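Here the 20 bytes break down as two longs for the IntermediateDataSetID plus one int for the partition number. The writeTo body below is an assumption for illustration; it is not part of this diff:

// Assumed shape of IntermediateResultPartitionID.writeTo:
// 2 * Long.BYTES (intermediateDataSetID) + Integer.BYTES (partitionNum) = 20.
public void writeTo(ByteBuf buf) {
    buf.writeLong(intermediateDataSetID.getLowerPart());
    buf.writeLong(intermediateDataSetID.getUpperPart());
    buf.writeInt(partitionNum);
}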
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.io.network.netty.NettyBufferPool;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.junit.Test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/** Tests for {@link ExecutionAttemptID}. */
public class ExecutionAttemptIDTest {
private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1);

@Test
public void testByteBufWriteAndRead() {
final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
final int byteBufLen = ExecutionAttemptID.getByteBufLength();
final ByteBuf byteBuf = ALLOCATOR.directBuffer(byteBufLen, byteBufLen);
executionAttemptID.writeTo(byteBuf);

assertThat(byteBuf.writerIndex(), is(equalTo(ExecutionAttemptID.getByteBufLength())));
assertThat(ExecutionAttemptID.fromByteBuf(byteBuf), is(equalTo(executionAttemptID)));
}
}
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.io.network.netty.NettyBufferPool;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.junit.Test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/** Tests for {@link InputChannelID}. */
public class InputChannelIDTest {
private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1);

@Test
public void testByteBufWriteAndRead() {
final InputChannelID inputChannelID = new InputChannelID();
final int byteBufLen = InputChannelID.getByteBufLength();
final ByteBuf byteBuf = ALLOCATOR.directBuffer(byteBufLen, byteBufLen);
inputChannelID.writeTo(byteBuf);

assertThat(byteBuf.writerIndex(), is(equalTo(InputChannelID.getByteBufLength())));
assertThat(InputChannelID.fromByteBuf(byteBuf), is(equalTo(inputChannelID)));
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobgraph;

import org.apache.flink.runtime.io.network.netty.NettyBufferPool;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.junit.Test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/** Tests for {@link IntermediateResultPartitionID}. */
public class IntermediateResultPartitionIDTest {
private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1);

@Test
public void testByteBufWriteAndRead() {
final IntermediateResultPartitionID intermediateResultPartitionID =
new IntermediateResultPartitionID();
final int byteBufLen = IntermediateResultPartitionID.getByteBufLength();
final ByteBuf byteBuf = ALLOCATOR.directBuffer(byteBufLen, byteBufLen);
intermediateResultPartitionID.writeTo(byteBuf);

assertThat(
byteBuf.writerIndex(),
is(equalTo(IntermediateResultPartitionID.getByteBufLength())));
assertThat(
IntermediateResultPartitionID.fromByteBuf(byteBuf),
is(equalTo(intermediateResultPartitionID)));
}
}
