Skip to content

Commit

Permalink
[FLINK-16657] Allow the (Stream)ContenxtEnv to enforce single job exe…
Browse files Browse the repository at this point in the history
…cution
  • Loading branch information
kl0u committed Apr 6, 2020
1 parent 7381304 commit ab123f2
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public static JobExecutionResult submitJobAndWaitForResult(
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program) throws ProgramInvocationException {
PackagedProgram program,
boolean enforceSingleJobExecution) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
Expand All @@ -131,12 +132,14 @@ public static void executeProgram(
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader);
userCodeClassLoader,
enforceSingleJobExecution);

StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader);
userCodeClassLoader,
enforceSingleJobExecution);

try {
program.invokeInteractiveModeForExecution();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPa
// --------------------------------------------------------------------------------------------

protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program);
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public class DetachedApplicationRunner implements ApplicationRunner {

private static final Logger LOG = LoggerFactory.getLogger(DetachedApplicationRunner.class);

private final boolean enforceSingleJobExecution;

public DetachedApplicationRunner(final boolean enforceSingleJobExecution) {
this.enforceSingleJobExecution = enforceSingleJobExecution;
}

@Override
public List<JobID> run(final DispatcherGateway dispatcherGateway, final PackagedProgram program, final Configuration configuration) {
checkNotNull(dispatcherGateway);
Expand All @@ -69,7 +75,7 @@ private List<JobID> tryExecuteJobs(final DispatcherGateway dispatcherGateway, fi
new EmbeddedExecutorServiceLoader(applicationJobIds, dispatcherGateway);

try {
ClientUtils.executeProgram(executorServiceLoader, configuration, program);
ClientUtils.executeProgram(executorServiceLoader, configuration, program, enforceSingleJobExecution);
} catch (ProgramInvocationException e) {
LOG.warn("Could not execute application: ", e);
throw new FlinkRuntimeException("Could not execute application.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand All @@ -42,11 +43,19 @@ public class ContextEnvironment extends ExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

private final boolean enforceSingleJobExecution;

private int jobCounter;

public ContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
super(executorServiceLoader, configuration, userCodeClassLoader);
this.enforceSingleJobExecution = enforceSingleJobExecution;

this.jobCounter = 0;
}

@Override
Expand Down Expand Up @@ -82,13 +91,22 @@ public JobExecutionResult execute(String jobName) throws Exception {

@Override
public JobClient executeAsync(String jobName) throws Exception {
validateAllowedExecution();

final JobClient jobClient = super.executeAsync(jobName);

System.out.println("Job has been submitted with JobID " + jobClient.getJobID());

return jobClient;
}

private void validateAllowedExecution() {
if (enforceSingleJobExecution && jobCounter > 0) {
throw new FlinkRuntimeException("Cannot have more than one execute() or executeAsync() call in a single environment.");
}
jobCounter++;
}

@Override
public String toString() {
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
Expand All @@ -99,11 +117,13 @@ public String toString() {
public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
ExecutionEnvironmentFactory factory = () -> new ContextEnvironment(
executorServiceLoader,
configuration,
userCodeClassLoader);
userCodeClassLoader,
enforceSingleJobExecution);
initializeContextEnvironment(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand All @@ -46,11 +47,19 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

private final boolean enforceSingleJobExecution;

private int jobCounter;

public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
super(executorServiceLoader, configuration, userCodeClassLoader);
this.enforceSingleJobExecution = enforceSingleJobExecution;

this.jobCounter = 0;
}

@Override
Expand Down Expand Up @@ -87,23 +96,34 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {

@Override
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
validateAllowedExecution();

final JobClient jobClient = super.executeAsync(streamGraph);

System.out.println("Job has been submitted with JobID " + jobClient.getJobID());

return jobClient;
}

private void validateAllowedExecution() {
if (enforceSingleJobExecution && jobCounter > 0) {
throw new FlinkRuntimeException("Cannot have more than one execute() or executeAsync() call in a single environment.");
}
jobCounter++;
}

// --------------------------------------------------------------------------------------------

public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
executorServiceLoader,
configuration,
userCodeClassLoader);
userCodeClassLoader,
enforceSingleJobExecution);
initializeContextEnvironment(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -133,7 +134,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -145,7 +146,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -157,7 +158,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -169,7 +170,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -178,6 +179,37 @@ public void testDetachedMode() throws Exception{
}
}

@Test(expected = FlinkRuntimeException.class)
public void testMultiExecuteWithEnforcingSingleJobExecution() throws Throwable {
try {
launchMultiExecuteJob(true);
} catch (Exception e) {
if (e instanceof ProgramInvocationException) {
throw e.getCause();
}
}
fail("Test should have failed due to multiple execute() calls.");
}

@Test
public void testMultiExecuteWithoutEnforcingSingleJobExecution() throws ProgramInvocationException {
launchMultiExecuteJob(false);
}

private void launchMultiExecuteJob(final boolean enforceSingleJobExecution) throws ProgramInvocationException {
try (final ClusterClient<?> clusterClient =
new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {

final PackagedProgram prg = PackagedProgram.newBuilder()
.setEntryPointClassName(TestMultiExecute.class.getName())
.build();

final Configuration configuration = fromPackagedProgram(prg, 1, false);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, enforceSingleJobExecution);
}
}

/**
* This test verifies correct job submission messaging logic and plan translation calls.
*/
Expand Down Expand Up @@ -218,7 +250,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
try {
final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock);
ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock, false);
fail("Creating the local execution environment should not be possible");
}
catch (InvalidProgramException e) {
Expand Down Expand Up @@ -295,6 +327,21 @@ public static void main(String[] args) throws Exception {
}
}

/**
* Test job with multiple execute() calls.
*/
public static final class TestMultiExecute {

public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

for (int i = 0; i < 2; i++) {
env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
env.execute();
}
}
}

/**
* Test job that retrieves the net runtime from the {@link JobExecutionResult}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public WebSubmissionExtension(
jarDir,
configuration,
executor,
DetachedApplicationRunner::new);
() -> new DetachedApplicationRunner(true));

final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler(
leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class JarHandlers {
jarDir,
new Configuration(),
executor,
DetachedApplicationRunner::new);
() -> new DetachedApplicationRunner(true));

deleteHandler = new JarDeleteHandler(
gatewayRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public static void setup() throws Exception {

private static class ConfigurationVerifyingDetachedApplicationRunner extends DetachedApplicationRunner {

public ConfigurationVerifyingDetachedApplicationRunner() {
super(true);
}

@Override
public List<JobID> run(DispatcherGateway dispatcherGateway, PackagedProgram program, Configuration configuration) {
assertFalse(configuration.get(DeploymentOptions.ATTACHED));
Expand Down

0 comments on commit ab123f2

Please sign in to comment.