Skip to content

Commit

Permalink
[FLINK-14992][client] Add job listener to execution environments
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and aljoscha committed Dec 10, 2019
1 parent 166e10f commit 06a78c6
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.execution;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;

import javax.annotation.Nullable;

/**
* A listener that is notified on specific job status changed, which should be firstly
* registered by {@code #registerJobListener} of execution environments.
*
* <p>It is highly recommended NOT to perform any blocking operation inside
* the callbacks. If you block the thread the invoker of environment execute methods
* is possibly blocked.
*/
@PublicEvolving
public interface JobListener {

/**
* Callback on job submission. This is called when {@code execute()} or {@code executeAsync()}
* is called.
*
* <p>Exactly one of the passed parameters is null, respectively for failure or success.
*
* @param jobClient a {@link JobClient} for the submitted Flink job
* @param throwable the cause if submission failed
*/
void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);

/**
* Callback on job execution finished, successfully or unsuccessfully. It is only called
* back when you call {@code execute()} instead of {@code executeAsync()} methods of execution
* environments.
*
* <p>Exactly one of the passed parameters is null, respectively for failure or success.
*/
void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
Expand Down Expand Up @@ -136,6 +138,8 @@ public class ExecutionEnvironment {

private final ClassLoader userClassloader;

private final List<JobListener> jobListeners = new ArrayList<>();

/**
* Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
* configure the {@link org.apache.flink.core.execution.Executor}.
Expand Down Expand Up @@ -819,15 +823,44 @@ public JobExecutionResult execute() throws Exception {
public JobExecutionResult execute(String jobName) throws Exception {
final JobClient jobClient = executeAsync(jobName);

if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
} else {
lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
try {
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
} else {
lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}

jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));

} catch (Throwable t) {
jobListeners.forEach(jobListener -> {
jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
});
ExceptionUtils.rethrowException(t);
}

return lastJobExecutionResult;
}

/**
* Register a {@link JobListener} in this environment. The {@link JobListener} will be
* notified on specific job status changed.
*/
@PublicEvolving
public void registerJobListener(JobListener jobListener) {
checkNotNull(jobListener, "JobListener cannot be null");
jobListeners.add(jobListener);
}

/**
* Clear all registered {@link JobListener}s.
*/
@PublicEvolving
public void clearJobListeners() {
this.jobListeners.clear();
}

/**
* Triggers the program execution asynchronously. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
Expand Down Expand Up @@ -876,7 +909,17 @@ public JobClient executeAsync(String jobName) throws Exception {
.getExecutor(configuration)
.execute(plan, configuration);

return jobClientFuture.get();
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (Throwable t) {
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
ExceptionUtils.rethrow(t);

// make javac happy, this code path will not be reached
return null;
}
}

private void consolidateParallelismDefinitionsInConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def excluded_methods(cls):
'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration',
'executeAsync'}
'executeAsync', 'registerJobListener', 'clearJobListeners'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def excluded_methods(cls):
'readFileStream', 'isForceCheckpointing', 'readFile', 'clean',
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
'setNumberOfExecutionRetries', 'configure', 'executeAsync'}
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners'}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.execution.JobClient
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.core.fs.Path
import org.apache.flink.types.StringValue
import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
Expand Down Expand Up @@ -494,6 +494,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.execute(jobName)
}

/**
* Register a [[JobListener]] in this environment. The [[JobListener]] will be
* notified on specific job status changed.
*/
@PublicEvolving
def registerJobListener(jobListener: JobListener): Unit = {
javaEnv.registerJobListener(jobListener)
}

/**
* Clear all registered [[JobListener]]s.
*/
@PublicEvolving def clearJobListeners(): Unit = {
javaEnv.clearJobListeners()
}

/**
* Triggers the program execution asynchronously.
* The environment will execute all parts of the program that have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -167,6 +169,8 @@ public class StreamExecutionEnvironment {

private final ClassLoader userClassloader;

private final List<JobListener> jobListeners = new ArrayList<>();

// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1637,13 +1641,45 @@ public JobExecutionResult execute(String jobName) throws Exception {
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient jobClient = executeAsync(streamGraph);

if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
return jobClient.getJobExecutionResult(userClassloader).get();
} else {
return new DetachedJobExecutionResult(jobClient.getJobID());
try {
JobExecutionResult jobExecutionResult =
configuration.getBoolean(DeploymentOptions.ATTACHED) ?
jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());

jobListeners
.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

return jobExecutionResult;
} catch (Throwable t) {
jobListeners.forEach(jobListener -> {
jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
});
ExceptionUtils.rethrowException(t);

// never reached, only make javac happy
return null;
}
}

/**
* Register a {@link JobListener} in this environment. The {@link JobListener} will be
* notified on specific job status changed.
*/
@PublicEvolving
public void registerJobListener(JobListener jobListener) {
checkNotNull(jobListener, "JobListener cannot be null");
jobListeners.add(jobListener);
}

/**
* Clear all registered {@link JobListener}s.
*/
@PublicEvolving
public void clearJobListeners() {
this.jobListeners.clear();
}

/**
* Triggers the program asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
Expand Down Expand Up @@ -1704,7 +1740,17 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
.getExecutor(configuration)
.execute(streamGraph, configuration);

return jobClientFuture.get();
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (Throwable t) {
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
ExceptionUtils.rethrow(t);

// make javac happy, this code path will not be reached
return null;
}
}

private void consolidateParallelismDefinitionsInConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.core.execution.JobClient
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.runtime.state.AbstractStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Preconditions.checkNotNull
import org.apache.flink.util.SplittableIterator

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -679,6 +680,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def execute(jobName: String) = javaEnv.execute(jobName)

/**
* Register a [[JobListener]] in this environment. The [[JobListener]] will be
* notified on specific job status changed.
*/
@PublicEvolving
def registerJobListener(jobListener: JobListener): Unit = {
javaEnv.registerJobListener(jobListener)
}

/**
* Clear all registered [[JobListener]]s.
*/
@PublicEvolving def clearJobListeners(): Unit = {
javaEnv.clearJobListeners()
}

/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
Expand Down
Loading

0 comments on commit 06a78c6

Please sign in to comment.