Commit d938c19

[FLINK-14593][client] Port ClusterClient to asynchronous interface version

This closes apache#10069.
tisonkun committed Nov 8, 2019
1 parent 96df4fa commit d938c19
Showing 29 changed files with 418 additions and 371 deletions.
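
The pattern running through the CLI changes below: ClusterClient calls that used to block now return a CompletableFuture, and the CLI bounds each wait with the configured client timeout. A minimal plain-JDK sketch of the before/after calling pattern (the future and timeout here are stand-ins, not Flink code):

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BlockingGetPattern {
    public static void main(String[] args) throws Exception {
        // Stand-in for an asynchronous ClusterClient operation.
        CompletableFuture<String> savepointPathFuture =
            CompletableFuture.supplyAsync(() -> "hdfs:///savepoints/sp-1");

        Duration clientTimeout = Duration.ofSeconds(60);

        // Before this commit: savepointPathFuture.get() could wait forever.
        // After: the wait is bounded by the client timeout.
        String savepointPath =
            savepointPathFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);

        System.out.println("Savepoint at: " + savepointPath);
    }
}
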
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -536,7 +536,7 @@ protected void stop(String[] args) throws Exception {
 		clusterClient -> {
 			final String savepointPath;
 			try {
-				savepointPath = clusterClient.stopWithSavepoint(jobId, advanceToEndOfEventTime, targetDirectory);
+				savepointPath = clusterClient.stopWithSavepoint(jobId, advanceToEndOfEventTime, targetDirectory).get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
 			} catch (Exception e) {
 				throw new FlinkException("Could not stop with a savepoint job \"" + jobId + "\".", e);
 			}
@@ -597,7 +597,7 @@ protected void cancel(String[] args) throws Exception {
 		clusterClient -> {
 			final String savepointPath;
 			try {
-				savepointPath = clusterClient.cancelWithSavepoint(jobId, targetDirectory);
+				savepointPath = clusterClient.cancelWithSavepoint(jobId, targetDirectory).get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
 			} catch (Exception e) {
 				throw new FlinkException("Could not cancel job " + jobId + '.', e);
 			}
@@ -619,7 +619,7 @@ protected void cancel(String[] args) throws Exception {
 		commandLine,
 		clusterClient -> {
 			try {
-				clusterClient.cancel(jobId);
+				clusterClient.cancel(jobId).get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
 			} catch (Exception e) {
 				throw new FlinkException("Could not cancel job " + jobId + '.', e);
 			}
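
For reference, the bounded get(timeout, unit) used in these hunks fails in three distinct ways, all of which the single catch (Exception e) above folds into one FlinkException. A standalone sketch of the failure modes (plain JDK, no Flink types):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGetFailureModes {
    public static void main(String[] args) {
        // A future that never completes, standing in for an unresponsive cluster.
        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();

        try {
            cancelFuture.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The cluster did not answer within the client timeout.
            System.out.println("timed out waiting for cancellation");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // The operation itself failed; the real cause is wrapped.
            System.out.println("cancellation failed: " + e.getCause());
        }
    }
}
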
@@ -695,26 +695,22 @@ protected void savepoint(String[] args) throws Exception {
 	/**
 	 * Sends a SavepointTriggerMessage to the job manager.
 	 */
-	private String triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory) throws FlinkException {
+	private void triggerSavepoint(ClusterClient<?> clusterClient, JobID jobId, String savepointDirectory) throws FlinkException {
 		logAndSysout("Triggering savepoint for job " + jobId + '.');
 
 		CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId, savepointDirectory);
 
 		logAndSysout("Waiting for response...");
 
-		final String savepointPath;
-
 		try {
-			savepointPath = savepointPathFuture.get();
-		}
-		catch (Exception e) {
+			final String savepointPath = savepointPathFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+			logAndSysout("Savepoint completed. Path: " + savepointPath);
+			logAndSysout("You can resume your program from this savepoint with the run command.");
+		} catch (Exception e) {
 			Throwable cause = ExceptionUtils.stripExecutionException(e);
 			throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
 		}
-
-		logAndSysout("Savepoint completed. Path: " + savepointPath);
-		logAndSysout("You can resume your program from this savepoint with the run command.");
-
-		return savepointPath;
 	}

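
The CLI blocks because it must print the savepoint path before exiting, but nothing forces other callers to do the same. A hypothetical non-blocking consumer of the same triggerSavepoint future, using only plain CompletableFuture combinators (the future below is a stand-in for clusterClient.triggerSavepoint(jobId, savepointDirectory)):

import java.util.concurrent.CompletableFuture;

public class NonBlockingSavepoint {
    public static void main(String[] args) {
        // Stand-in for the future returned by triggerSavepoint(...).
        CompletableFuture<String> savepointPathFuture =
            CompletableFuture.supplyAsync(() -> "s3://bucket/savepoints/sp-42");

        savepointPathFuture
            .thenAccept(path -> System.out.println("Savepoint completed. Path: " + path))
            .exceptionally(cause -> {
                System.err.println("Triggering the savepoint failed: " + cause);
                return null;
            })
            .join(); // only to keep this demo process alive until completion
    }
}
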
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -41,78 +41,75 @@
  *
  * @param <T> type of the cluster id
  */
-public abstract class ClusterClient<T> implements AutoCloseable {
-
-	/**
-	 * User overridable hook to close the client, possibly closes internal services.
-	 * @deprecated use the {@link #close()} instead. This method stays for backwards compatibility.
-	 */
-	public void shutdown() throws Exception {
-		close();
-	}
+public interface ClusterClient<T> extends AutoCloseable {
 
 	@Override
-	public void close() throws Exception {
+	default void close() throws Exception {
 
 	}
 
 	/**
-	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+	 * Returns the cluster id identifying the cluster to which the client is connected.
+	 *
+	 * @return cluster id of the connected cluster
 	 */
-	public abstract CompletableFuture<JobStatus> getJobStatus(JobID jobId);
+	T getClusterId();
 
 	/**
-	 * Cancels a job identified by the job id.
-	 * @param jobId the job id
-	 * @throws Exception In case an error occurred.
+	 * Return the Flink configuration object.
+	 *
+	 * @return The Flink configuration object
 	 */
-	public abstract void cancel(JobID jobId) throws Exception;
+	Configuration getFlinkConfiguration();
 
 	/**
-	 * Cancels a job identified by the job id and triggers a savepoint.
-	 * @param jobId the job id
-	 * @param savepointDirectory directory the savepoint should be written to
-	 * @return path where the savepoint is located
-	 * @throws Exception In case an error occurred.
+	 * Shut down the cluster that this client communicate with.
 	 */
-	public abstract String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception;
+	default void shutDownCluster() {
+		throw new UnsupportedOperationException();
+	}
 
 	/**
-	 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
-	 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
-	 * a while after sending the stop command, because after sources stopped to emit data all operators
-	 * need to finish processing.
+	 * Returns an URL (as a string) to the cluster web interface.
+	 */
+	String getWebInterfaceURL();
+
+	/**
+	 * Lists the currently running and finished jobs on the cluster.
 	 *
-	 * @param jobId the job ID of the streaming program to stop
-	 * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
-	 * @param savepointDirectory directory the savepoint should be written to
-	 * @return a {@link CompletableFuture} containing the path where the savepoint is located
-	 * @throws Exception
-	 *     If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
-	 *     failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
+	 * @return future collection of running and finished jobs
+	 * @throws Exception if no connection to the cluster could be established
 	 */
-	public abstract String stopWithSavepoint(final JobID jobId, final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory) throws Exception;
+	CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
 
 	/**
-	 * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
-	 * directory, or {@link org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+	 * Dispose the savepoint under the given path.
 	 *
-	 * @param jobId job id
-	 * @param savepointDirectory directory the savepoint should be written to
-	 * @return path future where the savepoint is located
-	 * @throws FlinkException if no connection to the cluster could be established
+	 * @param savepointPath path to the savepoint to be disposed
+	 * @return acknowledge future of the dispose action
 	 */
-	public abstract CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws FlinkException;
+	CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
 
-	public abstract CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
+	/**
+	 * Submit the given {@link JobGraph} to the cluster.
+	 *
+	 * @param jobGraph to submit
+	 * @return Future which is completed with the {@link JobSubmissionResult}
+	 */
+	CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph);
 
 	/**
-	 * Lists the currently running and finished jobs on the cluster.
+	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
+	 */
+	CompletableFuture<JobStatus> getJobStatus(JobID jobId);
+
+	/**
+	 * Request the {@link JobResult} for the given {@link JobID}.
 	 *
-	 * @return future collection of running and finished jobs
-	 * @throws Exception if no connection to the cluster could be established
+	 * @param jobId for which to request the {@link JobResult}
+	 * @return Future which is completed with the {@link JobResult}
 	 */
-	public abstract CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
+	CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId);
 
 	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
@@ -121,7 +118,7 @@ public void close() throws Exception {
 	 * @param jobID The job identifier of a job.
 	 * @return A Map containing the accumulator's name and its value.
 	 */
-	public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception {
+	default CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID) {
 		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
 	}
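
The abstract class has become an interface, so the convenience overload above survives as a Java 8 default method, and optional operations such as shutDownCluster() default to throwing UnsupportedOperationException. A self-contained illustration of both tricks (all names here are invented for the example, not Flink API):

import java.util.concurrent.CompletableFuture;

interface Client extends AutoCloseable {
    // A one-argument overload delegating to the general method,
    // in the same style as getAccumulators(JobID) above.
    default CompletableFuture<String> fetch(String id) {
        return fetch(id, ClassLoader.getSystemClassLoader());
    }

    CompletableFuture<String> fetch(String id, ClassLoader loader);

    // Optional operation: implementations may simply not override it.
    default void shutDown() {
        throw new UnsupportedOperationException();
    }

    @Override
    default void close() throws Exception {}
}

public class DefaultMethodDemo {
    public static void main(String[] args) throws Exception {
        // Only fetch(String, ClassLoader) is abstract, so a lambda suffices.
        try (Client c = (id, loader) -> CompletableFuture.completedFuture(id + "@" + loader)) {
            System.out.println(c.fetch("job-1").join());
        }
    }
}
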

@@ -132,47 +129,45 @@ public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws
 	 * @param loader The class loader for deserializing the accumulator results.
 	 * @return A Map containing the accumulator's name and its value.
 	 */
-	public abstract Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception;
-
-	// ------------------------------------------------------------------------
-	//  Abstract methods to be implemented by the cluster specific Client
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns an URL (as a string) to the JobManager web interface.
-	 */
-	public abstract String getWebInterfaceURL();
+	CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader);
 
 	/**
-	 * Returns the cluster id identifying the cluster to which the client is connected.
+	 * Cancels a job identified by the job id.
 	 *
-	 * @return cluster id of the connected cluster
+	 * @param jobId the job id
 	 */
-	public abstract T getClusterId();
+	CompletableFuture<Acknowledge> cancel(JobID jobId);
 
 	/**
-	 * Return the Flink configuration object.
-	 * @return The Flink configuration object
+	 * Cancels a job identified by the job id and triggers a savepoint.
+	 *
+	 * @param jobId the job id
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return future of path where the savepoint is located
 	 */
-	public abstract Configuration getFlinkConfiguration();
+	CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
 	/**
-	 * Submit the given {@link JobGraph} to the cluster.
+	 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+	 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
+	 * a while after sending the stop command, because after sources stopped to emit data all operators
+	 * need to finish processing.
 	 *
-	 * @param jobGraph to submit
-	 * @return Future which is completed with the {@link JobSubmissionResult}
+	 * @param jobId the job ID of the streaming program to stop
+	 * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return a {@link CompletableFuture} containing the path where the savepoint is located
 	 */
-	public abstract CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph);
+	CompletableFuture<String> stopWithSavepoint(final JobID jobId, final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory);
 
 	/**
-	 * Request the {@link JobResult} for the given {@link JobID}.
+	 * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
+	 * directory, or {@link org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
 	 *
-	 * @param jobId for which to request the {@link JobResult}
-	 * @return Future which is completed with the {@link JobResult}
+	 * @param jobId job id
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return path future where the savepoint is located
+	 * @throws FlinkException if no connection to the cluster could be established
 	 */
-	public abstract CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId);
-
-	public void shutDownCluster() {
-		throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support shutDownCluster.");
-	}
+	CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws FlinkException;
 }
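
With every operation now returning a future, call sites can compose cluster interactions instead of threading checked exceptions through. A plain-JDK sketch of how a caller might chain submission and result retrieval in the style of submitJob/requestJobResult (both futures below are stand-ins, not Flink calls):

import java.util.concurrent.CompletableFuture;

public class SubmitAndWait {
    public static void main(String[] args) {
        // Stand-in for client.submitJob(jobGraph): completes with a job id.
        CompletableFuture<String> submission =
            CompletableFuture.supplyAsync(() -> "job-7");

        // Stand-in for client.requestJobResult(jobId): completes when the job does.
        // thenCompose expresses "submit, then wait for the result" without blocking.
        CompletableFuture<String> result = submission.thenCompose(jobId ->
            CompletableFuture.supplyAsync(() -> jobId + " finished successfully"));

        // A CLI-style caller can still block with a bounded get(...) as above;
        // join() keeps the demo simple.
        System.out.println(result.join());
    }
}
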
flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -28,21 +28,29 @@
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Client to interact with a {@link MiniCluster}.
  */
-public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClusterId> {
+public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterClient.class);
 
 	private final MiniCluster miniCluster;
 	private final Configuration configuration;
@@ -68,18 +76,18 @@ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
 	}
 
 	@Override
-	public void cancel(JobID jobId) throws Exception {
-		miniCluster.cancelJob(jobId).get();
+	public CompletableFuture<Acknowledge> cancel(JobID jobId) {
+		return miniCluster.cancelJob(jobId);
 	}
 
 	@Override
-	public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
-		return miniCluster.triggerSavepoint(jobId, savepointDirectory, true).get();
+	public CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) {
+		return miniCluster.triggerSavepoint(jobId, savepointDirectory, true);
 	}
 
 	@Override
-	public String stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirector) throws Exception {
-		return miniCluster.stopWithSavepoint(jobId, savepointDirector, advanceToEndOfEventTime).get();
+	public CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirector) {
+		return miniCluster.stopWithSavepoint(jobId, savepointDirector, advanceToEndOfEventTime);
 	}
 
 	@Override
@@ -98,14 +106,21 @@ public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
 	}
 
 	@Override
-	public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
-		AccessExecutionGraph executionGraph = miniCluster.getExecutionGraph(jobID).get();
-		Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
-		Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulatorsSerialized.size());
-		for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulatorsSerialized.entrySet()) {
-			result.put(acc.getKey(), acc.getValue().deserializeValue(loader));
-		}
-		return result;
+	public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+		return miniCluster
+			.getExecutionGraph(jobID)
+			.thenApply(AccessExecutionGraph::getAccumulatorsSerialized)
+			.thenApply(accumulators -> {
+				Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulators.size());
+				for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulators.entrySet()) {
+					try {
+						result.put(acc.getKey(), acc.getValue().deserializeValue(loader));
+					} catch (Exception e) {
+						throw new CompletionException("Cannot deserialize accumulators.", e);
+					}
+				}
+				return result;
+			});
 	}
 
 	@Override
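
The rewritten getAccumulators above shows the standard way to surface a checked exception from inside a future pipeline: catch it in thenApply and rethrow it wrapped in CompletionException, which fails the future. A standalone reduction of that pattern (the deserializer is a stand-in):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionExceptionPattern {
    // A deserialization stand-in that declares a checked exception.
    static String deserialize(byte[] bytes) throws Exception {
        if (bytes.length == 0) {
            throw new Exception("empty payload");
        }
        return new String(bytes);
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = CompletableFuture
            .supplyAsync(() -> new byte[0])
            .thenApply(bytes -> {
                try {
                    return deserialize(bytes);
                } catch (Exception e) {
                    // thenApply's Function cannot throw checked exceptions,
                    // so the future is failed via CompletionException instead.
                    throw new CompletionException("Cannot deserialize accumulators.", e);
                }
            });

        try {
            System.out.println(result.join());
        } catch (CompletionException e) {
            // join() rethrows the CompletionException raised inside thenApply.
            System.out.println("failed: " + e.getMessage() + " (cause: " + e.getCause() + ")");
        }
    }
}
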
@@ -120,7 +135,14 @@ public MiniClusterClient.MiniClusterId getClusterId() {
 
 	@Override
 	public String getWebInterfaceURL() {
-		return miniCluster.getRestAddress().toString();
+		try {
+			return miniCluster.getRestAddress().get().toString();
+		} catch (InterruptedException | ExecutionException e) {
+			ExceptionUtils.checkInterrupted(e);
+
+			LOG.warn("Could not retrieve the web interface URL for the cluster.", e);
+			return "Unknown address.";
+		}
 	}
 
 	enum MiniClusterId {
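
getWebInterfaceURL() must return a plain String while MiniCluster.getRestAddress() now hands back a future, so the implementation above blocks and degrades to a placeholder on failure. A plain-JDK reduction of that bridge; the assumption here is that Flink's ExceptionUtils.checkInterrupted restores the thread's interrupt flag when the exception is an InterruptedException, so it is inlined accordingly:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingBridge {
    // Stand-in for miniCluster.getRestAddress().
    static CompletableFuture<String> restAddress() {
        return CompletableFuture.completedFuture("http://localhost:8081");
    }

    // A synchronous API forced to bridge to an asynchronous source.
    static String webInterfaceUrl() {
        try {
            return restAddress().get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt status intact instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return "Unknown address.";
        }
    }

    public static void main(String[] args) {
        System.out.println(webInterfaceUrl());
    }
}
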