Skip to content

Commit

Permalink
[FLINK-21188][runtime] Introduces ExecutionGraphInfo
Browse files Browse the repository at this point in the history
This closes apache#14804.
  • Loading branch information
XComp authored and tillrohrmann committed Feb 23, 2021
1 parent 278daba commit b7e93fd
Show file tree
Hide file tree
Showing 73 changed files with 966 additions and 577 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
Expand Down Expand Up @@ -78,9 +78,9 @@ protected ApplicationClusterEntryPoint(
}

@Override
protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
final Configuration configuration, final ScheduledExecutor scheduledExecutor) {
return new MemoryArchivedExecutionGraphStore();
return new MemoryExecutionGraphInfoStore();
}

protected static void configureExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -121,7 +122,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher

private final DispatcherBootstrapFactory dispatcherBootstrapFactory;

private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
private final ExecutionGraphInfoStore executionGraphInfoStore;

private final JobManagerRunnerFactory jobManagerRunnerFactory;

Expand Down Expand Up @@ -177,7 +178,7 @@ public Dispatcher(

this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();

this.archivedExecutionGraphStore = dispatcherServices.getArchivedExecutionGraphStore();
this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();

this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();

Expand Down Expand Up @@ -441,7 +442,7 @@ private CleanupJobState handleDispatcherJobResult(
&& executionType == ExecutionType.RECOVERY) {
return dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
} else {
return jobReachedGloballyTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
return jobReachedGloballyTerminalState(dispatcherJobResult.getExecutionGraphInfo());
}
}

Expand Down Expand Up @@ -557,8 +558,7 @@ public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
CompletableFuture<Collection<JobStatus>> allJobsFuture =
allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);

final JobsOverview completedJobsOverview =
archivedExecutionGraphStore.getStoredJobsOverview();
final JobsOverview completedJobsOverview = executionGraphInfoStore.getStoredJobsOverview();

return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
Expand All @@ -581,7 +581,7 @@ public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time tim
optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);

final Collection<JobDetails> completedJobDetails =
archivedExecutionGraphStore.getAvailableJobDetails();
executionGraphInfoStore.getAvailableJobDetails();

return combinedJobDetails.thenApply(
(Collection<JobDetails> runningJobDetails) -> {
Expand All @@ -603,7 +603,7 @@ public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout)
() -> {
// is it a completed job?
final JobDetails jobDetails =
archivedExecutionGraphStore.getAvailableJobDetails(jobId);
executionGraphInfoStore.getAvailableJobDetails(jobId);
if (jobDetails == null) {
return FutureUtils.completedExceptionally(
new FlinkJobNotFoundException(jobId));
Expand All @@ -614,17 +614,18 @@ public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout)
}

@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException =
public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
JobID jobId, Time timeout) {
Function<Throwable, ExecutionGraphInfo> checkExecutionGraphStoreOnException =
throwable -> {
// check whether it is a completed job
final ArchivedExecutionGraph archivedExecutionGraph =
archivedExecutionGraphStore.get(jobId);
if (archivedExecutionGraph == null) {
final ExecutionGraphInfo executionGraphInfo =
executionGraphInfoStore.get(jobId);
if (executionGraphInfo == null) {
throw new CompletionException(
ExceptionUtils.stripCompletionException(throwable));
} else {
return archivedExecutionGraph;
return executionGraphInfo;
}
};
Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
Expand All @@ -638,21 +639,22 @@ public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout)
DispatcherJob job = runningJobs.get(jobId);

if (job == null) {
final ArchivedExecutionGraph archivedExecutionGraph =
archivedExecutionGraphStore.get(jobId);
final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);

if (archivedExecutionGraph == null) {
if (executionGraphInfo == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
return CompletableFuture.completedFuture(
JobResult.createFrom(archivedExecutionGraph));
JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
}
} else {
return job.getResultFuture()
.thenApply(
dispatcherJobResult ->
JobResult.createFrom(
dispatcherJobResult.getArchivedExecutionGraph()));
dispatcherJobResult
.getExecutionGraphInfo()
.getArchivedExecutionGraph()));
}
}

Expand Down Expand Up @@ -827,7 +829,9 @@ protected void onFatalError(Throwable throwable) {
}

protected CleanupJobState jobReachedGloballyTerminalState(
ArchivedExecutionGraph archivedExecutionGraph) {
ExecutionGraphInfo executionGraphInfo) {
ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
Preconditions.checkArgument(
archivedExecutionGraph.getState().isGloballyTerminalState(),
"Job %s is in state %s which is not globally terminal.",
Expand All @@ -839,32 +843,32 @@ protected CleanupJobState jobReachedGloballyTerminalState(
archivedExecutionGraph.getJobID(),
archivedExecutionGraph.getState());

archiveExecutionGraph(archivedExecutionGraph);
archiveExecutionGraph(executionGraphInfo);

return CleanupJobState.GLOBAL;
}

private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
try {
archivedExecutionGraphStore.put(archivedExecutionGraph);
executionGraphInfoStore.put(executionGraphInfo);
} catch (IOException e) {
log.info(
"Could not store completed job {}({}).",
archivedExecutionGraph.getJobName(),
archivedExecutionGraph.getJobID(),
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}

final CompletableFuture<Acknowledge> executionGraphFuture =
historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);

executionGraphFuture.whenComplete(
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
log.info(
"Could not archive completed job {}({}) to the history server.",
archivedExecutionGraph.getJobName(),
archivedExecutionGraph.getJobID(),
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
throwable);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -137,8 +138,7 @@ private DispatcherJob(
private void handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult) {
if (jobManagerRunnerResult.isSuccess()) {
jobResultFuture.complete(
DispatcherJobResult.forSuccess(
jobManagerRunnerResult.getArchivedExecutionGraph()));
DispatcherJobResult.forSuccess(jobManagerRunnerResult.getExecutionGraphInfo()));
} else if (jobManagerRunnerResult.isJobNotFinished()) {
jobResultFuture.completeExceptionally(new JobNotFinishedException(jobId));
} else if (jobManagerRunnerResult.isInitializationFailure()) {
Expand All @@ -156,7 +156,7 @@ private void handleInitializationFailure(Throwable initializationFailure) {
initializationTimestamp);
jobResultFuture.complete(
DispatcherJobResult.forInitializationFailure(
archivedExecutionGraph, initializationFailure));
new ExecutionGraphInfo(archivedExecutionGraph), initializationFailure));
}

public CompletableFuture<DispatcherJobResult> getResultFuture() {
Expand All @@ -166,9 +166,10 @@ public CompletableFuture<DispatcherJobResult> getResultFuture() {
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return requestJob(timeout)
.thenApply(
executionGraph -> {
executionGraphInfo -> {
synchronized (lock) {
return JobDetails.createDetailsForJob(executionGraph);
return JobDetails.createDetailsForJob(
executionGraphInfo.getArchivedExecutionGraph());
}
});
}
Expand Down Expand Up @@ -205,16 +206,18 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
}

public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
return requestJob(timeout)
.thenApply(
executionGraphInfo ->
executionGraphInfo.getArchivedExecutionGraph().getState());
}

/** Returns a future completing to the ArchivedExecutionGraph of the job. */
public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
/** Returns a future completing to the ExecutionGraphInfo of the job. */
public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
synchronized (lock) {
if (isInitialized()) {
if (jobResultFuture.isDone()) { // job is not running anymore
return jobResultFuture.thenApply(
DispatcherJobResult::getArchivedExecutionGraph);
return jobResultFuture.thenApply(DispatcherJobResult::getExecutionGraphInfo);
}
// job is still running
return getJobMasterGateway()
Expand All @@ -224,12 +227,13 @@ public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
this.jobStatus == DispatcherJobStatus.INITIALIZING
|| jobStatus == DispatcherJobStatus.CANCELLING);
return CompletableFuture.completedFuture(
ArchivedExecutionGraph.createFromInitializingJob(
jobId,
jobName,
jobStatus.asJobStatus(),
null,
initializationTimestamp));
new ExecutionGraphInfo(
ArchivedExecutionGraph.createFromInitializingJob(
jobId,
jobName,
jobStatus.asJobStatus(),
null,
initializationTimestamp)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,35 @@
package org.apache.flink.runtime.dispatcher;

import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

/**
* Container for returning the {@link ArchivedExecutionGraph} and a flag whether the initialization
* has failed. For initialization failures, the throwable is also attached, to avoid deserializing
* it from the ArchivedExecutionGraph.
* Container for returning the {@link ExecutionGraphInfo} and a flag whether the initialization has
* failed. For initialization failures, the throwable is also attached, to avoid deserializing it
* from the {@link ArchivedExecutionGraph}.
*/
final class DispatcherJobResult {

private final ArchivedExecutionGraph archivedExecutionGraph;
private final ExecutionGraphInfo executionGraphInfo;

// if the throwable field is set, the job failed during initialization.
@Nullable private final Throwable initializationFailure;

private DispatcherJobResult(
ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable) {
this.archivedExecutionGraph = archivedExecutionGraph;
ExecutionGraphInfo executionGraphInfo, @Nullable Throwable throwable) {
this.executionGraphInfo = executionGraphInfo;
this.initializationFailure = throwable;
}

public boolean isInitializationFailure() {
return initializationFailure != null;
}

public ArchivedExecutionGraph getArchivedExecutionGraph() {
return archivedExecutionGraph;
public ExecutionGraphInfo getExecutionGraphInfo() {
return executionGraphInfo;
}

/** @throws IllegalStateException if this DispatcherJobResult is a successful initialization. */
Expand All @@ -58,11 +59,11 @@ public Throwable getInitializationFailure() {
}

public static DispatcherJobResult forInitializationFailure(
ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
return new DispatcherJobResult(archivedExecutionGraph, throwable);
ExecutionGraphInfo executionGraphInfo, Throwable throwable) {
return new DispatcherJobResult(executionGraphInfo, throwable);
}

public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
return new DispatcherJobResult(archivedExecutionGraph, null);
public static DispatcherJobResult forSuccess(ExecutionGraphInfo executionGraphInfo) {
return new DispatcherJobResult(executionGraphInfo, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class DispatcherServices {

@Nonnull private final JobManagerMetricGroup jobManagerMetricGroup;

@Nonnull private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
@Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;

@Nonnull private final FatalErrorHandler fatalErrorHandler;

Expand All @@ -68,7 +68,7 @@ public DispatcherServices(
@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@Nonnull BlobServer blobServer,
@Nonnull HeartbeatServices heartbeatServices,
@Nonnull ArchivedExecutionGraphStore archivedExecutionGraphStore,
@Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
@Nonnull FatalErrorHandler fatalErrorHandler,
@Nonnull HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
Expand All @@ -81,7 +81,7 @@ public DispatcherServices(
this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
this.blobServer = blobServer;
this.heartbeatServices = heartbeatServices;
this.archivedExecutionGraphStore = archivedExecutionGraphStore;
this.executionGraphInfoStore = executionGraphInfoStore;
this.fatalErrorHandler = fatalErrorHandler;
this.historyServerArchivist = historyServerArchivist;
this.metricQueryServiceAddress = metricQueryServiceAddress;
Expand Down Expand Up @@ -122,8 +122,8 @@ public JobManagerMetricGroup getJobManagerMetricGroup() {
}

@Nonnull
public ArchivedExecutionGraphStore getArchivedExecutionGraphStore() {
return archivedExecutionGraphStore;
public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
return executionGraphInfoStore;
}

@Nonnull
Expand Down
Loading

0 comments on commit b7e93fd

Please sign in to comment.