[FLINK-11889] Remove "stop" signal and related interfaces.
kl0u committed Apr 18, 2019
1 parent 9bca0d6 commit e95b347
Showing 50 changed files with 70 additions and 1,095 deletions.
@@ -56,9 +56,9 @@
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -389,11 +389,11 @@ public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGra

@Override
public void cancel(JobID jobID) throws Exception {
JobTerminationMessageParameters params = new JobTerminationMessageParameters();
JobCancellationMessageParameters params = new JobCancellationMessageParameters();
params.jobPathParameter.resolve(jobID);
params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
CompletableFuture<EmptyResponseBody> responseFuture = sendRequest(
JobTerminationHeaders.getInstance(),
JobCancellationHeaders.getInstance(),
params);
responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
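A minimal usage sketch of the cancellation path this hunk rewires, assuming a standalone cluster reachable on localhost:8081; the configuration keys, the String cluster id, and the shutdown call are illustrative assumptions, not part of this commit:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class CancelJobSketch {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Assumed REST endpoint of the running cluster.
        config.setString("rest.address", "localhost");
        config.setInteger("rest.port", 8081);

        RestClusterClient<String> client = new RestClusterClient<>(config, "standalone-cluster");
        try {
            JobID jobId = JobID.fromHexString(args[0]);
            // Sends the PATCH request against /jobs/:jobid via JobCancellationHeaders.
            client.cancel(jobId);
        } finally {
            client.shutdown();
        }
    }
}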
@@ -56,15 +56,14 @@
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
@@ -214,9 +213,9 @@ private RestClient createRestClient() throws ConfigurationException {
}

@Test
public void testJobSubmitCancelStop() throws Exception {
public void testJobSubmitCancel() throws Exception {
TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
TestJobCancellationHandler terminationHandler = new TestJobCancellationHandler();
TestJobExecutionResultHandler testJobExecutionResultHandler =
new TestJobExecutionResultHandler(
JobExecutionResultResponseBody.created(new JobResult.Builder()
@@ -286,24 +285,16 @@ protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull Handle
}
}

private class TestJobTerminationHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
private class TestJobCancellationHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobCancellationMessageParameters> {
private volatile boolean jobCanceled = false;
private volatile boolean jobStopped = false;

private TestJobTerminationHandler() {
super(JobTerminationHeaders.getInstance());
private TestJobCancellationHandler() {
super(JobCancellationHeaders.getInstance());
}

@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobTerminationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
switch (request.getQueryParameter(TerminationModeQueryParameter.class).get(0)) {
case CANCEL:
jobCanceled = true;
break;
case STOP:
jobStopped = true;
break;
}
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobCancellationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
jobCanceled = true;
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
}
@@ -18,7 +18,6 @@

package org.apache.flink.streaming.connectors.nifi;

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

@@ -40,7 +39,7 @@
* A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
* produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
*/
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {

private static final long serialVersionUID = 1L;

@@ -147,9 +146,4 @@ public void close() throws Exception {
super.close();
client.close();
}

@Override
public void stop() {
this.isRunning = false;
}
}
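With StoppableFunction removed, cancellation is the only externally triggered shutdown path left for sources. A minimal sketch of the remaining run()/cancel() contract, using an illustrative class rather than the actual NiFiSource code:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class CancellableSourceSketch extends RichParallelSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // Poll the external system and emit under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record");
            }
        }
    }

    @Override
    public void cancel() {
        // cancel() now carries the only termination signal; there is no stop() anymore.
        isRunning = false;
    }
}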
@@ -17,7 +17,6 @@

package org.apache.flink.streaming.connectors.twitter;

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -46,7 +45,7 @@
* Twitter. This is not a parallel source because the Twitter API only allows
* two concurrent connections.
*/
public class TwitterSource extends RichSourceFunction<String> implements StoppableFunction {
public class TwitterSource extends RichSourceFunction<String> {

private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);

@@ -183,12 +182,6 @@ public void cancel() {
close();
}

@Override
public void stop() {
LOG.info("Stopping Twitter source");
close();
}

// ------ Custom endpoints

/**
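TwitterSource likewise loses its stop() hook and now terminates only when the job is cancelled. A hedged usage sketch (credential values are placeholders; only the TwitterSource property constants are taken from the connector):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;

public class TwitterSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty(TwitterSource.CONSUMER_KEY, "<key>");
        props.setProperty(TwitterSource.CONSUMER_SECRET, "<secret>");
        props.setProperty(TwitterSource.TOKEN, "<token>");
        props.setProperty(TwitterSource.TOKEN_SECRET, "<token-secret>");

        // The source runs until the job is cancelled; there is no separate "stop" signal.
        DataStream<String> tweets = env.addSource(new TwitterSource(props));
        tweets.print();

        env.execute("Twitter source example");
    }
}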

This file was deleted.

@@ -29,7 +29,6 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -252,7 +251,7 @@ public void getConfiguration() {
}

@Test
public void testStop() throws Exception {
public void testCancel() throws Exception {
// this only works if there is no active job at this point
assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());

@@ -280,8 +279,8 @@ public void testStop() throws Exception {
final Deadline deadline = testTimeout.fromNow();

try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
// stop the job
client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
// cancel the job
client.sendPatchRequest("/jobs/" + jid + "/", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());

assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
@@ -311,7 +310,7 @@ public void testStop() throws Exception {
}

@Test
public void testStopYarn() throws Exception {
public void testCancelYarn() throws Exception {
// this only works if there is no active job at this point
assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());

@@ -340,7 +339,7 @@

try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) {
// Request the file from the web server
client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
client.sendGetRequest("/jobs/" + jid + "/yarn-cancel", deadline.timeLeft());

HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
@@ -367,9 +366,9 @@ private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Except
}

/**
* Test invokable that is stoppable and allows waiting for all subtasks to be running.
* Test invokable that allows waiting for all subtasks to be running.
*/
public static class BlockingInvokable extends AbstractInvokable implements StoppableTask {
public static class BlockingInvokable extends AbstractInvokable {

private static CountDownLatch latch = new CountDownLatch(2);

@@ -388,7 +387,7 @@ public void invoke() throws Exception {
}

@Override
public void stop() {
public void cancel() {
this.isRunning = false;
}
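For reference, the two REST interactions this test now exercises, sketched with the JDK HTTP client instead of Flink's HttpTestClient; host, port, and expected status are assumptions matching the assertions above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestCancelSketch {

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";   // assumed REST endpoint
        String jobId = args[0];

        HttpClient http = HttpClient.newHttpClient();

        // Regular cancellation: PATCH /jobs/:jobid, expected to answer 202 ACCEPTED.
        HttpRequest cancel = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
        System.out.println("PATCH -> " + http.send(cancel, HttpResponse.BodyHandlers.ofString()).statusCode());

        // GET variant used where PATCH cannot be issued (e.g. behind the YARN web proxy).
        HttpRequest yarnCancel = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/yarn-cancel"))
                .GET()
                .build();
        System.out.println("GET  -> " + http.send(yarnCancel, HttpResponse.BodyHandlers.ofString()).statusCode());
    }
}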


This file was deleted.

@@ -422,13 +422,6 @@ public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout));
}

@Override
public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);

return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.stop(timeout));
}

@Override
public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
@@ -667,29 +667,6 @@ public void deploy() throws JobException {
}
}

/**
* Sends stop RPC call.
*/
public void stop() {
assertRunningInJobMasterMainThread();
final LogicalSlot slot = assignedResource;

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

CompletableFuture<Acknowledge> stopResultFuture = FutureUtils.retry(
() -> taskManagerGateway.stopTask(attemptId, rpcTimeout),
NUM_STOP_CALL_TRIES,
vertex.getExecutionGraph().getJobMasterMainThreadExecutor());

stopResultFuture.exceptionally(
failure -> {
LOG.info("Stopping task was not successful.", failure);
return null;
});
}
}

public void cancel() {
// depending on the previous state, we go directly to cancelled (no cancel call necessary)
// -- or to canceling (cancel call needs to be sent to the task manager)
@@ -27,7 +27,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
@@ -1093,21 +1092,6 @@ else if (current == JobStatus.RESTARTING) {
}
}

public void stop() throws StoppingException {

assertRunningInJobMasterMainThread();

if (isStoppable) {
for (ExecutionVertex ev : this.getAllExecutionVertices()) {
if (ev.getNumberOfInputs() == 0) { // send signal to sources only
ev.stop();
}
}
} else {
throw new StoppingException("This job is not stoppable.");
}
}

/**
* Suspends the current ExecutionGraph.
*
@@ -692,10 +692,6 @@ public CompletableFuture<?> suspend() {
return currentExecution.suspend();
}

public void stop() {
currentExecution.stop();
}

public void fail(Throwable t) {
currentExecution.fail(t);
}
@@ -25,7 +25,6 @@
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.Preconditions;
@@ -238,7 +237,6 @@ public Configuration getConfiguration() {
public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
Preconditions.checkNotNull(invokable);
this.invokableClassName = invokable.getName();
this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);
}

/**
