Skip to content

Commit

Permalink
[FLINK-13639] Refactor the IntermediateResultPartitionID to consist o…
Browse files Browse the repository at this point in the history
…f IntermediateDataSetID and partitionIndex
  • Loading branch information
KarmaGYZ authored and zhuzhurk committed Apr 24, 2020
1 parent deb1268 commit c4b44e9
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVert
this.producer = producer;
this.partitionNumber = partitionNumber;
this.consumers = new ArrayList<List<ExecutionEdge>>(0);
this.partitionId = new IntermediateResultPartitionID();
this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
}

public ExecutionVertex getProducer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
ByteBuf result = null;

try {
result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16 + 4);
result = allocateBuffer(allocator, ID, 20 + 16 + 4 + 16 + 4);

partitionId.getPartitionId().writeTo(result);
partitionId.getProducerId().writeTo(result);
Expand Down Expand Up @@ -569,7 +569,7 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
// TODO Directly serialize to Netty's buffer
ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);

result = allocateBuffer(allocator, ID, 4 + serializedEvent.remaining() + 16 + 16 + 16);
result = allocateBuffer(allocator, ID, 4 + serializedEvent.remaining() + 20 + 16 + 16);

result.writeInt(serializedEvent.remaining());
result.writeBytes(serializedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.runtime.topology.ResultID;
import org.apache.flink.util.AbstractID;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import java.util.UUID;

/**
Expand Down Expand Up @@ -54,4 +56,19 @@ public IntermediateDataSetID(AbstractID from) {
public IntermediateDataSetID(UUID from) {
super(from.getLeastSignificantBits(), from.getMostSignificantBits());
}

private IntermediateDataSetID(long lower, long upper) {
super(lower, upper);
}

public void writeTo(ByteBuf buf) {
buf.writeLong(lowerPart);
buf.writeLong(upperPart);
}

public static IntermediateDataSetID fromByteBuf(ByteBuf buf) {
final long lower = buf.readLong();
final long upper = buf.readLong();
return new IntermediateDataSetID(lower, upper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,70 @@

package org.apache.flink.runtime.jobgraph;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.topology.ResultID;
import org.apache.flink.util.AbstractID;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

/**
* Id identifying {@link IntermediateResultPartition}.
*/
public class IntermediateResultPartitionID extends AbstractID implements ResultID {
public class IntermediateResultPartitionID implements ResultID {

private static final long serialVersionUID = 1L;

private final IntermediateDataSetID intermediateDataSetID;
private final int partitionNum;

/**
* Creates an new random intermediate result partition ID.
* Creates an new random intermediate result partition ID for testing.
*/
@VisibleForTesting
public IntermediateResultPartitionID() {
super();
this.partitionNum = -1;
this.intermediateDataSetID = new IntermediateDataSetID();
}

public IntermediateResultPartitionID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
/**
* Creates an new intermediate result partition ID with {@link IntermediateDataSetID} and the partitionNum.
*/
public IntermediateResultPartitionID(IntermediateDataSetID intermediateDataSetID, int partitionNum) {
this.intermediateDataSetID = intermediateDataSetID;
this.partitionNum = partitionNum;
}

public void writeTo(ByteBuf buf) {
buf.writeLong(this.lowerPart);
buf.writeLong(this.upperPart);
intermediateDataSetID.writeTo(buf);
buf.writeInt(partitionNum);
}

public static IntermediateResultPartitionID fromByteBuf(ByteBuf buf) {
long lower = buf.readLong();
long upper = buf.readLong();
return new IntermediateResultPartitionID(lower, upper);
final IntermediateDataSetID intermediateDataSetID = IntermediateDataSetID.fromByteBuf(buf);
final int partitionNum = buf.readInt();
return new IntermediateResultPartitionID(intermediateDataSetID, partitionNum);
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj != null && obj.getClass() == getClass()) {
IntermediateResultPartitionID that = (IntermediateResultPartitionID) obj;
return that.intermediateDataSetID.equals(this.intermediateDataSetID)
&& that.partitionNum == this.partitionNum;
} else {
return false;
}
}

@Override
public int hashCode() {
return this.intermediateDataSetID.hashCode() ^ this.partitionNum;
}

@Override
public String toString() {
return intermediateDataSetID.toString() + "#" + partitionNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
/**
* ID of a {@link Result}.
*/
public interface ResultID {
public interface ResultID extends java.io.Serializable {
}

0 comments on commit c4b44e9

Please sign in to comment.