Skip to content

Commit

Permalink
[FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph
Browse files Browse the repository at this point in the history

This closes apache#7403
  • Loading branch information
sunhaibotb authored and sunjincheng121 committed Jan 19, 2019
1 parent ba3ffe3 commit a7eb845
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class StreamEdge implements Serializable {

private final String edgeId;

private final StreamNode sourceVertex;
private final StreamNode targetVertex;
private final int sourceId;
private final int targetId;

/**
* The type number of the input for co-tasks.
Expand All @@ -60,33 +60,37 @@ public class StreamEdge implements Serializable {
*/
private StreamPartitioner<?> outputPartitioner;

/**
* The name of the operator in the source vertex.
*/
private final String sourceOperatorName;

/**
* The name of the operator in the target vertex.
*/
private final String targetOperatorName;

public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.sourceId = sourceVertex.getId();
this.targetId = targetVertex.getId();
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
this.outputTag = outputTag;
this.sourceOperatorName = sourceVertex.getOperatorName();
this.targetOperatorName = targetVertex.getOperatorName();

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
+ "_" + outputPartitioner;
}

public StreamNode getSourceVertex() {
return sourceVertex;
}

public StreamNode getTargetVertex() {
return targetVertex;
}

public int getSourceId() {
return sourceVertex.getId();
return sourceId;
}

public int getTargetId() {
return targetVertex.getId();
return targetId;
}

public int getTypeNumber() {
Expand Down Expand Up @@ -130,8 +134,8 @@ public boolean equals(Object o) {

@Override
public String toString() {
return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+ ", outputTag=" + outputTag + ')';
return "(" + (sourceOperatorName + "-" + sourceId) + " -> " + (targetOperatorName + "-" + targetId)
+ ", typeNumber=" + typeNumber + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+ ", outputTag=" + outputTag + ')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,17 @@ public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
return iterationSourceSinkPairs;
}

/**
 * Looks up the source node of the given edge in this graph.
 *
 * <p>The node is resolved by id: the edge's source id is used as the key into
 * {@code streamNodes}.
 *
 * @param edge the edge whose source node is wanted
 * @return the result of {@code streamNodes.get(edge.getSourceId())}
 *         (NOTE(review): presumably {@code null} when the id is not registered
 *         in this graph; confirm against the declared type of {@code streamNodes})
 */
public StreamNode getSourceVertex(StreamEdge edge) {
return streamNodes.get(edge.getSourceId());
}

/**
 * Looks up the target node of the given edge in this graph.
 *
 * <p>The node is resolved by id: the edge's target id is used as the key into
 * {@code streamNodes}.
 *
 * @param edge the edge whose target node is wanted
 * @return the result of {@code streamNodes.get(edge.getTargetId())}
 *         (NOTE(review): presumably {@code null} when the id is not registered
 *         in this graph; confirm against the declared type of {@code streamNodes})
 */
public StreamNode getTargetVertex(StreamEdge edge) {
return streamNodes.get(edge.getTargetId());
}

private void removeEdge(StreamEdge edge) {
edge.getSourceVertex().getOutEdges().remove(edge);
edge.getTargetVertex().getInEdges().remove(edge);
getSourceVertex(edge).getOutEdges().remove(edge);
getTargetVertex(edge).getInEdges().remove(edge);
}

private void removeVertex(StreamNode toRemove) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph str
// Generate the hash code. Because multiple paths exist to each
// node, we might not have all required inputs available to
// generate the hash code.
if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled(), streamGraph)) {
// Add the child nodes
for (StreamEdge outEdge : currentNode.getOutEdges()) {
StreamNode child = outEdge.getTargetVertex();
StreamNode child = streamGraph.getTargetVertex(outEdge);

if (!visited.contains(child.getId())) {
remaining.add(child);
Expand Down Expand Up @@ -145,7 +145,8 @@ private boolean generateNodeHash(
StreamNode node,
HashFunction hashFunction,
Map<Integer, byte[]> hashes,
boolean isChainingEnabled) {
boolean isChainingEnabled,
StreamGraph streamGraph) {

// Check for user-specified ID
String userSpecifiedHash = node.getTransformationUID();
Expand All @@ -162,7 +163,7 @@ private boolean generateNodeHash(
}

Hasher hasher = hashFunction.newHasher();
byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);

if (hashes.put(node.getId(), hash) != null) {
// Sanity check
Expand Down Expand Up @@ -211,7 +212,8 @@ private byte[] generateDeterministicHash(
StreamNode node,
Hasher hasher,
Map<Integer, byte[]> hashes,
boolean isChainingEnabled) {
boolean isChainingEnabled,
StreamGraph streamGraph) {

// Include stream node to hash. We use the current size of the computed
// hashes as the ID. We cannot use the node's ID, because it is
Expand All @@ -221,7 +223,7 @@ private byte[] generateDeterministicHash(

// Include chained nodes to hash
for (StreamEdge outEdge : node.getOutEdges()) {
if (isChainable(outEdge, isChainingEnabled)) {
if (isChainable(outEdge, isChainingEnabled, streamGraph)) {

// Use the hash size again, because the nodes are chained to
// this node. This does not add a hash for the chained nodes.
Expand All @@ -239,7 +241,7 @@ private byte[] generateDeterministicHash(
// Sanity check
if (otherHash == null) {
throw new IllegalStateException("Missing hash for input node "
+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+ streamGraph.getSourceVertex(inEdge) + ". Cannot generate hash for "
+ node + ".");
}

Expand Down Expand Up @@ -279,9 +281,9 @@ private void generateNodeLocalHash(Hasher hasher, int id) {
hasher.putInt(id);
}

private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
private boolean isChainable(StreamEdge edge, boolean isChainingEnabled, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ private void connect(Integer headOfChain, StreamEdge edge) {
}

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void testVirtualTransformations() throws Exception {

// verify that only last partitioning takes precedence
assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
assertEquals(rebalanceMap.getId(), graph.getSourceVertex(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0)).getId());

// verify that partitioning in unions is preserved and that it works across split/select
assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ public void testImmutabilityWithCoiteration() {
assertEquals(2, graph.getIterationSourceSinkPairs().size());

for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
assertEquals(graph.getTargetVertex(sourceSinkPair.f0.getOutEdges().get(0)),
graph.getSourceVertex(sourceSinkPair.f1.getInEdges().get(0)));
}
}

Expand Down Expand Up @@ -244,9 +245,9 @@ public void testmultipleHeadsTailsSimple() {
assertEquals(itSource.getParallelism(), itSink.getParallelism());

for (StreamEdge edge : itSource.getOutEdges()) {
if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
if (graph.getTargetVertex(edge).getOperatorName().equals("IterRebalanceMap")) {
assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
} else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
} else if (graph.getTargetVertex(edge).getOperatorName().equals("IterForwardMap")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
}
}
Expand Down Expand Up @@ -331,16 +332,16 @@ public void testmultipleHeadsTailsWithTailPartitioning() {
assertEquals(itSource.getParallelism(), itSink.getParallelism());

for (StreamEdge edge : itSource.getOutEdges()) {
if (edge.getTargetVertex().getOperatorName().equals("map1")) {
if (graph.getTargetVertex(edge).getOperatorName().equals("map1")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
assertEquals(4, edge.getTargetVertex().getParallelism());
} else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
assertEquals(4, graph.getTargetVertex(edge).getParallelism());
} else if (graph.getTargetVertex(edge).getOperatorName().equals("shuffle")) {
assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
assertEquals(2, edge.getTargetVertex().getParallelism());
assertEquals(2, graph.getTargetVertex(edge).getParallelism());
}
}
for (StreamEdge edge : itSink.getInEdges()) {
String tailName = edge.getSourceVertex().getOperatorName();
String tailName = graph.getSourceVertex(edge).getOperatorName();
if (tailName.equals("split")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
assertTrue(edge.getSelectedNames().contains("even"));
Expand Down

0 comments on commit a7eb845

Please sign in to comment.