[FLINK-21401] Add JobGraphBuilder and adapt call sites of JobGraph constructor
tillrohrmann committed Mar 5, 2021
1 parent 77b327c commit e000723
Showing 32 changed files with 435 additions and 179 deletions.
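Summary: call sites no longer construct a JobGraph directly and mutate it through setters; they go through the new JobGraphBuilder, whose newStreamingJobGraphBuilder() and newBatchJobGraphBuilder() factories fix the JobType up front and whose build() applies only the settings that were provided. A usage sketch follows the new JobGraphBuilder file below.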
──── changed file ────
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -88,8 +89,7 @@ public void testJobManagerJMXMetricAccess() throws Exception {
         JobVertex sourceJobVertex = new JobVertex("Source");
         sourceJobVertex.setInvokableClass(BlockingInvokable.class);

-        JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
-        jobGraph.setSnapshotSettings(
+        final JobCheckpointingSettings jobCheckpointingSettings =
                 new JobCheckpointingSettings(
                         new CheckpointCoordinatorConfiguration(
                                 500,
@@ -101,7 +101,14 @@ public void testJobManagerJMXMetricAccess() throws Exception {
                                 false,
                                 false,
                                 0),
-                        null));
+                        null);
+
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .setJobName("TestingJob")
+                        .addJobVertex(sourceJobVertex)
+                        .setJobCheckpointingSettings(jobCheckpointingSettings)
+                        .build();

         ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
         client.submitJob(jobGraph).get();
──── changed file ────
@@ -30,7 +30,6 @@
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.io.BlockingShuffleOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.AlgorithmOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -66,8 +65,8 @@
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphUtils;
-import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -245,31 +244,34 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {

         // ----------- finalize the job graph -----------

+        for (JobVertex vertex : this.auxVertices) {
+            vertex.setSlotSharingGroup(sharingGroup);
+        }
+
+        final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
+                JobGraphUtils.prepareUserArtifactEntries(
+                        program.getOriginalPlan().getCachedFiles().stream()
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+                        jobId);
+
         // create the job graph object
-        JobGraph graph = new JobGraph(jobId, program.getJobName());
-        graph.setJobType(JobType.BATCH);
+        final JobGraph graph;
         try {
-            graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+            graph =
+                    JobGraphBuilder.newBatchJobGraphBuilder()
+                            .setJobId(jobId)
+                            .setJobName(program.getJobName())
+                            .setExecutionConfig(program.getOriginalPlan().getExecutionConfig())
+                            .addJobVertices(vertices.values())
+                            .addJobVertices(auxVertices)
+                            .addUserArtifacts(userArtifacts)
+                            .build();
         } catch (IOException e) {
             throw new CompilerException(
                     "Could not serialize the ExecutionConfig. "
                             + "This indicates that non-serializable types (like custom serializers) were registered");
         }

-        // add vertices to the graph
-        this.vertices.values().forEach(graph::addVertex);
-
-        for (JobVertex vertex : this.auxVertices) {
-            graph.addVertex(vertex);
-            vertex.setSlotSharingGroup(sharingGroup);
-        }
-
-        Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts =
-                program.getOriginalPlan().getCachedFiles().stream()
-                        .map(entry -> Tuple2.of(entry.getKey(), entry.getValue()))
-                        .collect(Collectors.toList());
-        JobGraphUtils.addUserArtifactEntries(userArtifacts, graph);
-
         // release all references again
         this.vertices = null;
         this.chainedTasks = null;
──── changed file ────
@@ -18,6 +18,7 @@

 package org.apache.flink.optimizer.plantranslate;

+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.cache.DistributedCache;
@@ -40,7 +41,6 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobGraphUtils;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -56,8 +56,7 @@
 import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;

 import static org.hamcrest.CoreMatchers.is;
@@ -248,36 +247,27 @@ public void testArtifactCompression() throws IOException {
         Path directory2 = tmp.newFolder("directory2").toPath();
         Files.createDirectory(directory2.resolve("containedFile2"));

-        JobGraph jb = JobGraphTestUtils.emptyJobGraph();
-
         final String executableFileName = "executableFile";
         final String nonExecutableFileName = "nonExecutableFile";
         final String executableDirName = "executableDir";
         final String nonExecutableDirName = "nonExecutableDIr";

-        Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> originalArtifacts =
-                Arrays.asList(
-                        Tuple2.of(
-                                executableFileName,
-                                new DistributedCache.DistributedCacheEntry(
-                                        plainFile1.toString(), true)),
-                        Tuple2.of(
-                                nonExecutableFileName,
-                                new DistributedCache.DistributedCacheEntry(
-                                        plainFile2.toString(), false)),
-                        Tuple2.of(
-                                executableDirName,
-                                new DistributedCache.DistributedCacheEntry(
-                                        directory1.toString(), true)),
-                        Tuple2.of(
-                                nonExecutableDirName,
-                                new DistributedCache.DistributedCacheEntry(
-                                        directory2.toString(), false)));
-
-        JobGraphUtils.addUserArtifactEntries(originalArtifacts, jb);
-
-        Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts =
-                jb.getUserArtifacts();
+        Map<String, DistributedCache.DistributedCacheEntry> originalArtifacts = new HashMap<>();
+        originalArtifacts.put(
+                executableFileName,
+                new DistributedCache.DistributedCacheEntry(plainFile1.toString(), true));
+        originalArtifacts.put(
+                nonExecutableFileName,
+                new DistributedCache.DistributedCacheEntry(plainFile2.toString(), false));
+        originalArtifacts.put(
+                executableDirName,
+                new DistributedCache.DistributedCacheEntry(directory1.toString(), true));
+        originalArtifacts.put(
+                nonExecutableDirName,
+                new DistributedCache.DistributedCacheEntry(directory2.toString(), false));
+
+        final Map<String, DistributedCache.DistributedCacheEntry> submittedArtifacts =
+                JobGraphUtils.prepareUserArtifactEntries(originalArtifacts, new JobID());

         DistributedCache.DistributedCacheEntry executableFileEntry =
                 submittedArtifacts.get(executableFileName);
──── changed file ────
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -278,7 +279,11 @@ public void testCancel() throws Exception {
         sender.setParallelism(2);
         sender.setInvokableClass(BlockingInvokable.class);

-        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender);
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .setJobName("Stoppable streaming test job")
+                        .addJobVertex(sender)
+                        .build();
         final JobID jid = jobGraph.getJobID();

         ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
──── changed file ────
@@ -276,7 +276,14 @@ public SavepointRestoreSettings getSavepointRestoreSettings() {
      */
     public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
         checkNotNull(executionConfig, "ExecutionConfig must not be null.");
-        this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
+        setSerializedExecutionConfig(new SerializedValue<>(executionConfig));
     }
+
+    void setSerializedExecutionConfig(SerializedValue<ExecutionConfig> serializedExecutionConfig) {
+        this.serializedExecutionConfig =
+                checkNotNull(
+                        serializedExecutionConfig,
+                        "The serialized ExecutionConfig must not be null.");
+    }

     /**
──── changed file ────
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Builder for the {@link JobGraph}. */
+public class JobGraphBuilder {
+
+    private final JobType jobType;
+
+    private final List<JobVertex> jobVertices = new ArrayList<>();
+
+    private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
+            new HashMap<>();
+
+    private final List<URL> classpaths = new ArrayList<>();
+
+    private String jobName = "Unnamed job";
+
+    @Nullable private JobID jobId = null;
+
+    @Nullable private SerializedValue<ExecutionConfig> serializedExecutionConfig = null;
+
+    @Nullable private JobCheckpointingSettings jobCheckpointingSettings = null;
+
+    @Nullable private SavepointRestoreSettings savepointRestoreSettings = null;
+
+    private JobGraphBuilder(JobType jobType) {
+        this.jobType = jobType;
+    }
+
+    public JobGraphBuilder setJobName(String jobName) {
+        this.jobName = jobName;
+        return this;
+    }
+
+    public JobGraphBuilder addJobVertices(Collection<? extends JobVertex> jobVerticesToAdd) {
+        jobVertices.addAll(jobVerticesToAdd);
+        return this;
+    }
+
+    public JobGraphBuilder addJobVertex(JobVertex jobVertex) {
+        return addJobVertices(Collections.singleton(jobVertex));
+    }
+
+    public JobGraphBuilder setJobId(JobID jobId) {
+        this.jobId = jobId;
+        return this;
+    }
+
+    public JobGraphBuilder setExecutionConfig(ExecutionConfig newExecutionConfig)
+            throws IOException {
+        this.serializedExecutionConfig = new SerializedValue<ExecutionConfig>(newExecutionConfig);
+        return this;
+    }
+
+    public JobGraphBuilder addUserArtifacts(
+            Map<String, DistributedCache.DistributedCacheEntry> newUserArtifacts) {
+        userArtifacts.putAll(newUserArtifacts);
+        return this;
+    }
+
+    public JobGraphBuilder setJobCheckpointingSettings(
+            JobCheckpointingSettings newJobCheckpointingSettings) {
+        this.jobCheckpointingSettings = newJobCheckpointingSettings;
+        return this;
+    }
+
+    public JobGraphBuilder setSavepointRestoreSettings(
+            SavepointRestoreSettings newSavepointRestoreSettings) {
+        savepointRestoreSettings = newSavepointRestoreSettings;
+        return this;
+    }
+
+    public JobGraphBuilder addClasspaths(Collection<URL> additionalClasspaths) {
+        classpaths.addAll(additionalClasspaths);
+        return this;
+    }
+
+    public JobGraph build() {
+        final JobGraph jobGraph =
+                new JobGraph(jobId, jobName, jobVertices.toArray(new JobVertex[0]));
+
+        jobGraph.setJobType(jobType);
+
+        if (serializedExecutionConfig != null) {
+            jobGraph.setSerializedExecutionConfig(serializedExecutionConfig);
+        }
+
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
+                userArtifacts.entrySet()) {
+            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
+        }
+
+        if (jobCheckpointingSettings != null) {
+            jobGraph.setSnapshotSettings(jobCheckpointingSettings);
+        }
+
+        if (savepointRestoreSettings != null) {
+            jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+        }
+
+        if (!classpaths.isEmpty()) {
+            jobGraph.setClasspaths(classpaths);
+        }
+
+        return jobGraph;
+    }
+
+    public static JobGraphBuilder newStreamingJobGraphBuilder() {
+        return new JobGraphBuilder(JobType.STREAMING);
+    }
+
+    public static JobGraphBuilder newBatchJobGraphBuilder() {
+        return new JobGraphBuilder(JobType.BATCH);
+    }
+}
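For reference, a minimal sketch of how a call site uses the new builder, mirroring the adaptations above. The class name, vertex name, and job name are illustrative, not part of this commit:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;

/** Illustrative usage of the JobGraphBuilder introduced by this commit. */
public class JobGraphBuilderUsageExample {

    public static void main(String[] args) {
        // A single vertex stands in for a real pipeline.
        final JobVertex source = new JobVertex("Source");

        // The factory method fixes the JobType (STREAMING here) at construction
        // time, replacing the old pattern of "new JobGraph(...)" followed by
        // mutating setters such as setJobType and setSnapshotSettings.
        final JobGraph jobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder()
                        .setJobName("ExampleJob")
                        .addJobVertex(source)
                        .build();

        // The JobGraph generates its own JobID when none is set on the builder.
        System.out.println(jobGraph.getJobID());
    }
}

Checkpointing settings, savepoint restore settings, user artifacts, and classpaths are all optional; build() applies only the ones that were set.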
(Diff truncated: the remaining changed files were not loaded.)
