Skip to content

Commit

Permalink
[FLINK-16655][FLINK-16657] Introduce embedded executor and use it in …
Browse files Browse the repository at this point in the history
…Web Submission

This closes apache#11460.
  • Loading branch information
kl0u committed Apr 6, 2020
1 parent d2be6ae commit 7381304
Show file tree
Hide file tree
Showing 17 changed files with 667 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;

import java.util.List;

/**
 * Contract for the components that take care of submitting an application
 * in the different deployment environments.
 *
 * <p>Implementations are expected to have access to the cluster's
 * {@link DispatcherGateway}; they do not go through the publicly exposed REST API.
 */
@Internal
public interface ApplicationRunner {

    /**
     * Runs the given application against the provided {@code dispatcherGateway}.
     *
     * @param dispatcherGateway the dispatcher of the cluster on which the application is run.
     * @param program the {@link PackagedProgram} that contains the user's main method.
     * @param configuration the configuration with which the application is run.
     *
     * @return the ids of the jobs that were submitted on behalf of the application.
     */
    List<JobID> run(
            final DispatcherGateway dispatcherGateway,
            final PackagedProgram program,
            final Configuration configuration);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.util.FlinkRuntimeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * An {@link ApplicationRunner} that executes the user application by means of the
 * {@link EmbeddedExecutor}. The provided {@link DispatcherGateway} is invoked directly;
 * the REST API is not involved.
 *
 * <p>The application is submitted in {@code DETACHED} mode, i.e. this runner does not wait
 * for it to finish. Consequently, applications containing jobs that depend on operations
 * such as {@code [collect, print, printToErr, count]} will fail.
 */
@Internal
public class DetachedApplicationRunner implements ApplicationRunner {

    private static final Logger LOG = LoggerFactory.getLogger(DetachedApplicationRunner.class);

    /**
     * Executes the program in detached mode and reports the ids of the submitted jobs.
     *
     * <p>Note that the passed {@code configuration} is modified in place:
     * {@link DeploymentOptions#ATTACHED} is set to {@code false} on it.
     *
     * @param dispatcherGateway the dispatcher to submit to; must not be {@code null}.
     * @param program the user program to execute; must not be {@code null}.
     * @param configuration the configuration to execute with; must not be {@code null}.
     *
     * @return the list of job ids that was handed to the executor service loader.
     * @throws FlinkRuntimeException if the program invocation fails.
     */
    @Override
    public List<JobID> run(final DispatcherGateway dispatcherGateway, final PackagedProgram program, final Configuration configuration) {
        checkNotNull(dispatcherGateway);
        checkNotNull(program);
        checkNotNull(configuration);

        // This runner never waits for the application, so force detached submission.
        configuration.set(DeploymentOptions.ATTACHED, false);

        // The same list instance is given to the loader and returned to the caller afterwards.
        final List<JobID> submittedJobIds = new ArrayList<>();
        final PipelineExecutorServiceLoader embeddedLoader =
                new EmbeddedExecutorServiceLoader(submittedJobIds, dispatcherGateway);

        try {
            ClientUtils.executeProgram(embeddedLoader, configuration, program);
            return submittedJobIds;
        } catch (ProgramInvocationException e) {
            LOG.warn("Could not execute application: ", e);
            throw new FlinkRuntimeException("Could not execute application.", e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link JobClient} bound to a single job that serves every request by calling
 * the {@link DispatcherGateway} directly, using one fixed RPC timeout for all calls.
 */
@Internal
public class EmbeddedJobClient implements JobClient {

    private final JobID jobId;

    private final DispatcherGateway dispatcherGateway;

    private final Time timeout;

    /**
     * Creates a client for the given job.
     *
     * @param jobId the id of the job this client refers to; must not be {@code null}.
     * @param dispatcherGateway the gateway all requests are sent to; must not be {@code null}.
     * @param rpcTimeout the timeout passed to every gateway call; must not be {@code null}.
     */
    public EmbeddedJobClient(
            final JobID jobId,
            final DispatcherGateway dispatcherGateway,
            final Time rpcTimeout) {
        this.jobId = checkNotNull(jobId);
        this.dispatcherGateway = checkNotNull(dispatcherGateway);
        this.timeout = checkNotNull(rpcTimeout);
    }

    @Override
    public JobID getJobID() {
        return jobId;
    }

    @Override
    public CompletableFuture<JobStatus> getJobStatus() {
        return dispatcherGateway.requestJobStatus(jobId, timeout);
    }

    @Override
    public CompletableFuture<Void> cancel() {
        // The gateway's response value is discarded; only completion is reported.
        return dispatcherGateway
                .cancelJob(jobId, timeout)
                .thenApply(acknowledge -> null);
    }

    @Override
    public CompletableFuture<String> stopWithSavepoint(final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory) {
        // Note the argument order expected by the gateway: directory first, then the flag.
        return dispatcherGateway.stopWithSavepoint(
                jobId,
                savepointDirectory,
                advanceToEndOfEventTime,
                timeout);
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable final String savepointDirectory) {
        // NOTE(review): the literal false is presumably the gateway's "cancel job" flag - confirm.
        return dispatcherGateway.triggerSavepoint(
                jobId,
                savepointDirectory,
                false,
                timeout);
    }

    /**
     * Requests the job from the dispatcher and deserializes its accumulators
     * with the given class loader.
     *
     * @param classLoader the class loader used for deserialization; must not be {@code null}.
     * @return a future with the deserialized accumulators; it completes exceptionally
     *         with a {@link CompletionException} if deserialization fails.
     */
    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(final ClassLoader classLoader) {
        checkNotNull(classLoader);

        return dispatcherGateway
                .requestJob(jobId, timeout)
                .thenApply(executionGraph -> executionGraph.getAccumulatorsSerialized())
                .thenApply(serializedAccumulators -> {
                    try {
                        return AccumulatorHelper.deserializeAndUnwrapAccumulators(serializedAccumulators, classLoader);
                    } catch (Exception e) {
                        throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
                    }
                });
    }

    /**
     * Requests the job result from the dispatcher and converts it into a
     * {@link JobExecutionResult} with the given class loader.
     *
     * @param userClassloader the class loader used for the conversion; must not be {@code null}.
     * @return a future with the execution result; any failure during the conversion is
     *         wrapped in an {@link Exception} naming the job, inside a {@link CompletionException}.
     */
    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
        checkNotNull(userClassloader);

        return dispatcherGateway
                .requestJobResult(jobId, timeout)
                .thenApply(result -> {
                    try {
                        return result.toJobExecutionResult(userClassloader);
                    } catch (Throwable t) {
                        final Exception jobFailure = new Exception("Job " + jobId + " failed", t);
                        throw new CompletionException(jobFailure);
                    }
                });
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.application.executors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.client.deployment.executors.ExecutorUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.FunctionWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link PipelineExecutor} that submits jobs by calling methods of the
 * {@link org.apache.flink.runtime.dispatcher.DispatcherGateway Dispatcher} directly,
 * without going through the REST API.
 */
@Internal
public class EmbeddedExecutor implements PipelineExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedExecutor.class);

    public static final String NAME = "embedded";

    private final Collection<JobID> submittedJobIds;

    private final DispatcherGateway dispatcherGateway;

    /**
     * Creates an {@link EmbeddedExecutor}.
     *
     * @param submittedJobIds a collection that receives the id of every job handed to
     *                        {@link #execute(Pipeline, Configuration)}; it is the channel through
     *                        which the submitted job ids are returned to the caller.
     * @param dispatcherGateway the dispatcher of the cluster that is used to submit jobs.
     */
    public EmbeddedExecutor(
            final Collection<JobID> submittedJobIds,
            final DispatcherGateway dispatcherGateway) {
        this.submittedJobIds = checkNotNull(submittedJobIds);
        this.dispatcherGateway = checkNotNull(dispatcherGateway);
    }

    /**
     * Translates the pipeline into a {@link JobGraph}, submits it through the dispatcher and
     * returns a future of a {@link JobClient} for the job.
     *
     * <p>The job id is added to the collection given at construction time before the
     * submission is started. The RPC timeout is read from {@link WebOptions#TIMEOUT}.
     *
     * @param pipeline the pipeline to execute; must not be {@code null}.
     * @param configuration the configuration to execute with; must not be {@code null}.
     * @return a future that yields the job's client once the submission future has completed.
     */
    @Override
    public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) {
        checkNotNull(pipeline);
        checkNotNull(configuration);

        final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
        final JobID jobId = jobGraph.getJobID();

        // The id is recorded (and logged) up front, before the asynchronous submission runs.
        submittedJobIds.add(jobId);
        LOG.info("Job {} is submitted.", jobId);

        final Time rpcTimeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

        final CompletableFuture<JobID> submission = submitJob(
                dispatcherGateway,
                address -> new BlobClient(address, configuration),
                jobGraph,
                rpcTimeout);

        final JobClient jobClient = new EmbeddedJobClient(jobId, dispatcherGateway, rpcTimeout);

        return submission.thenApplyAsync(ignored -> jobClient);
    }

    /**
     * Asks the dispatcher for its blob server port, uploads the job graph's files to
     * that address and then submits the job graph to the dispatcher.
     *
     * @return a future that yields the id of the job graph after the dispatcher responded.
     */
    private static CompletableFuture<JobID> submitJob(
            final DispatcherGateway dispatcherGateway,
            final FunctionWithException<InetSocketAddress, BlobClient, IOException> blobClientCreator,
            final JobGraph jobGraph,
            final Time rpcTimeout) {
        checkNotNull(blobClientCreator);
        checkNotNull(jobGraph);

        LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());

        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = dispatcherGateway
                .getBlobServerPort(rpcTimeout)
                .thenApply(port -> new InetSocketAddress(dispatcherGateway.getHostname(), port));

        return blobServerAddressFuture
                .thenCompose(address -> {
                    uploadJobFiles(jobGraph, blobClientCreator, address);
                    return dispatcherGateway.submitJob(jobGraph, rpcTimeout);
                })
                .thenApply(acknowledge -> jobGraph.getJobID());
    }

    /**
     * Delegates to {@link ClientUtils#extractAndUploadJobGraphFiles} with a blob client created
     * for the given address, rethrowing a {@link FlinkException} as {@link CompletionException}.
     */
    private static void uploadJobFiles(
            final JobGraph jobGraph,
            final FunctionWithException<InetSocketAddress, BlobClient, IOException> blobClientCreator,
            final InetSocketAddress blobServerAddress) {
        try {
            ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> blobClientCreator.apply(blobServerAddress));
        } catch (FlinkException e) {
            throw new CompletionException(e);
        }
    }
}
Loading

0 comments on commit 7381304

Please sign in to comment.