Skip to content

Commit

Permalink
[hotfix] Make failover region topological sorted
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 2, 2017
1 parent 3b0fb26 commit 3ff91be
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTo
if (predecessorRegion != thisRegion) {

// we need to merge our region and the predecessor's region
thisRegion.addAll(predecessorRegion);
distinctRegions.remove(predecessorRegion);
predecessorRegion.addAll(thisRegion);
distinctRegions.remove(thisRegion);
thisRegion = predecessorRegion;

// remap the vertices from that merged region
for (ExecutionVertex inPredRegion: predecessorRegion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy r
v2.setInvokableClass(AbstractInvokable.class);
v3.setInvokableClass(AbstractInvokable.class);

v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

Expand Down

0 comments on commit 3ff91be

Please sign in to comment.