Skip to content

Commit

Permalink
[FLINK-16657] Forbid blocking calls in JobClient when in Web Submission
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Apr 6, 2020
1 parent ab123f2 commit e6cea4f
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution) throws ProgramInvocationException {
boolean enforceSingleJobExecution,
boolean forbidBlockingJobClient) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
Expand All @@ -133,13 +134,15 @@ public static void executeProgram(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution);
enforceSingleJobExecution,
forbidBlockingJobClient);

StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution);
enforceSingleJobExecution,
forbidBlockingJobClient);

try {
program.invokeInteractiveModeForExecution();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPa
// --------------------------------------------------------------------------------------------

protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false);
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link JobClient} decorator that only exposes the {@link JobID} of the client it wraps and
 * rejects every other operation with a {@link FlinkRuntimeException}.
 *
 * <p>It is used in web submission, where we do not want the Web UI to have jobs blocking threads
 * while waiting for their completion. Note that the rejecting methods throw directly from the
 * call itself; they never hand back a (failed) future.
 */
@Internal
public class DetachedOnlyJobClientAdapter implements JobClient {

	/** The wrapped client; it is only ever consulted for its job id. */
	private final JobClient delegate;

	/**
	 * Creates an adapter around the given client.
	 *
	 * @param jobClient the client to wrap, must not be {@code null}
	 */
	public DetachedOnlyJobClientAdapter(final JobClient jobClient) {
		this.delegate = checkNotNull(jobClient);
	}

	/**
	 * Returns the id of the job, as reported by the wrapped client.
	 *
	 * @return the {@link JobID} of the wrapped client
	 */
	@Override
	public JobID getJobID() {
		return delegate.getJobID();
	}

	/**
	 * Not available in web submission.
	 *
	 * @throws FlinkRuntimeException always
	 */
	@Override
	public CompletableFuture<JobStatus> getJobStatus() {
		throw new FlinkRuntimeException("The Job Status cannot be requested when in Web Submission.");
	}

	/**
	 * Not available in web submission.
	 *
	 * @throws FlinkRuntimeException always
	 */
	@Override
	public CompletableFuture<Void> cancel() {
		throw new FlinkRuntimeException("Cancelling the job is not supported by the Job Client when in Web Submission.");
	}

	/**
	 * Not available in web submission.
	 *
	 * @throws FlinkRuntimeException always
	 */
	@Override
	public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
		throw new FlinkRuntimeException("Stop with Savepoint is not supported by the Job Client when in Web Submission.");
	}

	/**
	 * Not available in web submission.
	 *
	 * @throws FlinkRuntimeException always
	 */
	@Override
	public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory) {
		throw new FlinkRuntimeException("A savepoint cannot be taken through the Job Client when in Web Submission.");
	}

	/**
	 * Not available in web submission.
	 *
	 * @throws FlinkRuntimeException always
	 */
	@Override
	public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
		throw new FlinkRuntimeException("The Accumulators cannot be fetched through the Job Client when in Web Submission.");
	}

	/**
	 * Not available in web submission.
	 *
	 * @throws FlinkRuntimeException always
	 */
	@Override
	public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) {
		throw new FlinkRuntimeException("The Job Result cannot be fetched through the Job Client when in Web Submission.");
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,15 @@ public class DetachedApplicationRunner implements ApplicationRunner {

private static final Logger LOG = LoggerFactory.getLogger(DetachedApplicationRunner.class);

private final boolean forbidBlockingJobClient;

private final boolean enforceSingleJobExecution;

public DetachedApplicationRunner(final boolean enforceSingleJobExecution) {
public DetachedApplicationRunner(
final boolean enforceSingleJobExecution,
final boolean forbidBlockingJobClient) {
this.enforceSingleJobExecution = enforceSingleJobExecution;
this.forbidBlockingJobClient = forbidBlockingJobClient;
}

@Override
Expand All @@ -75,7 +80,7 @@ private List<JobID> tryExecuteJobs(final DispatcherGateway dispatcherGateway, fi
new EmbeddedExecutorServiceLoader(applicationJobIds, dispatcherGateway);

try {
ClientUtils.executeProgram(executorServiceLoader, configuration, program, enforceSingleJobExecution);
ClientUtils.executeProgram(executorServiceLoader, configuration, program, enforceSingleJobExecution, forbidBlockingJobClient);
} catch (ProgramInvocationException e) {
LOG.warn("Could not execute application: ", e);
throw new FlinkRuntimeException("Could not execute application.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.client.deployment.DetachedOnlyJobClientAdapter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
Expand All @@ -43,6 +44,8 @@ public class ContextEnvironment extends ExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

private final boolean forbidBlockingJobClient;

private final boolean enforceSingleJobExecution;

private int jobCounter;
Expand All @@ -51,8 +54,10 @@ public ContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
final boolean enforceSingleJobExecution,
final boolean forbidBlockingJobClient) {
super(executorServiceLoader, configuration, userCodeClassLoader);
this.forbidBlockingJobClient = forbidBlockingJobClient;
this.enforceSingleJobExecution = enforceSingleJobExecution;

this.jobCounter = 0;
Expand Down Expand Up @@ -97,7 +102,9 @@ public JobClient executeAsync(String jobName) throws Exception {

System.out.println("Job has been submitted with JobID " + jobClient.getJobID());

return jobClient;
return forbidBlockingJobClient
? new DetachedOnlyJobClientAdapter(jobClient)
: jobClient;
}

private void validateAllowedExecution() {
Expand All @@ -118,12 +125,14 @@ public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
final boolean enforceSingleJobExecution,
final boolean disallowBlockingJobClient) {
ExecutionEnvironmentFactory factory = () -> new ContextEnvironment(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution);
enforceSingleJobExecution,
disallowBlockingJobClient);
initializeContextEnvironment(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.deployment.DetachedOnlyJobClientAdapter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

private final boolean forbidBlockingJobClient;

private final boolean enforceSingleJobExecution;

private int jobCounter;
Expand All @@ -55,8 +58,10 @@ public StreamContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
final boolean enforceSingleJobExecution,
final boolean forbidBlockingJobClient) {
super(executorServiceLoader, configuration, userCodeClassLoader);
this.forbidBlockingJobClient = forbidBlockingJobClient;
this.enforceSingleJobExecution = enforceSingleJobExecution;

this.jobCounter = 0;
Expand Down Expand Up @@ -102,7 +107,9 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {

System.out.println("Job has been submitted with JobID " + jobClient.getJobID());

return jobClient;
return forbidBlockingJobClient
? new DetachedOnlyJobClientAdapter(jobClient)
: jobClient;
}

private void validateAllowedExecution() {
Expand All @@ -118,12 +125,14 @@ public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution) {
final boolean enforceSingleJobExecution,
final boolean disallowBlockingJobClient) {
StreamExecutionEnvironmentFactory factory = () -> new StreamContextEnvironment(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution);
enforceSingleJobExecution,
disallowBlockingJobClient);
initializeContextEnvironment(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -146,7 +147,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -158,7 +159,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -170,7 +171,7 @@ public void testDetachedMode() throws Exception{
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
final Configuration configuration = fromPackagedProgram(prg, 1, true);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false);
ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, false, false);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
assertEquals(
Expand All @@ -182,7 +183,7 @@ public void testDetachedMode() throws Exception{
@Test(expected = FlinkRuntimeException.class)
public void testMultiExecuteWithEnforcingSingleJobExecution() throws Throwable {
try {
launchMultiExecuteJob(true);
launchMultiExecuteJob(true, false);
} catch (Exception e) {
if (e instanceof ProgramInvocationException) {
throw e.getCause();
Expand All @@ -193,20 +194,37 @@ public void testMultiExecuteWithEnforcingSingleJobExecution() throws Throwable {

@Test
public void testMultiExecuteWithoutEnforcingSingleJobExecution() throws ProgramInvocationException {
launchMultiExecuteJob(false);
launchMultiExecuteJob(false, false);
}

private void launchMultiExecuteJob(final boolean enforceSingleJobExecution) throws ProgramInvocationException {
/**
 * Runs the multi-execute program with single-job enforcement disabled and the blocking
 * job client forbidden, and expects the cause of the resulting
 * {@link ProgramInvocationException} to be a {@link FlinkRuntimeException}.
 */
@Test(expected = FlinkRuntimeException.class)
public void testMultiExecuteWithDisallowingToWaitForResult() throws Throwable {
	try {
		launchMultiExecuteJob(false, true);
	} catch (ProgramInvocationException wrapped) {
		// Surface the underlying failure so that the "expected" check sees it.
		throw wrapped.getCause();
	} catch (Exception ignored) {
		// Any other exception is deliberately dropped; the fail() below reports the test as failed.
	}
	fail("Test should have failed due to trying to fetch the job result via the JobClient.");
}

private void launchMultiExecuteJob(final boolean enforceSingleJobExecution, final boolean forbidBlockingJobClient) throws ProgramInvocationException {
try (final ClusterClient<?> clusterClient =
new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster())) {

final PackagedProgram prg = PackagedProgram.newBuilder()
final PackagedProgram program = PackagedProgram.newBuilder()
.setEntryPointClassName(TestMultiExecute.class.getName())
.build();

final Configuration configuration = fromPackagedProgram(prg, 1, false);
final Configuration configuration = fromPackagedProgram(program, 1, false);

ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg, enforceSingleJobExecution);
ClientUtils.executeProgram(
new TestExecutorServiceLoader(clusterClient, plan),
configuration,
program,
enforceSingleJobExecution,
forbidBlockingJobClient);
}
}

Expand Down Expand Up @@ -250,7 +268,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
try {
final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock, false);
ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock, false, false);
fail("Creating the local execution environment should not be possible");
}
catch (InvalidProgramException e) {
Expand Down Expand Up @@ -337,7 +355,9 @@ public static void main(String[] args) throws Exception {

for (int i = 0; i < 2; i++) {
env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
env.execute();
JobClient jc = env.executeAsync();

jc.getJobExecutionResult(TestMultiExecute.class.getClassLoader());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public WebSubmissionExtension(
jarDir,
configuration,
executor,
() -> new DetachedApplicationRunner(true));
() -> new DetachedApplicationRunner(true, true));

final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler(
leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class JarHandlers {
jarDir,
new Configuration(),
executor,
() -> new DetachedApplicationRunner(true));
() -> new DetachedApplicationRunner(true, true));

deleteHandler = new JarDeleteHandler(
gatewayRetriever,
Expand Down
Loading

0 comments on commit e6cea4f

Please sign in to comment.