Skip to content

Commit

Permalink
[FLINK-14807][rest] Introduce REST API for communication between clie…
Browse files Browse the repository at this point in the history
…nts and operator coordinators

This closes apache#12037
  • Loading branch information
tsreaper committed May 14, 2020
1 parent 96a6337 commit 9fe920f
Show file tree
Hide file tree
Showing 34 changed files with 1,112 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

import org.apache.commons.io.IOUtils;

Expand All @@ -41,7 +45,7 @@
/**
* An implementation of the {@link JobClient} interface that uses a {@link ClusterClient} underneath..
*/
public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway {

private final ClusterClientProvider<ClusterID> clusterClientProvider;

Expand Down Expand Up @@ -115,6 +119,13 @@ public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLo
})));
}

@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
return bridgeClientRequest(
clusterClientProvider,
clusterClient -> clusterClient.sendCoordinationRequest(jobID, operatorId, request));
}

private static <T> CompletableFuture<T> bridgeClientRequest(
ClusterClientProvider<?> clusterClientProvider,
Function<ClusterClient<?>, CompletableFuture<T>> resultRetriever) {
Expand All @@ -132,5 +143,4 @@ private static <T> CompletableFuture<T> bridgeClientRequest(
return resultFuture.whenCompleteAsync(
(jobResult, throwable) -> IOUtils.closeQuietly(clusterClient::close));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -42,7 +49,7 @@
* uses directly the {@link DispatcherGateway}.
*/
@Internal
public class EmbeddedJobClient implements JobClient {
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {

private final JobID jobId;

Expand Down Expand Up @@ -119,4 +126,15 @@ public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLo
}
});
}

@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
try {
SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
return dispatcherGateway.deliverCoordinationRequestToCoordinator(
jobId, operatorId, serializedRequest, timeout);
} catch (IOException e) {
return FutureUtils.completedExceptionally(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -162,4 +165,14 @@ default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) {
* @return path future where the savepoint is located
*/
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);

/**
* Sends out a request to a specified coordinator and return the response.
*
* @param jobId specifies the job which the coordinator belongs to
* @param operatorId specifies which coordinator to receive the request
* @param request the request to send
* @return the response from the coordinator
*/
CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, OperatorID operatorId, CoordinationRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -155,6 +161,20 @@ public String getWebInterfaceURL() {
}
}

@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
JobID jobId,
OperatorID operatorId,
CoordinationRequest request) {
try {
SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
return miniCluster.deliverCoordinationRequestToCoordinator(jobId, operatorId, serializedRequest);
} catch (IOException e) {
LOG.error("Error while sending coordination request", e);
return FutureUtils.completedExceptionally(e);
}
}

/**
* The type of the Cluster ID for the local {@link MiniCluster}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,25 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.SerializedValue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -127,7 +134,7 @@ private static void shutDownCluster(MiniCluster miniCluster) {
/**
* A {@link JobClient} for a {@link PerJobMiniClusterFactory}.
*/
private static final class PerJobMiniClusterJobClient implements JobClient {
private static final class PerJobMiniClusterJobClient implements JobClient, CoordinationRequestGateway {

private final JobID jobID;
private final MiniCluster miniCluster;
Expand Down Expand Up @@ -182,5 +189,15 @@ public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader c
}
});
}

@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) {
try {
SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
return miniCluster.deliverCoordinationRequestToCoordinator(jobID, operatorId, serializedRequest);
} catch (IOException e) {
return FutureUtils.completedExceptionally(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
Expand Down Expand Up @@ -67,6 +70,9 @@
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
Expand All @@ -88,6 +94,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.CheckedSupplier;

import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
Expand Down Expand Up @@ -411,6 +418,36 @@ public CompletableFuture<String> triggerSavepoint(
return triggerSavepoint(jobId, savepointDirectory, false);
}

@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
JobID jobId,
OperatorID operatorId,
CoordinationRequest request) {
ClientCoordinationHeaders headers = ClientCoordinationHeaders.getInstance();
ClientCoordinationMessageParameters params = new ClientCoordinationMessageParameters();
params.jobPathParameter.resolve(jobId);
params.operatorPathParameter.resolve(operatorId);

SerializedValue<CoordinationRequest> serializedRequest;
try {
serializedRequest = new SerializedValue<>(request);
} catch (IOException e) {
return FutureUtils.completedExceptionally(e);
}

ClientCoordinationRequestBody requestBody = new ClientCoordinationRequestBody(serializedRequest);
return sendRequest(headers, params, requestBody).thenApply(
responseBody -> {
try {
return responseBody
.getSerializedCoordinationResponse()
.deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException("Failed to deserialize coordination response", e);
}
});
}

private CompletableFuture<String> triggerSavepoint(
final JobID jobId,
final @Nullable String savepointDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.function.TriFunction;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -132,6 +135,14 @@ public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String
return triggerSavepointFunction.apply(jobId, savepointDirectory);
}

@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
JobID jobId,
OperatorID operatorId,
CoordinationRequest request) {
throw new UnsupportedOperationException();
}

@Override
public void close() {

Expand Down
Loading

0 comments on commit 9fe920f

Please sign in to comment.