Skip to content

Commit

Permalink
[FLINK-16661] Move the static job id setting to ApplicationDispatcher…
Browse files Browse the repository at this point in the history
…Bootstrap

HA support and static job ids go hand in hand, as HA requires that
the id of a job graph stays fixed across consecutive executions. In
addition, no two jobs can have the same job id while executing on the
same cluster. This commit consolidates this logic (in the context
of Application Mode) in one place, the ApplicationDispatcherBootstrap.
  • Loading branch information
kl0u committed Apr 30, 2020
1 parent d553c7f commit f42c14e
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand All @@ -36,6 +37,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.SerializedThrowable;
Expand Down Expand Up @@ -72,6 +74,8 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap

private static final Logger LOG = LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);

/**
 * Job id with both parts set to zero. It is written to
 * {@code PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID} by
 * {@code fixJobIdAndRunApplicationAsync} when high availability is activated
 * and no fixed job id has been configured.
 */
public static final JobID ZERO_JOB_ID = new JobID(0, 0);

private final PackagedProgram application;

private final Collection<JobGraph> recoveredJobs;
Expand Down Expand Up @@ -127,7 +131,7 @@ CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
final DispatcherGateway dispatcher,
final ScheduledExecutor scheduledExecutor) {

applicationCompletionFuture = runApplicationAsync(dispatcher, scheduledExecutor, false);
applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);

return applicationCompletionFuture
.handle((r, t) -> {
Expand All @@ -144,13 +148,30 @@ CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
.thenCompose(Function.identity());
}

/**
 * Decides whether the application must run with a fixed job id and then starts it.
 *
 * <p>Single-job execution is enforced when high availability is activated or when a
 * fixed job id is already present in the configuration. If HA is activated and no job id
 * is configured, {@link #ZERO_JOB_ID} is written into the configuration as the fixed id.
 *
 * @param dispatcher the dispatcher gateway handed to {@code runApplicationAsync}
 * @param scheduledExecutor the executor handed to {@code runApplicationAsync}
 * @return the future returned by {@code runApplicationAsync}
 */
@VisibleForTesting
CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
        final DispatcherGateway dispatcher,
        final ScheduledExecutor scheduledExecutor) {

    final boolean jobIdConfigured =
            configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID).isPresent();
    final boolean haActivated =
            HighAvailabilityMode.isHighAvailabilityModeActivated(configuration);

    // A fixed job id (explicit or defaulted) implies that only one job may be executed.
    final boolean enforceSingleJobExecution = haActivated || jobIdConfigured;

    if (enforceSingleJobExecution && !jobIdConfigured) {
        // HA without an explicit id: fall back to the well-known zero id.
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, ZERO_JOB_ID.toHexString());
    }

    return runApplicationAsync(dispatcher, scheduledExecutor, enforceSingleJobExecution);
}

/**
* Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}.
* The returned {@link CompletableFuture} completes when all jobs of the user application
* succeeded, if any of them fails, or if job submission fails.
*/
@VisibleForTesting
CompletableFuture<Void> runApplicationAsync(
private CompletableFuture<Void> runApplicationAsync(
final DispatcherGateway dispatcher,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution) {
Expand All @@ -159,7 +180,7 @@ CompletableFuture<Void> runApplicationAsync(
// we need to hand in a future as return value because we need to get those JobIDs out
// from the scheduled task that executes the user program
applicationExecutionTask = scheduledExecutor.schedule(
() -> runApplicationEntrypoint(
() -> runApplicationEntryPoint(
applicationExecutionFuture,
dispatcher,
scheduledExecutor,
Expand All @@ -177,7 +198,7 @@ CompletableFuture<Void> runApplicationAsync(
*
* <p>This should be executed in a separate thread (or task).
*/
private void runApplicationEntrypoint(
private void runApplicationEntryPoint(
final CompletableFuture<List<JobID>> jobIdsFuture,
final DispatcherGateway dispatcher,
final ScheduledExecutor scheduledExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
Expand Down Expand Up @@ -82,22 +85,123 @@ public void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable
.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()));

final CompletableFuture<Void> applicationFuture =
runApplication(dispatcherBuilder, 0, false);
runApplication(dispatcherBuilder, 0);

assertException(applicationFuture, ApplicationExecutionException.class);
}

@Test
public void testExceptionThrownWithMultiJobApplicationIfOnlyOneJobIsAllowed() throws Throwable {
final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()));
public void testOnlyOneJobIsAllowedWithHa() throws Throwable {
final Configuration configurationUnderTest = getConfiguration();
configurationUnderTest.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

final CompletableFuture<Void> applicationFuture =
runApplication(dispatcherBuilder, 3, true);
runApplication(configurationUnderTest, 2);

assertException(applicationFuture, FlinkRuntimeException.class);
}

/**
 * Runs a two-job application with a fixed job id set in the configuration and
 * asserts, via {@code assertException}, a {@link FlinkRuntimeException} on the
 * returned future.
 */
@Test
public void testOnlyOneJobAllowedWithStaticJobId() throws Throwable {
    final JobID fixedJobId = new JobID(0, 2);

    final Configuration configWithFixedJobId = getConfiguration();
    configWithFixedJobId.set(
            PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
            fixedJobId.toHexString());

    final CompletableFuture<Void> result = runApplication(configWithFixedJobId, 2);
    assertException(result, FlinkRuntimeException.class);
}

/**
 * Runs a two-job application with both a fixed job id and ZooKeeper HA mode set
 * in the configuration and asserts, via {@code assertException}, a
 * {@link FlinkRuntimeException} on the returned future.
 */
@Test
public void testOnlyOneJobAllowedWithStaticJobIdAndHa() throws Throwable {
    final JobID fixedJobId = new JobID(0, 2);

    final Configuration haConfigWithFixedJobId = getConfiguration();
    haConfigWithFixedJobId.set(
            PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
            fixedJobId.toHexString());
    haConfigWithFixedJobId.set(
            HighAvailabilityOptions.HA_MODE,
            HighAvailabilityMode.ZOOKEEPER.name());

    final CompletableFuture<Void> result = runApplication(haConfigWithFixedJobId, 2);
    assertException(result, FlinkRuntimeException.class);
}

/**
 * With ZooKeeper HA mode configured and no fixed job id set by this test, the id
 * of the submitted job graph is expected to be {@code new JobID(0L, 0L)}.
 */
@Test
public void testJobIdDefaultsToZeroWithHa() throws Throwable {
    final Configuration haConfiguration = getConfiguration();
    haConfiguration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

    // Completed by the submit function with the id of the job graph it receives.
    final CompletableFuture<JobID> capturedJobId = new CompletableFuture<>();

    final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder()
            .setSubmitFunction(submittedGraph -> {
                capturedJobId.complete(submittedGraph.getJobID());
                return CompletableFuture.completedFuture(Acknowledge.get());
            })
            .setRequestJobStatusFunction(id -> CompletableFuture.completedFuture(JobStatus.FINISHED))
            .setRequestJobResultFunction(id -> CompletableFuture.completedFuture(createSuccessfulJobResult(id)));

    // Wait for the application to finish before inspecting the captured id.
    runApplication(gatewayBuilder, haConfiguration, 1).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    assertThat(capturedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(new JobID(0L, 0L)));
}

/**
 * With a fixed job id configured (and no HA setting applied by this test), the
 * id of the submitted job graph is expected to equal the configured id.
 */
@Test
public void testStaticJobId() throws Throwable {
    final JobID fixedJobId = new JobID(0, 2);

    final Configuration configWithFixedJobId = getConfiguration();
    configWithFixedJobId.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, fixedJobId.toHexString());

    // Completed by the submit function with the id of the job graph it receives.
    final CompletableFuture<JobID> capturedJobId = new CompletableFuture<>();

    final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder()
            .setSubmitFunction(submittedGraph -> {
                capturedJobId.complete(submittedGraph.getJobID());
                return CompletableFuture.completedFuture(Acknowledge.get());
            })
            .setRequestJobStatusFunction(id -> CompletableFuture.completedFuture(JobStatus.FINISHED))
            .setRequestJobResultFunction(id -> CompletableFuture.completedFuture(createSuccessfulJobResult(id)));

    // Wait for the application to finish before inspecting the captured id.
    runApplication(gatewayBuilder, configWithFixedJobId, 1).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    assertThat(capturedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(new JobID(0L, 2L)));
}

/**
 * With both a fixed job id and ZooKeeper HA mode configured, the id of the
 * submitted job graph is expected to equal the configured id.
 */
@Test
public void testStaticJobIdWithHa() throws Throwable {
    final JobID fixedJobId = new JobID(0, 2);

    final Configuration haConfigWithFixedJobId = getConfiguration();
    haConfigWithFixedJobId.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, fixedJobId.toHexString());
    haConfigWithFixedJobId.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

    // Completed by the submit function with the id of the job graph it receives.
    final CompletableFuture<JobID> capturedJobId = new CompletableFuture<>();

    final TestingDispatcherGateway.Builder gatewayBuilder = new TestingDispatcherGateway.Builder()
            .setSubmitFunction(submittedGraph -> {
                capturedJobId.complete(submittedGraph.getJobID());
                return CompletableFuture.completedFuture(Acknowledge.get());
            })
            .setRequestJobStatusFunction(id -> CompletableFuture.completedFuture(JobStatus.FINISHED))
            .setRequestJobResultFunction(id -> CompletableFuture.completedFuture(createSuccessfulJobResult(id)));

    // Wait for the application to finish before inspecting the captured id.
    runApplication(gatewayBuilder, haConfigWithFixedJobId, 1).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    assertThat(capturedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(new JobID(0L, 2L)));
}

@Test
public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>();
Expand Down Expand Up @@ -126,7 +230,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
return new CompletableFuture<>();
});

final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2, false);
final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);

assertException(applicationFuture, JobExecutionException.class);
}
Expand All @@ -138,7 +242,7 @@ public void testApplicationSucceedsWhenAllJobsSucceed() throws Exception {
.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId)));

final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 3, false);
final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 3);

// this would block indefinitely if the applications don't finish
applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand Down Expand Up @@ -300,19 +404,37 @@ public void testClusterShutdownWhenApplicationFails() throws Exception {

private CompletableFuture<Void> runApplication(
TestingDispatcherGateway.Builder dispatcherBuilder,
int noOfJobs,
boolean enforceSingleJobExecution) throws FlinkException {
int noOfJobs) throws FlinkException {

return runApplication(dispatcherBuilder, getConfiguration(), noOfJobs);
}

/**
 * Convenience overload that runs the application against a dispatcher whose
 * submit function acknowledges every job, whose status function reports
 * {@code JobStatus.FINISHED}, and whose result function returns
 * {@code createSuccessfulJobResult(jobId)}.
 *
 * @param configuration the configuration passed on to the three-argument overload
 * @param noOfJobs the job count passed on to the three-argument overload
 * @return the future returned by the three-argument overload
 */
private CompletableFuture<Void> runApplication(
        final Configuration configuration,
        final int noOfJobs) throws Throwable {

    final TestingDispatcherGateway.Builder succeedingDispatcher = new TestingDispatcherGateway.Builder()
            .setSubmitFunction(graph -> CompletableFuture.completedFuture(Acknowledge.get()))
            .setRequestJobStatusFunction(id -> CompletableFuture.completedFuture(JobStatus.FINISHED))
            .setRequestJobResultFunction(id -> CompletableFuture.completedFuture(createSuccessfulJobResult(id)));

    return runApplication(succeedingDispatcher, configuration, noOfJobs);
}

private CompletableFuture<Void> runApplication(
TestingDispatcherGateway.Builder dispatcherBuilder,
Configuration configuration,
int noOfJobs) throws FlinkException {

final PackagedProgram program = getProgram(noOfJobs);

final ApplicationDispatcherBootstrap bootstrap =
new ApplicationDispatcherBootstrap(
program, Collections.emptyList(), getConfiguration());
program, Collections.emptyList(), configuration);

return bootstrap.runApplicationAsync(
return bootstrap.fixJobIdAndRunApplicationAsync(
dispatcherBuilder.build(),
scheduledExecutor,
enforceSingleJobExecution);
scheduledExecutor);
}

private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
Expand All @@ -45,7 +44,6 @@

import java.io.IOException;
import java.net.URL;
import java.util.Optional;

import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory;

Expand All @@ -56,8 +54,6 @@
@Internal
public final class StandaloneJobClusterEntryPoint extends ApplicationClusterEntryPoint {

public static final JobID ZERO_JOB_ID = new JobID(0, 0);

private StandaloneJobClusterEntryPoint(
final Configuration configuration,
final PackagedProgram program) {
Expand Down Expand Up @@ -127,20 +123,9 @@ private static PackagedProgramRetriever getPackagedProgramRetriever(
}

private static void setStaticJobId(StandaloneJobClusterConfiguration clusterConfiguration, Configuration configuration) {
final JobID jobId = resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration);
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
}

@VisibleForTesting
static JobID resolveJobIdForCluster(Optional<JobID> optionalJobID, Configuration configuration) {
return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration));
}

private static JobID createJobIdForCluster(Configuration globalConfiguration) {
if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) {
return ZERO_JOB_ID;
} else {
return JobID.generate();
final JobID jobId = clusterConfiguration.getJobId();
if (jobId != null) {
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
}
}
}
Loading

0 comments on commit f42c14e

Please sign in to comment.