Skip to content

Commit

Permalink
[FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExe…
Browse files Browse the repository at this point in the history
…cutor

This commit introduces the JobExecutor interface which abstracts the actual mini cluster
from the Test(Stream)Environment. By letting the Flip-6 MiniCluster as well as the
FlinkMiniCluster implement this interface, we can run all test base jobs either on the
Flip-6 mini cluster or on the current mini cluster.

This closes apache#4897.
  • Loading branch information
tillrohrmann committed Jan 10, 2018
1 parent 51a2787 commit 057edf9
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.minicluster;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;

/**
* Interface for {@link JobGraph} executors.
*/
public interface JobExecutor {

/**
* Run the given job and block until its execution result can be returned.
*
* @param jobGraph to execute
* @return Execution result of the executed job
* @throws JobExecutionException if the job failed to execute
*/
JobExecutionResult executeJobBlocking(final JobGraph jobGraph) throws JobExecutionException, InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

public class MiniCluster {
public class MiniCluster implements JobExecutor {

private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);

Expand Down Expand Up @@ -448,7 +448,8 @@ public void runDetached(JobGraph job) throws JobExecutionException {
* @throws JobExecutionException Thrown if anything went amiss during initial job launch,
* or if the job terminally failed.
*/
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");

MiniClusterJobDispatcher dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ abstract class FlinkMiniCluster(
val userConfiguration: Configuration,
val highAvailabilityServices: HighAvailabilityServices,
val useSingleActorSystem: Boolean)
extends LeaderRetrievalListener {
extends LeaderRetrievalListener
with JobExecutor {

protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])

Expand Down Expand Up @@ -701,4 +702,15 @@ abstract class FlinkMiniCluster(

FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader)
}

/**
* Run the given job and block until its execution result can be returned.
*
* @param jobGraph to execute
* @return Execution result of the executed job
* @throws JobExecutionException if the job failed to execute
*/
override def executeJobBlocking(jobGraph: JobGraph) = {
submitJobAndWait(jobGraph, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void runJobWithMultipleJobManagers() throws Exception {

private static void executeJob(MiniCluster miniCluster) throws Exception {
JobGraph job = getSimpleJob();
miniCluster.runJobBlocking(job);
miniCluster.executeJobBlocking(job);
}

private static JobGraph getSimpleJob() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public JobExecutionResult execute(String jobName) throws Exception {

try {
miniCluster.start();
return miniCluster.runJobBlocking(jobGraph);
return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutor;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
Expand All @@ -37,30 +38,30 @@
*/
public class TestStreamEnvironment extends StreamExecutionEnvironment {

/** The mini cluster in which this environment executes its jobs. */
private final LocalFlinkMiniCluster miniCluster;
/** The job executor to use to execute environment's jobs. */
private final JobExecutor jobExecutor;

private final Collection<Path> jarFiles;

private final Collection<URL> classPaths;

public TestStreamEnvironment(
LocalFlinkMiniCluster miniCluster,
JobExecutor jobExecutor,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths) {

this.miniCluster = Preconditions.checkNotNull(miniCluster);
this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
this.jarFiles = Preconditions.checkNotNull(jarFiles);
this.classPaths = Preconditions.checkNotNull(classPaths);

setParallelism(parallelism);
}

public TestStreamEnvironment(
LocalFlinkMiniCluster miniCluster,
JobExecutor jobExecutor,
int parallelism) {
this(miniCluster, parallelism, Collections.<Path>emptyList(), Collections.<URL>emptyList());
this(jobExecutor, parallelism, Collections.emptyList(), Collections.emptyList());
}

@Override
Expand All @@ -75,7 +76,7 @@ public JobExecutionResult execute(String jobName) throws Exception {

jobGraph.setClasspaths(new ArrayList<>(classPaths));

return miniCluster.submitJobAndWait(jobGraph, false);
return jobExecutor.executeJobBlocking(jobGraph);
}

// ------------------------------------------------------------------------
Expand All @@ -85,13 +86,13 @@ public JobExecutionResult execute(String jobName) throws Exception {
* the given cluster with the given default parallelism and the specified jar files and class
* paths.
*
* @param cluster The test cluster to run the test program on.
* @param jobExecutor The executor to execute the jobs on
* @param parallelism The default parallelism for the test programs.
* @param jarFiles Additional jar files to execute the job with
* @param classpaths Additional class paths to execute the job with
*/
public static void setAsContext(
final LocalFlinkMiniCluster cluster,
final JobExecutor jobExecutor,
final int parallelism,
final Collection<Path> jarFiles,
final Collection<URL> classpaths) {
Expand All @@ -100,7 +101,7 @@ public static void setAsContext(
@Override
public StreamExecutionEnvironment createExecutionEnvironment() {
return new TestStreamEnvironment(
cluster,
jobExecutor,
parallelism,
jarFiles,
classpaths);
Expand All @@ -114,15 +115,15 @@ public StreamExecutionEnvironment createExecutionEnvironment() {
* Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
* the given cluster with the given default parallelism.
*
* @param cluster The test cluster to run the test program on.
* @param jobExecutor The executor to execute the jobs on
* @param parallelism The default parallelism for the test programs.
*/
public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
setAsContext(
cluster,
jobExecutor,
parallelism,
Collections.<Path>emptyList(),
Collections.<URL>emptyList());
Collections.emptyList(),
Collections.emptyList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutor;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Preconditions;

Expand All @@ -44,7 +46,7 @@
*/
public class TestEnvironment extends ExecutionEnvironment {

private final LocalFlinkMiniCluster miniCluster;
private final JobExecutor jobExecutor;

private final Collection<Path> jarFiles;

Expand All @@ -53,12 +55,12 @@ public class TestEnvironment extends ExecutionEnvironment {
private TestEnvironment lastEnv;

public TestEnvironment(
LocalFlinkMiniCluster miniCluster,
JobExecutor jobExecutor,
int parallelism,
boolean isObjectReuseEnabled,
Collection<Path> jarFiles,
Collection<URL> classPaths) {
this.miniCluster = Preconditions.checkNotNull(miniCluster);
this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
this.jarFiles = Preconditions.checkNotNull(jarFiles);
this.classPaths = Preconditions.checkNotNull(classPaths);

Expand All @@ -77,15 +79,15 @@ public TestEnvironment(
}

public TestEnvironment(
LocalFlinkMiniCluster executor,
JobExecutor executor,
int parallelism,
boolean isObjectReuseEnabled) {
this(
executor,
parallelism,
isObjectReuseEnabled,
Collections.<Path>emptyList(),
Collections.<URL>emptyList());
Collections.emptyList(),
Collections.emptyList());
}

@Override
Expand Down Expand Up @@ -115,7 +117,7 @@ public JobExecutionResult execute(String jobName) throws Exception {

jobGraph.setClasspaths(new ArrayList<>(classPaths));

this.lastJobExecutionResult = miniCluster.submitJobAndWait(jobGraph, false);
this.lastJobExecutionResult = jobExecutor.executeJobBlocking(jobGraph);
return this.lastJobExecutionResult;
}

Expand All @@ -130,15 +132,15 @@ public String getExecutionPlan() throws Exception {
private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);

Optimizer pc = new Optimizer(new DataStatistics(), this.miniCluster.configuration());
Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
return pc.compile(p);
}

public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
lastEnv = new TestEnvironment(miniCluster, getParallelism(), getConfig().isObjectReuseEnabled());
lastEnv = new TestEnvironment(jobExecutor, getParallelism(), getConfig().isObjectReuseEnabled());
return lastEnv;
}
};
Expand All @@ -153,13 +155,13 @@ public ExecutionEnvironment createExecutionEnvironment() {
* environment executes the given jobs on a Flink mini cluster with the given default
* parallelism and the additional jar files and class paths.
*
* @param miniCluster The mini cluster on which to execute the jobs
* @param jobExecutor The executor to run the jobs on
* @param parallelism The default parallelism
* @param jarFiles Additional jar files to execute the job with
* @param classPaths Additional class paths to execute the job with
*/
public static void setAsContext(
final LocalFlinkMiniCluster miniCluster,
final JobExecutor jobExecutor,
final int parallelism,
final Collection<Path> jarFiles,
final Collection<URL> classPaths) {
Expand All @@ -168,7 +170,7 @@ public static void setAsContext(
@Override
public ExecutionEnvironment createExecutionEnvironment() {
return new TestEnvironment(
miniCluster,
jobExecutor,
parallelism,
false,
jarFiles,
Expand All @@ -185,15 +187,15 @@ public ExecutionEnvironment createExecutionEnvironment() {
* environment executes the given jobs on a Flink mini cluster with the given default
* parallelism and the additional jar files and class paths.
*
* @param miniCluster The mini cluster on which to execute the jobs
* @param jobExecutor The executor to run the jobs on
* @param parallelism The default parallelism
*/
public static void setAsContext(final LocalFlinkMiniCluster miniCluster, final int parallelism) {
public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
setAsContext(
miniCluster,
jobExecutor,
parallelism,
Collections.<Path>emptyList(),
Collections.<URL>emptyList());
Collections.emptyList(),
Collections.emptyList());
}

public static void unsetAsContext() {
Expand Down

0 comments on commit 057edf9

Please sign in to comment.