Skip to content

Commit

Permalink
[minor] Simplify PlanExecutor.execute() signature
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Apr 22, 2020
1 parent 19d9cee commit 31827ff
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private LocalExecutor(Configuration configuration, Function<MiniClusterConfigura
}

@Override
public CompletableFuture<? extends JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private PerJobMiniClusterFactory(
/**
* Starts a {@link MiniCluster} and submits a job.
*/
public CompletableFuture<? extends JobClient> submitJob(JobGraph jobGraph) throws Exception {
public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) throws Exception {
MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
miniCluster.start();
Expand All @@ -87,7 +87,8 @@ public CompletableFuture<? extends JobClient> submitJob(JobGraph jobGraph) throw
// We failed to create the JobClient and must shutdown to ensure cleanup.
shutDownCluster(miniCluster);
}
});
})
.thenApply(Function.identity());
}

private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ public interface PipelineExecutor {
* @param configuration the {@link Configuration} with the required execution parameters
* @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
*/
CompletableFuture<? extends JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ public JobClient executeAsync(String jobName) throws Exception {
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));

CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(plan, configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class CollectionPipelineExecutor implements PipelineExecutor {
public static final String NAME = "collection";

@Override
public CompletableFuture<? extends JobClient> execute(
public CompletableFuture<JobClient> execute(
Pipeline pipeline,
Configuration configuration) throws Exception {
Plan plan = (Plan) pipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));

CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public ProgramDeployer(
this.jobName = jobName;
}

public CompletableFuture<? extends JobClient> deploy() {
public CompletableFuture<JobClient> deploy() {
LOG.info("Submitting job {} for query {}`", pipeline, jobName);
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);
Expand All @@ -77,7 +77,7 @@ public CompletableFuture<? extends JobClient> deploy() {
}

final PipelineExecutor executor = executorFactory.getExecutor(configuration);
CompletableFuture<? extends JobClient> jobClient;
CompletableFuture<JobClient> jobClient;
try {
jobClient = executor.execute(pipeline, configuration);
} catch (Exception e) {
Expand Down

0 comments on commit 31827ff

Please sign in to comment.