Skip to content

Commit

Permalink
This closes apache#2295
Browse files Browse the repository at this point in the history
  • Loading branch information
jkff committed Mar 23, 2017
2 parents 47ba7b0 + 081664e commit def96a2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
Expand Down Expand Up @@ -338,29 +342,56 @@ State waitUntilFinish(
return null; // Timed out.
}

private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();

@Override
public State cancel() throws IOException {
Job content = new Job();
content.setProjectId(getProjectId());
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
try {
dataflowClient.updateJob(jobId, content);
return State.CANCELLED;
} catch (IOException e) {
State state = getState();
if (state.isTerminal()) {
LOG.warn("Job is already terminated. State is {}", state);
return state;
} else {
String errorMsg = String.format(
"Failed to cancel job in state %s, "
+ "please go to the Developers Console to cancel it manually: %s",
state,
MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
LOG.warn(errorMsg);
throw new IOException(errorMsg, e);
// Enforce that a cancel() call on the job is done at most once - as
// a workaround for Dataflow service's current bugs with multiple
// cancellation, where it may sometimes return an error when cancelling
// a job that was already cancelled, but still report the job state as
// RUNNING.
// To partially work around these issues, we absorb duplicate cancel()
// calls. This, of course, doesn't address the case when the job terminates
// externally almost concurrently to calling cancel(), but at least it
// makes it possible to safely call cancel() multiple times and from
// multiple threads in one program.
FutureTask<State> tentativeCancelTask = new FutureTask<>(new Callable<State>() {
@Override
public State call() throws Exception {
Job content = new Job();
content.setProjectId(getProjectId());
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
try {
Job job = dataflowClient.updateJob(jobId, content);
return MonitoringUtil.toState(job.getCurrentState());
} catch (IOException e) {
State state = getState();
if (state.isTerminal()) {
LOG.warn("Job is already terminated. State is {}", state);
return state;
} else {
String errorMsg = String.format(
"Failed to cancel job in state %s, "
+ "please go to the Developers Console to cancel it manually: %s",
state,
MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
LOG.warn(errorMsg);
throw new IOException(errorMsg, e);
}
}
}
});
if (cancelState.compareAndSet(null, tentativeCancelTask)) {
// This thread should perform cancellation, while others will
// only wait for the result.
cancelState.get().run();
}
try {
return cancelState.get().get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ public void testCancelUnterminatedJobThatSucceeds() throws IOException {
mock(Dataflow.Projects.Locations.Jobs.Update.class);
when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
.thenReturn(update);
when(update.execute()).thenReturn(new Job());
when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_CANCELLED"));

DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);

Expand Down

0 comments on commit def96a2

Please sign in to comment.