diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
new file mode 100644
index 0000000000000..e67db44b955e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
+
+import javax.annotation.Nullable;
+
+/**
+ * A listener that is notified on specific job status changed, which should be firstly
+ * registered by {@code #registerJobListener} of execution environments.
+ *
+ *
It is highly recommended NOT to perform any blocking operation inside
+ * the callbacks. If you block the thread the invoker of environment execute methods
+ * is possibly blocked.
+ */
+@PublicEvolving
+public interface JobListener {
+
+ /**
+ * Callback on job submission. This is called when {@code execute()} or {@code executeAsync()}
+ * is called.
+ *
+ *
Exactly one of the passed parameters is null, respectively for failure or success.
+ *
+ * @param jobClient a {@link JobClient} for the submitted Flink job
+ * @param throwable the cause if submission failed
+ */
+ void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);
+
+ /**
+ * Callback on job execution finished, successfully or unsuccessfully. It is only called
+ * back when you call {@code execute()} instead of {@code executeAsync()} methods of execution
+ * environments.
+ *
+ *
Exactly one of the passed parameters is null, respectively for failure or success.
+ */
+ void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index cf82c2267ae50..6078470dd6122 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -58,8 +58,10 @@
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
@@ -136,6 +138,8 @@ public class ExecutionEnvironment {
private final ClassLoader userClassloader;
+ private final List jobListeners = new ArrayList<>();
+
/**
* Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
* configure the {@link org.apache.flink.core.execution.Executor}.
@@ -819,15 +823,44 @@ public JobExecutionResult execute() throws Exception {
public JobExecutionResult execute(String jobName) throws Exception {
final JobClient jobClient = executeAsync(jobName);
- if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
- lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
- } else {
- lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
+ try {
+ if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
+ lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
+ } else {
+ lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
+ }
+
+ jobListeners.forEach(
+ jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));
+
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener -> {
+ jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
+ });
+ ExceptionUtils.rethrowException(t);
}
return lastJobExecutionResult;
}
+ /**
+ * Register a {@link JobListener} in this environment. The {@link JobListener} will be
+ * notified on specific job status changed.
+ */
+ @PublicEvolving
+ public void registerJobListener(JobListener jobListener) {
+ checkNotNull(jobListener, "JobListener cannot be null");
+ jobListeners.add(jobListener);
+ }
+
+ /**
+ * Clear all registered {@link JobListener}s.
+ */
+ @PublicEvolving
+ public void clearJobListeners() {
+ this.jobListeners.clear();
+ }
+
/**
* Triggers the program execution asynchronously. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
@@ -876,7 +909,17 @@ public JobClient executeAsync(String jobName) throws Exception {
.getExecutor(configuration)
.execute(plan, configuration);
- return jobClientFuture.get();
+ try {
+ JobClient jobClient = jobClientFuture.get();
+ jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
+ return jobClient;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
+ ExceptionUtils.rethrow(t);
+
+ // make javac happy, this code path will not be reached
+ return null;
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
index e4dbe295c0d61..02f83fd6c32e8 100644
--- a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
+++ b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
@@ -51,7 +51,7 @@ def excluded_methods(cls):
'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration',
- 'executeAsync'}
+ 'executeAsync', 'registerJobListener', 'clearJobListeners'}
if __name__ == '__main__':
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index 7cfa605439c58..a1a325fa09d62 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -48,7 +48,8 @@ def excluded_methods(cls):
'readFileStream', 'isForceCheckpointing', 'readFile', 'clean',
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
- 'setNumberOfExecutionRetries', 'configure', 'executeAsync'}
+ 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
+ 'clearJobListeners'}
if __name__ == '__main__':
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 9c5598d2bb3b0..79d32d741832b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.execution.JobClient
+import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.core.fs.Path
import org.apache.flink.types.StringValue
import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -494,6 +494,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.execute(jobName)
}
+ /**
+ * Register a [[JobListener]] in this environment. The [[JobListener]] will be
+ * notified on specific job status changed.
+ */
+ @PublicEvolving
+ def registerJobListener(jobListener: JobListener): Unit = {
+ javaEnv.registerJobListener(jobListener)
+ }
+
+ /**
+ * Clear all registered [[JobListener]]s.
+ */
+ @PublicEvolving def clearJobListeners(): Unit = {
+ javaEnv.clearJobListeners()
+ }
+
/**
* Triggers the program execution asynchronously.
* The environment will execute all parts of the program that have
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 34aab2eb93ef2..dc1053f7f447c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -58,6 +58,7 @@
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -85,6 +86,7 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
import org.apache.flink.util.StringUtils;
@@ -167,6 +169,8 @@ public class StreamExecutionEnvironment {
private final ClassLoader userClassloader;
+ private final List jobListeners = new ArrayList<>();
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
@@ -1637,13 +1641,45 @@ public JobExecutionResult execute(String jobName) throws Exception {
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient jobClient = executeAsync(streamGraph);
- if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
- return jobClient.getJobExecutionResult(userClassloader).get();
- } else {
- return new DetachedJobExecutionResult(jobClient.getJobID());
+ try {
+ JobExecutionResult jobExecutionResult =
+ configuration.getBoolean(DeploymentOptions.ATTACHED) ?
+ jobClient.getJobExecutionResult(userClassloader).get()
+ : new DetachedJobExecutionResult(jobClient.getJobID());
+
+ jobListeners
+ .forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
+
+ return jobExecutionResult;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener -> {
+ jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
+ });
+ ExceptionUtils.rethrowException(t);
+
+ // never reached, only make javac happy
+ return null;
}
}
+ /**
+ * Register a {@link JobListener} in this environment. The {@link JobListener} will be
+ * notified on specific job status changed.
+ */
+ @PublicEvolving
+ public void registerJobListener(JobListener jobListener) {
+ checkNotNull(jobListener, "JobListener cannot be null");
+ jobListeners.add(jobListener);
+ }
+
+ /**
+ * Clear all registered {@link JobListener}s.
+ */
+ @PublicEvolving
+ public void clearJobListeners() {
+ this.jobListeners.clear();
+ }
+
/**
* Triggers the program asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
@@ -1704,7 +1740,17 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
.getExecutor(configuration)
.execute(streamGraph, configuration);
- return jobClientFuture.get();
+ try {
+ JobClient jobClient = jobClientFuture.get();
+ jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
+ return jobClient;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
+ ExceptionUtils.rethrow(t);
+
+ // make javac happy, this code path will not be reached
+ return null;
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f911f3e353af0..9eb0e3339b04a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -29,13 +29,14 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.configuration.{Configuration, ReadableConfig}
-import org.apache.flink.core.execution.JobClient
+import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.runtime.state.AbstractStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
+import org.apache.flink.util.Preconditions.checkNotNull
import org.apache.flink.util.SplittableIterator
import scala.collection.JavaConverters._
@@ -679,6 +680,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def execute(jobName: String) = javaEnv.execute(jobName)
+ /**
+ * Register a [[JobListener]] in this environment. The [[JobListener]] will be
+ * notified on specific job status changed.
+ */
+ @PublicEvolving
+ def registerJobListener(jobListener: JobListener): Unit = {
+ javaEnv.registerJobListener(jobListener)
+ }
+
+ /**
+ * Clear all registered [[JobListener]]s.
+ */
+ @PublicEvolving def clearJobListeners(): Unit = {
+ javaEnv.clearJobListeners()
+ }
+
/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java
new file mode 100644
index 0000000000000..ad0173fbd5b58
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for {@link JobListener}.
+ */
+public class JobListenerITCase extends TestLogger {
+
+ @Test
+ public void testJobListenerOnBatchEnvironment() throws Exception {
+ OneShotLatch submissionLatch = new OneShotLatch();
+ OneShotLatch executionLatch = new OneShotLatch();
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ env.registerJobListener(new JobListener() {
+ @Override
+ public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
+ submissionLatch.trigger();
+ }
+
+ @Override
+ public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
+ executionLatch.trigger();
+ }
+ });
+
+ env.fromElements(1, 2, 3, 4, 5).output(new DiscardingOutputFormat<>());
+ env.execute();
+
+ submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
+ executionLatch.await(2000L, TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testJobListenerOnStreamingEnvironment() throws Exception {
+ OneShotLatch submissionLatch = new OneShotLatch();
+ OneShotLatch executionLatch = new OneShotLatch();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.registerJobListener(new JobListener() {
+ @Override
+ public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
+ submissionLatch.trigger();
+ }
+
+ @Override
+ public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
+ executionLatch.trigger();
+ }
+ });
+
+ env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>());
+ env.execute();
+
+ submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
+ executionLatch.await(2000L, TimeUnit.MILLISECONDS);
+ }
+}