Skip to content

Commit

Permalink
[FLINK-10291] Generate JobGraph with fixed/configurable JobID in Stan…
Browse files Browse the repository at this point in the history
…daloneJobClusterEntrypoint

This closes apache#6733.
  • Loading branch information
yanghua authored and tillrohrmann committed Oct 1, 2018
1 parent d1eb996 commit c839b1a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client.program;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
Expand All @@ -39,18 +40,21 @@
public class PackagedProgramUtils {

/**
* Creates a {@link JobGraph} from the given {@link PackagedProgram}.
* Creates a {@link JobGraph} with a specified {@link JobID}
* from the given {@link PackagedProgram}.
*
* @param packagedProgram to extract the JobGraph from
* @param configuration to use for the optimizer and job graph generator
* @param defaultParallelism for the JobGraph
* @param jobID the pre-generated job id
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism) throws ProgramInvocationException {
int defaultParallelism,
JobID jobID) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
final FlinkPlan flinkPlan;
Expand Down Expand Up @@ -79,11 +83,11 @@ public static JobGraph createJobGraph(
final JobGraph jobGraph;

if (flinkPlan instanceof StreamingPlan) {
jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID);
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
} else {
final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
}

for (URL url : packagedProgram.getAllLibraries()) {
Expand All @@ -99,5 +103,22 @@ public static JobGraph createJobGraph(
return jobGraph;
}

/**
* Creates a {@link JobGraph} with a random {@link JobID}
* from the given {@link PackagedProgram}.
*
* <p>Convenience overload: it forwards to the four-argument
* {@code createJobGraph} variant and passes {@code null} as the job id,
* i.e. no pre-generated {@link JobID} is supplied by the caller.
*
* @param packagedProgram to extract the JobGraph from
* @param configuration to use for the optimizer and job graph generator
* @param defaultParallelism for the JobGraph
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism) throws ProgramInvocationException {
// null job id: presumably the job graph generators then pick a random id — confirm
return createJobGraph(packagedProgram, configuration, defaultParallelism, null);
}

private PackagedProgramUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
Expand Down Expand Up @@ -45,6 +46,8 @@ public class ClassPathJobGraphRetriever implements JobGraphRetriever {
@Nonnull
private final String[] programArguments;

/**
* Job id handed to {@link PackagedProgramUtils} when this retriever builds its
* {@link JobGraph}, so the resulting graph does not receive a randomly chosen id.
*
* <p>NOTE(review): presumably fixed so that the id stays the same across restarts
* of the job cluster entrypoint — confirm against the entrypoint's recovery logic.
*/
public static final JobID FIXED_JOB_ID = new JobID(0, 0);

public ClassPathJobGraphRetriever(
@Nonnull String jobClassName,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
Expand All @@ -59,7 +62,11 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
final PackagedProgram packagedProgram = createPackagedProgram();
final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
try {
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
packagedProgram,
configuration,
defaultParallelism,
FIXED_JOB_ID);
jobGraph.setAllowQueuedScheduling(true);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/**
Expand All @@ -53,6 +54,7 @@ public void testJobGraphRetrieval() throws FlinkException {

assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
assertEquals(jobGraph.getJobID(), ClassPathJobGraphRetriever.FIXED_JOB_ID);
}

@Test
Expand All @@ -68,5 +70,6 @@ public void testSavepointRestoreSettings() throws FlinkException {
final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
assertEquals(jobGraph.getJobID(), ClassPathJobGraphRetriever.FIXED_JOB_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,29 @@
import java.io.File;
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;

import javax.annotation.Nullable;

/**
* Abstract base class representing Flink streaming plans.
*
*/
public abstract class StreamingPlan implements FlinkPlan {

public abstract JobGraph getJobGraph();
/**
* Gets the assembled {@link JobGraph} with a random {@link JobID}.
*
* <p>Delegates to {@link #getJobGraph(JobID)} with a {@code null} job id.
*
* @return the job graph produced by {@link #getJobGraph(JobID)}
*/
@SuppressWarnings("deprecation")
public JobGraph getJobGraph() {
// NOTE(review): nothing deprecated is referenced in this body as far as this file shows;
// the @SuppressWarnings above may be a leftover — confirm before removing.
return getJobGraph(null);
}

/**
* Gets the assembled {@link JobGraph} with a specified {@link JobID}.
*
* @param jobID the id to assign to the job graph; may be {@code null}, which is
*              what the no-argument {@link #getJobGraph()} passes when no
*              specific id is requested
* @return the assembled job graph
*/
public abstract JobGraph getJobGraph(@Nullable JobID jobID);

public abstract String getStreamingPlanAsJSON();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -652,19 +653,20 @@ private void removeVertex(StreamNode toRemove) {
}

/**
* Gets the assembled {@link JobGraph}.
* Gets the assembled {@link JobGraph} with a given job id.
*/
@SuppressWarnings("deprecation")
public JobGraph getJobGraph() {
@Override
public JobGraph getJobGraph(@Nullable JobID jobID) {
// temporarily forbid checkpointing for iterative jobs
if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
throw new UnsupportedOperationException(
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
}

return StreamingJobGraphGenerator.createJobGraph(this);
return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
Expand Down Expand Up @@ -62,6 +63,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -83,7 +86,11 @@ public class StreamingJobGraphGenerator {
// ------------------------------------------------------------------------

public static JobGraph createJobGraph(StreamGraph streamGraph) {
return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
return createJobGraph(streamGraph, null);
}

/**
* Creates a {@link JobGraph} from the given {@link StreamGraph} using a fresh
* generator instance.
*
* @param streamGraph the stream graph to translate
* @param jobID the id forwarded to the generator for the resulting job graph;
*              may be {@code null} (presumably resulting in a generated id —
*              confirm against the {@code JobGraph} constructor)
* @return the job graph built by the generator
*/
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}

// ------------------------------------------------------------------------
Expand All @@ -108,6 +115,10 @@ public static JobGraph createJobGraph(StreamGraph streamGraph) {
private final List<StreamGraphHasher> legacyStreamGraphHashers;

/**
* Creates a generator for the given stream graph without a pre-defined job id
* by delegating to the two-argument constructor with {@code null}.
*
* <p>NOTE(review): the static factory methods visible in this file go through the
* two-argument constructor, so this overload may no longer have callers — confirm.
*/
private StreamingJobGraphGenerator(StreamGraph streamGraph) {
this(streamGraph, null);
}

private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
Expand All @@ -121,7 +132,7 @@ private StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.chainedPreferredResources = new HashMap<>();
this.physicalEdgesInOrder = new ArrayList<>();

jobGraph = new JobGraph(streamGraph.getJobName());
jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}

private JobGraph createJobGraph() {
Expand Down

0 comments on commit c839b1a

Please sign in to comment.