Skip to content

Commit

Permalink
[FLINK-11545] [container] Pass job ID to ClassPathJobGraphRetriever
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Feb 15, 2019
1 parent 6f9bf06 commit 45be671
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
Expand All @@ -30,28 +31,35 @@

import javax.annotation.Nonnull;

import static java.util.Objects.requireNonNull;

/**
* {@link JobGraphRetriever} which creates the {@link JobGraph} from a class
* on the class path.
*/
public class ClassPathJobGraphRetriever implements JobGraphRetriever {
class ClassPathJobGraphRetriever implements JobGraphRetriever {

@Nonnull
private final String jobClassName;

@Nonnull
private final JobID jobId;

@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

@Nonnull
private final String[] programArguments;

public ClassPathJobGraphRetriever(
ClassPathJobGraphRetriever(
@Nonnull String jobClassName,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments) {
this.jobClassName = jobClassName;
this.savepointRestoreSettings = savepointRestoreSettings;
this.programArguments = programArguments;
this.jobClassName = requireNonNull(jobClassName, "jobClassName");
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
}

@Override
Expand All @@ -63,7 +71,7 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
packagedProgram,
configuration,
defaultParallelism,
StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID);
jobId);
jobGraph.setAllowQueuedScheduling(true);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
Expand All @@ -44,19 +45,24 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
@Nonnull
private final String jobClassName;

@Nonnull
private final JobID jobId;

@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

@Nonnull
private final String[] programArguments;

StandaloneJobClusterEntryPoint(
private StandaloneJobClusterEntryPoint(
Configuration configuration,
@Nonnull String jobClassName,
@Nonnull JobID jobId,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull String[] programArguments) {
super(configuration);
this.jobClassName = requireNonNull(jobClassName, "jobClassName");
this.jobId = requireNonNull(jobId, "jobId");
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.programArguments = requireNonNull(programArguments, "programArguments");
}
Expand All @@ -65,7 +71,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
new ClassPathJobGraphRetriever(jobClassName, jobId, savepointRestoreSettings, programArguments));
}

public static void main(String[] args) {
Expand All @@ -92,6 +98,7 @@ public static void main(String[] args) {
StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
configuration,
clusterConfiguration.getJobClassName(),
clusterConfiguration.getJobId(),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getArgs());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -37,39 +38,43 @@
*/
public class ClassPathJobGraphRetrieverTest extends TestLogger {

public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};

@Test
public void testJobGraphRetrieval() throws FlinkException {
final int parallelism = 42;
final Configuration configuration = new Configuration();
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
final JobID jobId = new JobID();

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
TestJob.class.getCanonicalName(),
jobId,
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS);

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
assertEquals(jobGraph.getJobID(), StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID);
assertEquals(jobGraph.getJobID(), jobId);
}

@Test
public void testSavepointRestoreSettings() throws FlinkException {
final Configuration configuration = new Configuration();
final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
final JobID jobId = new JobID();

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
TestJob.class.getCanonicalName(),
jobId,
savepointRestoreSettings,
PROGRAM_ARGUMENTS);

final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
assertEquals(jobGraph.getJobID(), StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID);
assertEquals(jobGraph.getJobID(), jobId);
}
}

0 comments on commit 45be671

Please sign in to comment.