Skip to content

Commit

Permalink
fixed: multiplexing deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
uce authored and StephanEwen committed Jul 19, 2013
1 parent 0540148 commit 4542555
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1402,8 +1402,11 @@ private void calculateConnectionIDs() {
for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) {

final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i);

int currentConnectionID = 0;

if (groupVertex.isOutputVertex()) {
groupVertex.calculateConnectionID(0, alreadyVisited);
currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,19 +927,25 @@ public Iterator<ExecutionVertex> iterator() {
* the current connection ID
* @param alreadyVisited
* the set of already visited group vertices
* @return maximum assigned connectionID
*/
void calculateConnectionID(final int currentConnectionID, final Set<ExecutionGroupVertex> alreadyVisited) {
int calculateConnectionID(int currentConnectionID, final Set<ExecutionGroupVertex> alreadyVisited) {

if (!alreadyVisited.add(this)) {
return;
return currentConnectionID;
}

int nextConnectionID = currentConnectionID;

for (final ExecutionGroupEdge backwardLink : this.backwardLinks) {
backwardLink.setConnectionID(nextConnectionID);
backwardLink.getSourceVertex().calculateConnectionID(nextConnectionID, alreadyVisited);
++nextConnectionID;

backwardLink.setConnectionID(currentConnectionID);

++currentConnectionID;

currentConnectionID = backwardLink.getSourceVertex()
.calculateConnectionID(currentConnectionID, alreadyVisited);
}

return currentConnectionID;
}

/**
Expand Down

0 comments on commit 4542555

Please sign in to comment.