diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 91a722eea8c7c..269462aac37cf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -56,9 +56,9 @@ import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; +import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; +import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; import org.apache.flink.runtime.rest.messages.JobMessageParameters; -import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; -import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; @@ -389,11 +389,11 @@ public CompletableFuture submitJob(@Nonnull JobGraph jobGra @Override public void cancel(JobID jobID) throws Exception { - JobTerminationMessageParameters params = new JobTerminationMessageParameters(); + JobCancellationMessageParameters params = new JobCancellationMessageParameters(); params.jobPathParameter.resolve(jobID); params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); CompletableFuture responseFuture = sendRequest( - JobTerminationHeaders.getInstance(), + JobCancellationHeaders.getInstance(), params); responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 76cce2aa96418..46021506e9779 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -56,15 +56,14 @@ import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; +import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; +import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; import org.apache.flink.runtime.rest.messages.JobMessageParameters; -import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; -import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; -import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; @@ -214,9 +213,9 @@ private RestClient createRestClient() throws ConfigurationException { } @Test - public void testJobSubmitCancelStop() throws Exception { + public void testJobSubmitCancel() throws Exception { TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); - TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler(); + TestJobCancellationHandler terminationHandler = new TestJobCancellationHandler(); TestJobExecutionResultHandler testJobExecutionResultHandler = new TestJobExecutionResultHandler( JobExecutionResultResponseBody.created(new JobResult.Builder() @@ -286,24 +285,16 @@ protected CompletableFuture handleRequest(@Nonnull Handle } } - private class TestJobTerminationHandler extends TestHandler { + private class TestJobCancellationHandler extends TestHandler { private volatile boolean jobCanceled = false; - private volatile boolean jobStopped = false; - private TestJobTerminationHandler() { - super(JobTerminationHeaders.getInstance()); + private TestJobCancellationHandler() { + super(JobCancellationHeaders.getInstance()); } @Override - protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - switch (request.getQueryParameter(TerminationModeQueryParameter.class).get(0)) { - case CANCEL: - jobCanceled = true; - break; - case STOP: - jobStopped = true; - break; - } + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + jobCanceled = true; return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); } } diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java index ba41169ce055b..5c63c8f551b70 100644 --- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java +++ b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.nifi; -import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -40,7 +39,7 @@ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile. */ -public class NiFiSource extends RichParallelSourceFunction implements StoppableFunction{ +public class NiFiSource extends RichParallelSourceFunction { private static final long serialVersionUID = 1L; @@ -147,9 +146,4 @@ public void close() throws Exception { super.close(); client.close(); } - - @Override - public void stop() { - this.isRunning = false; - } } diff --git a/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java index b3b04a6c0b4ff..0f58f5701d9f5 100644 --- a/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java +++ b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.connectors.twitter; -import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -46,7 +45,7 @@ * Twitter. This is not a parallel source because the Twitter API only allows * two concurrent connections. */ -public class TwitterSource extends RichSourceFunction implements StoppableFunction { +public class TwitterSource extends RichSourceFunction { private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class); @@ -183,12 +182,6 @@ public void cancel() { close(); } - @Override - public void stop() { - LOG.info("Stopping Twitter source"); - close(); - } - // ------ Custom endpoints /** diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java deleted file mode 100644 index 07ef372140e93..0000000000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.flink.api.common.functions; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * Must be implemented by stoppable functions, eg, source functions of streaming jobs. The method {@link #stop()} will - * be called when the job received the STOP signal. On this signal, the source function must stop emitting new data and - * terminate gracefully. - */ -@PublicEvolving -public interface StoppableFunction { - /** - * Stops the source. In contrast to {@code cancel()} this is a request to the source function to shut down - * gracefully. Pending data can still be emitted and it is not required to stop immediately -- however, in the near - * future. The job will keep running until all emitted data is processed completely. - * - *

Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source - * will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the - * loop and that is set to false in this method. - * - *

The call to {@code stop()} should not block and not throw any exception. - */ - void stop(); -} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 7a287cf85be78..cf4cdebfbc89b 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -252,7 +251,7 @@ public void getConfiguration() { } @Test - public void testStop() throws Exception { + public void testCancel() throws Exception { // this only works if there is no active job at this point assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty()); @@ -280,8 +279,8 @@ public void testStop() throws Exception { final Deadline deadline = testTimeout.fromNow(); try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) { - // stop the job - client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); + // cancel the job + client.sendPatchRequest("/jobs/" + jid + "/", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); @@ -311,7 +310,7 @@ public void testStop() throws Exception { } @Test - public void testStopYarn() throws Exception { + public void testCancelYarn() throws Exception { // this only works if there is no active job at this point assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty()); @@ -340,7 +339,7 @@ public void testStopYarn() throws Exception { try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) { // Request the file from the web server - client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft()); + client.sendGetRequest("/jobs/" + jid + "/yarn-cancel", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client .getNextResponse(deadline.timeLeft()); @@ -367,9 +366,9 @@ private static List getRunningJobs(ClusterClient client) throws Except } /** - * Test invokable that is stoppable and allows waiting for all subtasks to be running. + * Test invokable that allows waiting for all subtasks to be running. */ - public static class BlockingInvokable extends AbstractInvokable implements StoppableTask { + public static class BlockingInvokable extends AbstractInvokable { private static CountDownLatch latch = new CountDownLatch(2); @@ -388,7 +387,7 @@ public void invoke() throws Exception { } @Override - public void stop() { + public void cancel() { this.isRunning = false; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java deleted file mode 100644 index 3644219abfcef..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime; - -import org.apache.flink.util.FlinkException; - -/** - * Indicates that a job is not stoppable. - */ -public class StoppingException extends FlinkException { - - private static final long serialVersionUID = -721315728140810694L; - - public StoppingException(String msg) { - super(msg); - } - - public StoppingException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index de1a3b86a9079..3c4028c0173a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -422,13 +422,6 @@ public CompletableFuture cancelJob(JobID jobId, Time timeout) { return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout)); } - @Override - public CompletableFuture stopJob(JobID jobId, Time timeout) { - final CompletableFuture jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); - - return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.stop(timeout)); - } - @Override public CompletableFuture rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { final CompletableFuture jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index a2f9ea9f41211..12908366e866e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -667,29 +667,6 @@ public void deploy() throws JobException { } } - /** - * Sends stop RPC call. - */ - public void stop() { - assertRunningInJobMasterMainThread(); - final LogicalSlot slot = assignedResource; - - if (slot != null) { - final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); - - CompletableFuture stopResultFuture = FutureUtils.retry( - () -> taskManagerGateway.stopTask(attemptId, rpcTimeout), - NUM_STOP_CALL_TRIES, - vertex.getExecutionGraph().getJobMasterMainThreadExecutor()); - - stopResultFuture.exceptionally( - failure -> { - LOG.info("Stopping task was not successful.", failure); - return null; - }); - } - } - public void cancel() { // depending on the previous state, we go directly to cancelled (no cancel call necessary) // -- or to canceling (cancel call needs to be sent to the task manager) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 56e31a3f1dde6..1b839d61fd1e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobWriter; @@ -1093,21 +1092,6 @@ else if (current == JobStatus.RESTARTING) { } } - public void stop() throws StoppingException { - - assertRunningInJobMasterMainThread(); - - if (isStoppable) { - for (ExecutionVertex ev : this.getAllExecutionVertices()) { - if (ev.getNumberOfInputs() == 0) { // send signal to sources only - ev.stop(); - } - } - } else { - throw new StoppingException("This job is not stoppable."); - } - } - /** * Suspends the current ExecutionGraph. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index fc2a0f0721392..8283f57bbefb9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -692,10 +692,6 @@ public CompletableFuture suspend() { return currentExecution.suspend(); } - public void stop() { - currentExecution.stop(); - } - public void fail(Throwable t) { currentExecution.fail(t); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index c78d707029b5a..6da61cf4c9e7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -25,7 +25,6 @@ import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.util.Preconditions; @@ -238,7 +237,6 @@ public Configuration getConfiguration() { public void setInvokableClass(Class invokable) { Preconditions.checkNotNull(invokable); this.invokableClassName = invokable.getName(); - this.isStoppable = StoppableTask.class.isAssignableFrom(invokable); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java deleted file mode 100644 index 1b2d2a7c78fcb..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.runtime.jobgraph.tasks; - -/** - * Implemented by tasks that can receive STOP signal. - */ -public interface StoppableTask { - /** Called on STOP signal. */ - void stop(); -} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java index 40ece6809c82f..005ee044d5a89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java @@ -105,19 +105,6 @@ public CompletableFuture submitTask(TaskDeploymentDescriptor tdd, T return FutureUtils.toJava(submitResult); } - @Override - public CompletableFuture stopTask(ExecutionAttemptID executionAttemptID, Time timeout) { - Preconditions.checkNotNull(executionAttemptID); - Preconditions.checkNotNull(timeout); - - scala.concurrent.Future stopResult = actorGateway.ask( - new TaskMessages.StopTask(executionAttemptID), - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); - - return FutureUtils.toJava(stopResult); - } - @Override public CompletableFuture cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { Preconditions.checkNotNull(executionAttemptID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index e07d49e713433..6922b05d684b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -73,17 +73,6 @@ CompletableFuture submitTask( TaskDeploymentDescriptor tdd, Time timeout); - /** - * Stop the given task. - * - * @param executionAttemptID identifying the task - * @param timeout of the submit operation - * @return Future acknowledge if the task is successfully stopped - */ - CompletableFuture stopTask( - ExecutionAttemptID executionAttemptID, - Time timeout); - /** * Cancel the given task. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index ff2287300c640..6d29a9eb81d24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -28,7 +28,6 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -388,17 +387,6 @@ public CompletableFuture cancel(Time timeout) { return CompletableFuture.completedFuture(Acknowledge.get()); } - @Override - public CompletableFuture stop(Time timeout) { - try { - executionGraph.stop(); - } catch (StoppingException e) { - return FutureUtils.completedExceptionally(e); - } - - return CompletableFuture.completedFuture(Acknowledge.get()); - } - @Override public CompletableFuture rescaleJob( int newParallelism, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 7ed329991eaee..b15d7040973a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -65,14 +65,6 @@ public interface JobMasterGateway extends */ CompletableFuture cancel(@RpcTimeout Time timeout); - /** - * Cancel the currently executed job. - * - * @param timeout of this operation - * @return Future acknowledge if the cancellation was successful - */ - CompletableFuture stop(@RpcTimeout Time timeout); - /** * Triggers rescaling of the executed job. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index 090321dad017d..064eef56143de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -75,11 +75,6 @@ public CompletableFuture submitTask(TaskDeploymentDescriptor tdd, T return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout); } - @Override - public CompletableFuture stopTask(ExecutionAttemptID executionAttemptID, Time timeout) { - return taskExecutorGateway.stopTask(executionAttemptID, timeout); - } - @Override public CompletableFuture cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { return taskExecutorGateway.cancelTask(executionAttemptID, timeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 96175c02309a4..1ddb89ce16acc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -551,10 +551,6 @@ public CompletableFuture cancelJob(JobID jobId) { return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.cancelJob(jobId, rpcTimeout)); } - public CompletableFuture stopJob(JobID jobId) { - return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.stopJob(jobId, rpcTimeout)); - } - public CompletableFuture triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob) { return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java similarity index 85% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java index 8fb66fd7005c8..62f20a2343ee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobCancellationHandler.java @@ -28,8 +28,8 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; -import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -48,15 +48,15 @@ /** * Request handler for the cancel and stop request. */ -public class JobTerminationHandler extends AbstractRestHandler { +public class JobCancellationHandler extends AbstractRestHandler { private final TerminationModeQueryParameter.TerminationMode defaultTerminationMode; - public JobTerminationHandler( + public JobCancellationHandler( GatewayRetriever leaderRetriever, Time timeout, Map headers, - MessageHeaders messageHeaders, + MessageHeaders messageHeaders, TerminationModeQueryParameter.TerminationMode defaultTerminationMode) { super(leaderRetriever, timeout, headers, messageHeaders); @@ -64,7 +64,7 @@ public JobTerminationHandler( } @Override - public CompletableFuture handleRequest(HandlerRequest request, RestfulGateway gateway) { + public CompletableFuture handleRequest(HandlerRequest request, RestfulGateway gateway) throws RestHandlerException { final JobID jobId = request.getPathParameter(JobIDPathParameter.class); final List terminationModes = request.getQueryParameter(TerminationModeQueryParameter.class); final TerminationModeQueryParameter.TerminationMode terminationMode; @@ -83,8 +83,7 @@ public CompletableFuture handleRequest(HandlerRequest handleRequest(HandlerRequest handleRequest(HandlerRequest { +public class JobCancellationHeaders implements MessageHeaders { public static final String URL = "/jobs/:jobid"; - private static final JobTerminationHeaders INSTANCE = new JobTerminationHeaders(); + private static final JobCancellationHeaders INSTANCE = new JobCancellationHeaders(); - private JobTerminationHeaders() {} + private JobCancellationHeaders() {} @Override public Class getRequestClass() { @@ -50,8 +50,8 @@ public HttpResponseStatus getResponseStatusCode() { } @Override - public JobTerminationMessageParameters getUnresolvedMessageParameters() { - return new JobTerminationMessageParameters(); + public JobCancellationMessageParameters getUnresolvedMessageParameters() { + return new JobCancellationMessageParameters(); } @Override @@ -64,7 +64,7 @@ public String getTargetRestEndpointURL() { return URL; } - public static JobTerminationHeaders getInstance() { + public static JobCancellationHeaders getInstance() { return INSTANCE; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java similarity index 95% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java index a59dc83ab38ac..749f3c5203dc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java @@ -26,7 +26,7 @@ * *

A job related REST handler always requires a {@link JobIDPathParameter}. */ -public class JobTerminationMessageParameters extends MessageParameters { +public class JobCancellationMessageParameters extends MessageParameters { public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TerminationModeQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TerminationModeQueryParameter.java index ead0f0c5f334d..97dadfe87711e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TerminationModeQueryParameter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TerminationModeQueryParameter.java @@ -18,11 +18,11 @@ package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; import org.apache.flink.util.StringUtils; /** - * Termination mode for the {@link JobTerminationHandler}. + * Termination mode. + * @deprecated Only kept to detect legacy usages of the cancel/stop command. Please use the "stop-with-savepoint" command instead. */ public class TerminationModeQueryParameter extends MessageQueryParameter { @@ -49,7 +49,7 @@ public String getDescription() { } /** - * Supported termination modes. + * @deprecated Please use the "stop-with-savepoint" command instead. */ public enum TerminationMode { CANCEL, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java index 53339600b262d..1fc86d7353f7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java @@ -20,10 +20,10 @@ import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; -import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; +import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler; /** - * {@link RestHandlerSpecification} for the {@link JobTerminationHandler} which is registered for + * {@link RestHandlerSpecification} for the {@link JobCancellationHandler} which is registered for * compatibility with the Yarn proxy as a GET call. * *

For more information @see YARN-2031. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java index bbd36c0795311..0eedf59e2fa16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java @@ -20,10 +20,10 @@ import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; -import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; +import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler; /** - * {@link RestHandlerSpecification} for the {@link JobTerminationHandler} which is registered for + * {@link RestHandlerSpecification} for the {@link JobCancellationHandler} which is registered for * compatibility with the Yarn proxy as a GET call. * *

For more information @see YARN-2031. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 0731d40bd04c0..9b9ad5b5be5a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -600,25 +600,6 @@ public CompletableFuture cancelTask(ExecutionAttemptID executionAtt } } - @Override - public CompletableFuture stopTask(ExecutionAttemptID executionAttemptID, Time timeout) { - final Task task = taskSlotTable.getTask(executionAttemptID); - - if (task != null) { - try { - task.stopExecution(); - return CompletableFuture.completedFuture(Acknowledge.get()); - } catch (Throwable t) { - return FutureUtils.completedExceptionally(new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t)); - } - } else { - final String message = "Cannot find task to stop for execution " + executionAttemptID + '.'; - - log.debug(message); - return FutureUtils.completedExceptionally(new TaskException(message)); - } - } - // ---------------------------------------------------------------------- // Partition lifecycle RPCs // ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index a3e9c0d7a308c..728087af63f98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -135,15 +135,6 @@ CompletableFuture triggerCheckpoint( */ CompletableFuture confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp); - /** - * Stop the given task. - * - * @param executionAttemptID identifying the task - * @param timeout for the stop operation - * @return Future acknowledge if the task is successfully stopped - */ - CompletableFuture stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout); - /** * Cancel the given task. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 39751740f03b5..82d2a304fad62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -62,7 +62,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -935,51 +934,9 @@ private boolean transitionState(ExecutionState currentState, ExecutionState newS } // ---------------------------------------------------------------------------------------------------------------- - // Stopping / Canceling / Failing the task from the outside + // Canceling / Failing the task from the outside // ---------------------------------------------------------------------------------------------------------------- - /** - * Stops the executing task by calling {@link StoppableTask#stop()}. - *

- * This method never blocks. - *

- * - * @throws UnsupportedOperationException if the {@link AbstractInvokable} does not implement {@link StoppableTask} - * @throws IllegalStateException if the {@link Task} is not yet running - */ - public void stopExecution() { - // copy reference to stack, to guard against concurrent setting to null - final AbstractInvokable invokable = this.invokable; - - if (invokable != null) { - if (invokable instanceof StoppableTask) { - LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId); - final StoppableTask stoppable = (StoppableTask) invokable; - - Runnable runnable = () -> { - try { - stoppable.stop(); - } catch (Throwable t) { - LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, t); - taskManagerActions.failTask(executionId, t); - } - }; - executeAsyncCallRunnable( - runnable, - String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId), - false); - } else { - throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId)); - } - } else { - throw new IllegalStateException( - String.format( - "Cannot stop task %s (%s) because it is not running.", - taskNameWithSubtask, - executionId)); - } - } - /** * Cancels the task execution. If the task is already in a terminal state * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index dd5d6463ce83e..dcfbf6bba53f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -57,15 +57,6 @@ public interface RestfulGateway extends RpcGateway { */ CompletableFuture cancelJob(JobID jobId, @RpcTimeout Time timeout); - /** - * Stop the given job. - * - * @param jobId identifying the job to stop - * @param timeout of the operation - * @return A future acknowledge if the stopping succeeded - */ - CompletableFuture stopJob(JobID jobId, @RpcTimeout Time timeout); - /** * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph, then * the future is completed with a {@link FlinkJobNotFoundException}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index f9b5ea91f144f..68b9c7585e3fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -37,13 +37,13 @@ import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler; import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler; import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler; import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler; import org.apache.flink.runtime.rest.handler.job.JobIdsHandler; import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; -import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler; @@ -85,11 +85,11 @@ import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobCancellationHeaders; import org.apache.flink.runtime.rest.messages.JobConfigHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders; import org.apache.flink.runtime.rest.messages.JobPlanHeaders; -import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders; import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders; @@ -483,18 +483,19 @@ protected List> initiali responseHeaders, JobVertexBackPressureHeaders.getInstance()); - final JobTerminationHandler jobCancelTerminationHandler = new JobTerminationHandler( + final JobCancellationHandler jobCancelTerminationHandler = new JobCancellationHandler( leaderRetriever, timeout, responseHeaders, - JobTerminationHeaders.getInstance(), + JobCancellationHeaders.getInstance(), TerminationModeQueryParameter.TerminationMode.CANCEL); - final JobTerminationHandler jobStopTerminationHandler = new JobTerminationHandler( + // this is kept just for legacy reasons. STOP has been replaced by STOP-WITH-SAVEPOINT. + final JobCancellationHandler jobStopTerminationHandler = new JobCancellationHandler( leaderRetriever, timeout, responseHeaders, - JobTerminationHeaders.getInstance(), + JobCancellationHeaders.getInstance(), TerminationModeQueryParameter.TerminationMode.STOP); final JobVertexDetailsHandler jobVertexDetailsHandler = new JobVertexDetailsHandler( diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 12ad2ee53aeb8..0206872cabf77 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -126,14 +126,6 @@ object JobManagerMessages { savepointDirectory: String = null) extends RequiresLeaderSessionID - /** - * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of - * stopping is sent back to the sender as a [[StoppingResponse]] message. - * - * @param jobID - */ - case class StopJob(jobID: JobID) extends RequiresLeaderSessionID - /** * Requesting next input split for the * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]] @@ -305,23 +297,6 @@ object JobManagerMessages { */ case class CancellationFailure(jobID: JobID, cause: Throwable) extends CancellationResponse - sealed trait StoppingResponse { - def jobID: JobID - } - - /** - * Denotes a successful (streaming) job stopping - * @param jobID - */ - case class StoppingSuccess(jobID: JobID) extends StoppingResponse - - /** - * Denotes a failed (streaming) job stopping - * @param jobID - * @param cause - */ - case class StoppingFailure(jobID: JobID, cause: Throwable) extends StoppingResponse - /** * Requests all currently running jobs from the job manager. This message triggers a * [[RunningJobs]] response. diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala index 6c0da2bda7620..d9846182598d1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala @@ -58,15 +58,6 @@ object TaskMessages { case class CancelTask(attemptID: ExecutionAttemptID) extends TaskMessage with RequiresLeaderSessionID - /** - * Stops the task associated with [[attemptID]]. The result is sent back to the sender as a - * [[TaskOperationResult]] message. - * - * @param attemptID The task's execution attempt ID. - */ - case class StopTask(attemptID: ExecutionAttemptID) - extends TaskMessage with RequiresLeaderSessionID - /** * Triggers a fail of specified task from the outside (as opposed to the task throwing * an exception itself) with the given exception as the cause. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java deleted file mode 100644 index 71febc23968f5..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.StoppingException; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.runtime.testutils.StoppableInvokable; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Validates that stop() calls are handled correctly. - */ -public class ExecutionGraphStopTest extends TestLogger { - - /** - * Tests that STOP is only supported if all sources are stoppable. - */ - @Test - public void testStopIfSourcesNotStoppable() throws Exception { - final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(); - - try { - graph.stop(); - fail("exception expected"); - } - catch (StoppingException e) { - // expected - } - } - - /** - * Validates that stop is only sent to the sources. - * - *

This test build a simple job with two sources and two non-source vertices. - */ - @Test - public void testStop() throws Exception { - final int sourcePar1 = 11; - final int sourcePar2 = 7; - - final JobVertex source1 = new JobVertex("source 1"); - source1.setInvokableClass(StoppableInvokable.class); - source1.setParallelism(sourcePar1); - - final JobVertex source2 = new JobVertex("source 2"); - source2.setInvokableClass(StoppableInvokable.class); - source2.setParallelism(sourcePar2); - - final JobVertex nonSource1 = new JobVertex("non-source-1"); - nonSource1.setInvokableClass(NoOpInvokable.class); - nonSource1.setParallelism(10); - - final JobVertex nonSource2 = new JobVertex("non-source-2"); - nonSource2.setInvokableClass(NoOpInvokable.class); - nonSource2.setParallelism(10); - - nonSource1.connectNewDataSetAsInput(source1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); - nonSource1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - nonSource2.connectNewDataSetAsInput(nonSource1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - - final JobID jid = new JobID(); - final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph( - jid, source1, source2, nonSource1, nonSource2); - - // we use different gateways for sources and non-sources to make sure the right ones - // get the RPC calls - final TaskManagerGateway sourceGateway = spy(new SimpleAckingTaskManagerGateway()); - final TaskManagerGateway nonSourceGateway = spy(new SimpleAckingTaskManagerGateway()); - - // deploy source 1 - for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway); - ev.getCurrentExecutionAttempt().tryAssignResource(slot); - ev.getCurrentExecutionAttempt().deploy(); - } - - // deploy source 2 - for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway); - ev.getCurrentExecutionAttempt().tryAssignResource(slot); - ev.getCurrentExecutionAttempt().deploy(); - } - - // deploy non-source 1 - for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway); - ev.getCurrentExecutionAttempt().tryAssignResource(slot); - ev.getCurrentExecutionAttempt().deploy(); - } - - // deploy non-source 2 - for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway); - ev.getCurrentExecutionAttempt().tryAssignResource(slot); - ev.getCurrentExecutionAttempt().deploy(); - } - - eg.stop(); - - verify(sourceGateway, timeout(1000).times(sourcePar1 + sourcePar2)).stopTask(any(ExecutionAttemptID.class), any(Time.class)); - verify(nonSourceGateway, times(0)).stopTask(any(ExecutionAttemptID.class), any(Time.class)); - - ExecutionGraphTestUtils.finishAllVertices(eg); - } - - /** - * Tests that the stopping RPC call is sent upon stopping requests. - */ - @Test - public void testStopRpc() throws Exception { - final JobID jid = new JobID(); - final JobVertex vertex = new JobVertex("vertex"); - vertex.setInvokableClass(NoOpInvokable.class); - vertex.setParallelism(5); - - final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(jid, vertex); - final Execution exec = graph.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt(); - - final TaskManagerGateway gateway = mock(TaskManagerGateway.class); - when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); - when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); - - final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(gateway); - - exec.tryAssignResource(slot); - exec.deploy(); - exec.switchToRunning(); - assertEquals(ExecutionState.RUNNING, exec.getState()); - - exec.stop(); - assertEquals(ExecutionState.RUNNING, exec.getState()); - - verify(gateway, times(1)).stopTask(any(ExecutionAttemptID.class), any(Time.class)); - - exec.markFinished(); - assertEquals(ExecutionState.FINISHED, exec.getState()); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index b6bf9dc6ae696..22d5df0610d06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -83,11 +83,6 @@ public CompletableFuture submitTask(TaskDeploymentDescriptor tdd, T return CompletableFuture.completedFuture(Acknowledge.get()); } - @Override - public CompletableFuture stopTask(ExecutionAttemptID executionAttemptID, Time timeout) { - return CompletableFuture.completedFuture(Acknowledge.get()); - } - @Override public CompletableFuture cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { cancelConsumer.accept(executionAttemptID); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 4f2e38164c2e3..13ffe5265bb4a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -81,9 +81,6 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull private final Supplier> cancelFunction; - @Nonnull - private final Supplier> stopFunction; - @Nonnull private final BiFunction> rescalingJobFunction; @@ -169,7 +166,6 @@ public TestingJobMasterGateway( @Nonnull String address, @Nonnull String hostname, @Nonnull Supplier> cancelFunction, - @Nonnull Supplier> stopFunction, @Nonnull BiFunction> rescalingJobFunction, @Nonnull TriFunction, Integer, RescalingBehaviour, CompletableFuture> rescalingOperatorsFunction, @Nonnull Function> updateTaskExecutionStateFunction, @@ -200,7 +196,6 @@ public TestingJobMasterGateway( this.address = address; this.hostname = hostname; this.cancelFunction = cancelFunction; - this.stopFunction = stopFunction; this.rescalingJobFunction = rescalingJobFunction; this.rescalingOperatorsFunction = rescalingOperatorsFunction; this.updateTaskExecutionStateFunction = updateTaskExecutionStateFunction; @@ -235,11 +230,6 @@ public CompletableFuture cancel(Time timeout) { return cancelFunction.get(); } - @Override - public CompletableFuture stop(Time timeout) { - return stopFunction.get(); - } - @Override public CompletableFuture rescaleJob(int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { return rescalingJobFunction.apply(newParallelism, rescalingBehaviour); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index 955cb2aa01815..c13cbf66f6e2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -76,7 +76,6 @@ public class TestingJobMasterGatewayBuilder { private String address = "akka.tcp://flink@localhost:6130/user/jobmanager"; private String hostname = "localhost"; private Supplier> cancelFunction = () -> CompletableFuture.completedFuture(Acknowledge.get()); - private Supplier> stopFunction = () -> CompletableFuture.completedFuture(Acknowledge.get()); private BiFunction> rescalingJobFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get()); private TriFunction, Integer, RescalingBehaviour, CompletableFuture> rescalingOperatorsFunction = (ignoredA, ignoredB, ignoredC) -> CompletableFuture.completedFuture(Acknowledge.get()); private Function> updateTaskExecutionStateFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); @@ -120,11 +119,6 @@ public TestingJobMasterGatewayBuilder setCancelFunction(Supplier> stopFunction) { - this.stopFunction = stopFunction; - return this; - } - public TestingJobMasterGatewayBuilder setRescalingJobFunction(BiFunction> rescalingJobFunction) { this.rescalingJobFunction = rescalingJobFunction; return this; @@ -261,6 +255,6 @@ public TestingJobMasterGatewayBuilder setUpdateAggregateFunction(TriFunction task1RunningFuture = new CompletableFuture<>(); - final CompletableFuture task2RunningFuture = new CompletableFuture<>(); - final CompletableFuture task1FinishedFuture = new CompletableFuture<>(); - - try (TaskSubmissionTestEnvironment env = - new TaskSubmissionTestEnvironment.Builder(jobId) - .setSlotSize(2) - .addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture) - .addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture) - .addTaskManagerActionListener(eid1, ExecutionState.FINISHED, task1FinishedFuture) - .build()) { - TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); - TaskSlotTable taskSlotTable = env.getTaskSlotTable(); - - taskSlotTable.allocateSlot(0, jobId, tdd1.getAllocationId(), Time.seconds(60)); - tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get(); - task1RunningFuture.get(); - - taskSlotTable.allocateSlot(1, jobId, tdd2.getAllocationId(), Time.seconds(60)); - tmGateway.submitTask(tdd2, env.getJobMasterId(), timeout).get(); - task2RunningFuture.get(); - - assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.RUNNING); - assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.RUNNING); - - tmGateway.stopTask(eid1, timeout); - task1FinishedFuture.get(); - - // task 2 does not implement StoppableTask which should cause the stop operation to fail - CompletableFuture acknowledgeOfTask2 = tmGateway.stopTask(eid2, timeout); - boolean hasTaskException = false; - try { - acknowledgeOfTask2.get(); - } catch (Throwable e) { - hasTaskException = ExceptionUtils.findThrowable(e, TaskException.class).isPresent(); - } - - assertTrue(hasTaskException); - assertSame(taskSlotTable.getTask(eid1).getExecutionState(), ExecutionState.FINISHED); - assertSame(taskSlotTable.getTask(eid2).getExecutionState(), ExecutionState.RUNNING); - } - } - - /** - * Tests that the TaskManager sends a proper exception back to the sender if the stop task - * message fails. - */ - @Test(timeout = 10000L) - public void testStopTaskFailure() throws Exception { - final ExecutionAttemptID eid = new ExecutionAttemptID(); - - final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, BlockingNoOpInvokable.class); - - final CompletableFuture taskRunningFuture = new CompletableFuture<>(); - - try (TaskSubmissionTestEnvironment env = - new TaskSubmissionTestEnvironment.Builder(jobId) - .setSlotSize(1) - .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture) - .build()) { - TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); - TaskSlotTable taskSlotTable = env.getTaskSlotTable(); - - taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60)); - tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get(); - taskRunningFuture.get(); - - CompletableFuture stopFuture = tmGateway.stopTask(eid, timeout); - try { - stopFuture.get(); - } catch (Exception e) { - assertTrue(e.getCause() instanceof TaskException); - assertThat(e.getCause().getMessage(), startsWith("Cannot stop task for execution")); - } - } - } - /** * Tests that submitted tasks will fail when attempting to send/receive data if no * ResultPartitions/InputGates are set up. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 953c38599bddb..789956f2bb6fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -137,11 +137,6 @@ public CompletableFuture confirmCheckpoint(ExecutionAttemptID execu return CompletableFuture.completedFuture(Acknowledge.get()); } - @Override - public CompletableFuture stopTask(ExecutionAttemptID executionAttemptID, Time timeout) { - return CompletableFuture.completedFuture(Acknowledge.get()); - } - @Override public CompletableFuture cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { return cancelTaskFunction.apply(executionAttemptID); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 3e1d16676fd9d..575ddad412547 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -70,7 +69,6 @@ import java.util.List; import java.util.concurrent.Executor; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -78,7 +76,6 @@ import static org.hamcrest.Matchers.isOneOf; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -103,7 +100,7 @@ public class TaskAsyncCallTest extends TestLogger { */ private static OneShotLatch notifyCheckpointCompleteLatch; - /** Triggered on {@link ContextClassLoaderInterceptingInvokable#stop()}}. */ + /** Triggered on {@link ContextClassLoaderInterceptingInvokable#cancel()}. */ private static OneShotLatch stopLatch; private static final List classLoaders = Collections.synchronizedList(new ArrayList<>()); @@ -178,26 +175,10 @@ public void testMixedAsyncCallsInOrder() throws Exception { } } - @Test - public void testThrowExceptionIfStopInvokedWithNotStoppableTask() throws Exception { - Task task = createTask(CheckpointsInOrderInvokable.class); - try (TaskCleaner ignored = new TaskCleaner(task)) { - task.startTaskThread(); - awaitLatch.await(); - - try { - task.stopExecution(); - fail("Expected exception not thrown"); - } catch (UnsupportedOperationException e) { - assertThat(e.getMessage(), containsString("Stopping not supported by task")); - } - } - } - /** * Asserts that {@link AbstractInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions, boolean)}, - * {@link AbstractInvokable#notifyCheckpointComplete(long)}, and {@link StoppableTask#stop()} are - * invoked by a thread whose context class loader is set to the user code class loader. + * and {@link AbstractInvokable#notifyCheckpointComplete(long)} are invoked by a thread whose context + * class loader is set to the user code class loader. */ @Test public void testSetsUserCodeClassLoader() throws Exception { @@ -213,12 +194,12 @@ public void testSetsUserCodeClassLoader() throws Exception { triggerLatch.await(); task.notifyCheckpointComplete(1); - task.stopExecution(); + task.cancelExecution(); notifyCheckpointCompleteLatch.await(); stopLatch.await(); - assertThat(classLoaders, hasSize(greaterThanOrEqualTo(3))); + assertThat(classLoaders, hasSize(greaterThanOrEqualTo(2))); assertThat(classLoaders, everyItem(instanceOf(TestUserCodeClassLoader.class))); } } @@ -364,7 +345,7 @@ public void notifyCheckpointComplete(long checkpointId) { * * @see #testSetsUserCodeClassLoader() */ - public static class ContextClassLoaderInterceptingInvokable extends CheckpointsInOrderInvokable implements StoppableTask { + public static class ContextClassLoaderInterceptingInvokable extends CheckpointsInOrderInvokable { public ContextClassLoaderInterceptingInvokable(Environment environment) { super(environment); @@ -385,8 +366,7 @@ public void notifyCheckpointComplete(long checkpointId) { } @Override - public void stop() { - classLoaders.add(Thread.currentThread().getContextClassLoader()); + public void cancel() { stopLatch.trigger(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java deleted file mode 100644 index ba8f7f657351b..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.flink.runtime.testutils; - -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; - -public final class StoppableInvokable extends AbstractInvokable implements StoppableTask { - - private boolean isRunning = true; - - public StoppableInvokable(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - while (isRunning) { - Thread.sleep(100); - } - } - - @Override - public void stop() { - this.isRunning = false; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java index 6f5ec7392feec..24c79f3c4e945 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java @@ -72,7 +72,6 @@ public TestingDispatcherGateway( String address, String hostname, Function> cancelJobFunction, - Function> stopJobFunction, Function> requestJobFunction, Function> requestJobResultFunction, Function> requestJobStatusFunction, @@ -92,7 +91,6 @@ public TestingDispatcherGateway( address, hostname, cancelJobFunction, - stopJobFunction, requestJobFunction, requestJobResultFunction, requestJobStatusFunction, @@ -181,7 +179,6 @@ public TestingDispatcherGateway build() { address, hostname, cancelJobFunction, - stopJobFunction, requestJobFunction, requestJobResultFunction, requestJobStatusFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index 32196a5fc01ef..1a3c6d78a1868 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -46,7 +46,6 @@ public class TestingRestfulGateway implements RestfulGateway { static final Function> DEFAULT_CANCEL_JOB_FUNCTION = jobId -> CompletableFuture.completedFuture(Acknowledge.get()); - static final Function> DEFAULT_STOP_JOB_FUNCTION = jobId -> CompletableFuture.completedFuture(Acknowledge.get()); static final Function> DEFAULT_REQUEST_JOB_RESULT_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final Function> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final Function> DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); @@ -67,8 +66,6 @@ public class TestingRestfulGateway implements RestfulGateway { protected Function> cancelJobFunction; - protected Function> stopJobFunction; - protected Function> requestJobFunction; protected Function> requestJobResultFunction; @@ -94,7 +91,6 @@ public TestingRestfulGateway() { LOCALHOST, LOCALHOST, DEFAULT_CANCEL_JOB_FUNCTION, - DEFAULT_STOP_JOB_FUNCTION, DEFAULT_REQUEST_JOB_FUNCTION, DEFAULT_REQUEST_JOB_RESULT_FUNCTION, DEFAULT_REQUEST_JOB_STATUS_FUNCTION, @@ -111,7 +107,6 @@ public TestingRestfulGateway( String address, String hostname, Function> cancelJobFunction, - Function> stopJobFunction, Function> requestJobFunction, Function> requestJobResultFunction, Function> requestJobStatusFunction, @@ -125,7 +120,6 @@ public TestingRestfulGateway( this.address = address; this.hostname = hostname; this.cancelJobFunction = cancelJobFunction; - this.stopJobFunction = stopJobFunction; this.requestJobFunction = requestJobFunction; this.requestJobResultFunction = requestJobResultFunction; this.requestJobStatusFunction = requestJobStatusFunction; @@ -143,11 +137,6 @@ public CompletableFuture cancelJob(JobID jobId, Time timeout) { return cancelJobFunction.apply(jobId); } - @Override - public CompletableFuture stopJob(JobID jobId, Time timeout) { - return stopJobFunction.apply(jobId); - } - @Override public CompletableFuture requestJob(JobID jobId, Time timeout) { return requestJobFunction.apply(jobId); @@ -219,7 +208,6 @@ public static class Builder { protected String address = LOCALHOST; protected String hostname = LOCALHOST; protected Function> cancelJobFunction; - protected Function> stopJobFunction; protected Function> requestJobFunction; protected Function> requestJobResultFunction; protected Function> requestJobStatusFunction; @@ -234,7 +222,6 @@ public static class Builder { public Builder() { cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION; - stopJobFunction = DEFAULT_STOP_JOB_FUNCTION; requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION; requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION; requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION; @@ -302,11 +289,6 @@ public Builder setCancelJobFunction(Function> stopJobFunction) { - this.stopJobFunction = stopJobFunction; - return this; - } - public Builder setTriggerSavepointFunction(BiFunction> triggerSavepointFunction) { this.triggerSavepointFunction = triggerSavepointFunction; return this; @@ -322,7 +304,6 @@ public TestingRestfulGateway build() { address, hostname, cancelJobFunction, - stopJobFunction, requestJobFunction, requestJobResultFunction, requestJobStatusFunction, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index b4b835379bc71..7ac1ac6898501 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.common.io.InputFormat; @@ -72,7 +71,6 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; -import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.util.Preconditions; @@ -1468,32 +1466,11 @@ public DataStreamSource addSource(SourceFunction function, Strin boolean isParallel = function instanceof ParallelSourceFunction; clean(function); - StreamSource sourceOperator; - if (function instanceof StoppableFunction) { - sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function)); - } else { - sourceOperator = new StreamSource<>(function); - } + final StreamSource sourceOperator = new StreamSource<>(function); return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); } - /** - * Casts the source function into a SourceFunction implementing the StoppableFunction. - * - *

This method should only be used if the source function was checked to implement the - * {@link StoppableFunction} interface. - * - * @param sourceFunction Source function to cast - * @param Output type of source function - * @param Union type of SourceFunction and StoppableFunction - * @return The casted source function so that it's type implements the StoppableFunction - */ - @SuppressWarnings("unchecked") - private & StoppableFunction> T cast2StoppableSourceFunction(SourceFunction sourceFunction) { - return (T) sourceFunction; - } - /** * Triggers the program execution. The environment will execute all parts of * the program that have resulted in a "sink" operation. Sink operations are diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 5a15df7d7a44a..8793fe54dc149 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -92,18 +92,8 @@ * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}), * the watermarks from the source function are ignored. * - *

Gracefully Stopping Functions

- * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction} - * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the - * state and the emitted elements in a consistent state. - * - *

When a source is stopped, the executing thread is not interrupted, but expected to leave the - * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity - * of state updates and element emission. - * * @param The type of the elements produced by this source. * - * @see org.apache.flink.api.common.functions.StoppableFunction * @see org.apache.flink.streaming.api.TimeCharacteristic */ @Public diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index f4950ecea7d86..ff94102b1f089 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; -import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -45,7 +44,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; -import org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; @@ -195,9 +193,7 @@ public void addOperator( TypeInformation outTypeInfo, String operatorName) { - if (operatorObject instanceof StoppableStreamSource) { - addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName); - } else if (operatorObject instanceof StreamSource) { + if (operatorObject instanceof StreamSource) { addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName); } else { addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java deleted file mode 100644 index ce8f6cd066233..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.api.common.functions.StoppableFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; - -/** - * {@link StoppableStreamSource} takes a {@link SourceFunction} that implements {@link StoppableFunction}. - * - * @param Type of the output elements - * @param Type of the source function which has to be stoppable - */ -public class StoppableStreamSource & StoppableFunction> - extends StreamSource { - - private static final long serialVersionUID = -4365670858793587337L; - - /** - * Takes a {@link SourceFunction} that implements {@link StoppableFunction}. - * - * @param sourceFunction - * A {@link SourceFunction} that implements {@link StoppableFunction}. - */ - public StoppableStreamSource(SRC sourceFunction) { - super(sourceFunction); - } - - /** - * Marks the source a stopped and calls {@link StoppableFunction#stop()} on the user function. - */ - public void stop() { - // important: marking the source as stopped has to happen before the function is stopped. - // the flag that tracks this status is volatile, so the memory model also guarantees - // the happens-before relationship - markCanceledOrStopped(); - userFunction.stop(); - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java deleted file mode 100644 index 40457917215ce..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.api.common.functions.StoppableFunction; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StoppableStreamSource; - -/** - * Stoppable task for executing stoppable streaming sources. - * - * @param Type of the produced elements - * @param Stoppable source function - */ -public class StoppableSourceStreamTask & StoppableFunction> - extends SourceStreamTask> implements StoppableTask { - - private volatile boolean stopped; - - public StoppableSourceStreamTask(Environment environment) { - super(environment); - } - - @Override - protected void run() throws Exception { - if (!stopped) { - super.run(); - } - } - - @Override - public void stop() { - stopped = true; - if (this.headOperator != null) { - this.headOperator.stop(); - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java index 495df42a44846..e9bca732fb77a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -29,7 +28,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; @@ -124,58 +122,12 @@ public void run() { assertTrue(output.isEmpty()); } - @Test - public void testNoMaxWatermarkOnImmediateStop() throws Exception { - - final List output = new ArrayList<>(); - - // regular stream source operator - final StoppableStreamSource> operator = - new StoppableStreamSource<>(new InfiniteSource()); - - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0); - operator.stop(); - - // run and stop - operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput(output)); - - assertTrue(output.isEmpty()); - } - - @Test - public void testNoMaxWatermarkOnAsyncStop() throws Exception { - - final List output = new ArrayList<>(); - - // regular stream source operator - final StoppableStreamSource> operator = - new StoppableStreamSource<>(new InfiniteSource()); - - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0); - - // trigger an async cancel in a bit - new Thread("canceler") { - @Override - public void run() { - try { - Thread.sleep(200); - } catch (InterruptedException ignored) {} - operator.stop(); - } - }.start(); - - // run and wait to be stopped - operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput(output)); - - assertTrue(output.isEmpty()); - } - @Test public void testAutomaticWatermarkContext() throws Exception { // regular stream source operator - final StoppableStreamSource> operator = - new StoppableStreamSource<>(new InfiniteSource()); + final StreamSource> operator = + new StreamSource<>(new InfiniteSource<>()); long watermarkInterval = 10; TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); @@ -252,19 +204,16 @@ private static void setupSourceOperator(StreamSource operator, // ------------------------------------------------------------------------ - private static final class FiniteSource implements SourceFunction, StoppableFunction { + private static final class FiniteSource implements SourceFunction { @Override public void run(SourceContext ctx) {} @Override public void cancel() {} - - @Override - public void stop() {} } - private static final class InfiniteSource implements SourceFunction, StoppableFunction { + private static final class InfiniteSource implements SourceFunction { private volatile boolean running = true; @@ -279,10 +228,5 @@ public void run(SourceContext ctx) throws Exception { public void cancel() { running = false; } - - @Override - public void stop() { - running = false; - } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java deleted file mode 100644 index 52d8429ac848e..0000000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.api.common.functions.StoppableFunction; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.operators.StoppableStreamSource; - -import org.junit.Test; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * These tests verify that the RichFunction methods are called (in correct order). And that - * checkpointing/element emission don't occur concurrently. - */ -public class SourceStreamTaskStoppingTest { - - // test flag for testStop() - static boolean stopped = false; - - @Test - public void testStop() { - final StoppableSourceStreamTask sourceTask = - new StoppableSourceStreamTask<>(new DummyEnvironment("test", 1, 0)); - - sourceTask.headOperator = new StoppableStreamSource<>(new StoppableSource()); - - sourceTask.stop(); - - assertTrue(stopped); - } - - @Test - public void testStopBeforeInitialization() throws Exception { - - final StoppableSourceStreamTask sourceTask = - new StoppableSourceStreamTask<>(new DummyEnvironment("test", 1, 0)); - sourceTask.stop(); - - sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource()); - sourceTask.run(); - } - - // ------------------------------------------------------------------------ - - private static class StoppableSource extends RichSourceFunction implements StoppableFunction { - private static final long serialVersionUID = 728864804042338806L; - - @Override - public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext ctx) - throws Exception { - } - - @Override - public void cancel() {} - - @Override - public void stop() { - stopped = true; - } - } - - private static class StoppableFailingSource extends RichSourceFunction implements StoppableFunction { - private static final long serialVersionUID = 728864804042338806L; - - @Override - public void run(SourceContext ctx) throws Exception { - fail("should not be called"); - } - - @Override - public void cancel() {} - - @Override - public void stop() {} - } -} - diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 4b6869656507f..2665d099bc224 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ClusterClient; @@ -797,7 +796,7 @@ public void run(SourceContext ctx) throws Exception { public void cancel() {} } - private static class MyTimestampSourceInfinite implements SourceFunction, StoppableFunction { + private static class MyTimestampSourceInfinite implements SourceFunction { private final long initialTime; private final int numWatermarks; @@ -825,11 +824,6 @@ public void run(SourceContext ctx) throws Exception { public void cancel() { running = false; } - - @Override - public void stop() { - running = false; - } } private static class MyNonWatermarkingSource implements SourceFunction {