Skip to content

Commit

Permalink
[FLINK-12247][rest] Fix NPE when writing the archive json file to Fil…
Browse files Browse the repository at this point in the history
…eSystem

This closes apache#8250.
  • Loading branch information
lamberken authored and tillrohrmann committed Apr 24, 2019
1 parent 8a17483 commit 191b9df
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)

for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
ResponseBody json = createAccumulatorInfo(attempt);
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()))
.replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
archive.add(new ArchivedJson(path, json));
if (attempt != null){
ResponseBody json = createAccumulatorInfo(attempt);
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()))
.replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
archive.add(new ArchivedJson(path, json));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,15 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)

for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()))
.replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
archive.add(new ArchivedJson(path, json));
if (attempt != null) {
ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()))
.replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
archive.add(new ArchivedJson(path, json));
}
}
}
}
Expand Down

0 comments on commit 191b9df

Please sign in to comment.