Skip to content

Commit

Permalink
[hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointC…
Browse files Browse the repository at this point in the history
…oordinator

This makes the exception that can occur during serialization of the ExecutionConfig explicit,
and adds some comments to JobGraph.
  • Loading branch information
StephanEwen committed Feb 20, 2017
1 parent 50fd1a3 commit f63426b
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -223,7 +224,13 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {

// create the job graph object
JobGraph graph = new JobGraph(jobId, program.getJobName());
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
try {
graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
}
catch (IOException e) {
throw new CompilerException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}

graph.setAllowQueuedScheduling(false);
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ else if (LOG.isDebugEnabled()) {
* @return Flag indicating whether the ack'd checkpoint was associated
* with a pending checkpoint.
*
* @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
* @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store.
*/
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,23 @@
*
* <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
* form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
* but inside certain special vertices that establish the feedback channel amongst themselves.</p>
* but inside certain special vertices that establish the feedback channel amongst themselves.
*
* <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
* define the characteristics of the concrete operation and intermediate data.</p>
* define the characteristics of the concrete operation and intermediate data.
*/
public class JobGraph implements Serializable {

private static final long serialVersionUID = 1L;

// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
// --- job and configuration ---

/** List of task vertices included in this job graph. */
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

/** The job configuration attached to this job. */
private final Configuration jobConfiguration = new Configuration();

/** Set of JAR files required to run this job. */
private final List<Path> userJars = new ArrayList<Path>();

/** Set of blob keys identifying the JAR files required to run this job. */
private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();

/** ID of this job. May be set if specific job id is desired (e.g. session management) */
private final JobID jobID;

Expand All @@ -94,18 +86,28 @@ public class JobGraph implements Serializable {
/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

/** The settings for asynchronous snapshots */
private JobSnapshottingSettings snapshotSettings;

/** List of classpaths required to run this job. */
private List<URL> classpaths = Collections.emptyList();
// --- checkpointing ---

/** Job specific execution config */
private SerializedValue<ExecutionConfig> serializedExecutionConfig;

/** The settings for the job checkpoints */
private JobSnapshottingSettings snapshotSettings;

/** Savepoint restore settings. */
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

// --- attached resources ---

/** Set of JAR files required to run this job. */
private final List<Path> userJars = new ArrayList<Path>();

/** Set of blob keys identifying the JAR files required to run this job. */
private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();

/** List of classpaths required to run this job. */
private List<URL> classpaths = Collections.emptyList();

// --------------------------------------------------------------------------------------------

/**
Expand All @@ -129,7 +131,13 @@ public JobGraph(String jobName) {
public JobGraph(JobID jobId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
setExecutionConfig(new ExecutionConfig());

try {
setExecutionConfig(new ExecutionConfig());
} catch (IOException e) {
// this should never happen, since an empty execution config is always serializable
throw new RuntimeException("bug, empty execution config is not serializable");
}
}

/**
Expand Down Expand Up @@ -260,17 +268,16 @@ public SavepointRestoreSettings getSavepointRestoreSettings() {
}

/**
* Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
* object will not affect this serialized copy.
* Sets the execution config. This method eagerly serialized the ExecutionConfig for future RPC
* transport. Further modification of the referenced ExecutionConfig object will not affect
* this serialized copy.
*
* @param executionConfig The ExecutionConfig to be serialized.
* @throws IOException Thrown if the serialization of the ExecutionConfig fails
*/
public void setExecutionConfig(ExecutionConfig executionConfig) {
public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
checkNotNull(executionConfig, "ExecutionConfig must not be null.");
try {
this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
} catch (IOException e) {
throw new RuntimeException("Could not serialize ExecutionConfig.", e);
}
this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
}

/**
Expand Down Expand Up @@ -362,6 +369,21 @@ public List<URL> getClasspaths() {
return classpaths;
}

/**
* Gets the maximum parallelism of all operations in this job graph.
*
* @return The maximum parallelism of this job graph
*/
public int getMaximumParallelism() {
int maxParallelism = -1;
for (JobVertex vertex : taskVertices.values()) {
maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
}
return maxParallelism;
}

// --------------------------------------------------------------------------------------------
// Topological Graph Access
// --------------------------------------------------------------------------------------------

public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
Expand Down Expand Up @@ -538,18 +560,6 @@ public void uploadRequiredJarFiles(InetSocketAddress serverAddress,
}
}

/**
* Gets the maximum parallelism of all operations in this job graph.
* @return The maximum parallelism of this job graph
*/
public int getMaximumParallelism() {
int maxParallelism = -1;
for (JobVertex vertex : taskVertices.values()) {
maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
}
return maxParallelism;
}

/**
* Uploads the previously added user JAR files to the job manager through
* the job manager's BLOB server. The respective port is retrieved from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
Expand Down Expand Up @@ -136,11 +135,6 @@ public JobGraph createBlockingJob(int parallelism) {
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);

ExecutionConfig executionConfig = new ExecutionConfig();

JobGraph jobGraph = new JobGraph("Blocking test job", sender, receiver);
jobGraph.setExecutionConfig(executionConfig);

return jobGraph;
return new JobGraph("Blocking test job", sender, receiver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import org.junit.Test;

import java.io.IOException;

/**
* Integration test cases for the {@link MiniCluster}.
*/
Expand Down Expand Up @@ -95,7 +97,7 @@ private static void executeJob(MiniCluster miniCluster) throws Exception {
miniCluster.runJobBlocking(job);
}

private static JobGraph getSimpleJob() {
private static JobGraph getSimpleJob() throws IOException {
JobVertex task = new JobVertex("Test task");
task.setParallelism(1);
task.setMaxParallelism(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -129,7 +131,13 @@ public JobGraph createJobGraph() {
configureCheckpointing();

// set the ExecutionConfig last when it has been finalized
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}

return jobGraph;
}
Expand Down

0 comments on commit f63426b

Please sign in to comment.