Skip to content

Commit

Permalink
[FLINK-19000] Forward initialization timestamp from Dispatcher to ExecutionGraph
Browse files Browse the repository at this point in the history

This closes apache#13368
  • Loading branch information
rmetzger committed Sep 16, 2020
1 parent 5abef56 commit 12967c8
Show file tree
Hide file tree
Showing 34 changed files with 141 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public JobManagerRunner createJobManagerRunner(
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception {

final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

Expand All @@ -78,6 +79,7 @@ public JobManagerRunner createJobManagerRunner(
highAvailabilityServices,
jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
jobManagerServices.getScheduledExecutorService(),
fatalErrorHandler);
fatalErrorHandler,
initializationTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,14 @@ private void persistAndRunJob(JobGraph jobGraph) throws Exception {

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));

CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
long initializationTimestamp = System.currentTimeMillis();
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);

DispatcherJob dispatcherJob = DispatcherJob.createFor(
jobManagerRunnerFuture,
jobGraph.getJobID(),
jobGraph.getName());
jobGraph.getName(),
initializationTimestamp);
runningJobs.put(jobGraph.getJobID(), dispatcherJob);

final JobID jobId = jobGraph.getJobID();
Expand Down Expand Up @@ -399,7 +400,7 @@ private void dispatcherJobFailed(JobID jobId, Throwable throwable) {
}
}

CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
final RpcService rpcService = getRpcService();
return CompletableFuture.supplyAsync(
() -> {
Expand All @@ -412,7 +413,8 @@ CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler);
fatalErrorHandler,
initializationTimestamp);
runner.start();
return runner;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,20 @@ public JobStatus asJobStatus() {
static DispatcherJob createFor(
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
JobID jobId,
String jobName) {
return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName);
String jobName,
long initializationTimestamp) {
return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp);
}

private DispatcherJob(
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
JobID jobId,
String jobName) {
String jobName,
long initializationTimestamp) {
this.jobManagerRunnerFuture = jobManagerRunnerFuture;
this.jobId = jobId;
this.jobName = jobName;
this.initializationTimestamp = System.currentTimeMillis();
this.initializationTimestamp = initializationTimestamp;
this.jobResultFuture = new CompletableFuture<>();

FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ JobManagerRunner createJobManagerRunner(
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ public ExecutionGraph(
JobMasterPartitionTracker partitionTracker,
ScheduleMode scheduleMode,
ExecutionDeploymentListener executionDeploymentListener,
ExecutionStateUpdateListener executionStateUpdateListener) throws IOException {
ExecutionStateUpdateListener executionStateUpdateListener,
long initializationTimestamp) throws IOException {

this.jobInformation = Preconditions.checkNotNull(jobInformation);

Expand All @@ -365,6 +366,7 @@ public ExecutionGraph(
this.jobStatusListeners = new ArrayList<>();

this.stateTimestamps = new long[JobStatus.values().length];
this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

this.rpcTimeout = checkNotNull(rpcTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public static ExecutionGraph buildGraph(
Time allocationTimeout,
Logger log,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {
JobMasterPartitionTracker partitionTracker,
long initializationTimestamp) throws JobExecutionException, JobException {

final FailoverStrategy.Factory failoverStrategy =
FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
Expand All @@ -123,7 +124,8 @@ public static ExecutionGraph buildGraph(
partitionTracker,
failoverStrategy,
NoOpExecutionDeploymentListener.get(),
(execution, newState) -> {});
(execution, newState) -> {},
initializationTimestamp);
}

public static ExecutionGraph buildGraph(
Expand All @@ -145,7 +147,8 @@ public static ExecutionGraph buildGraph(
JobMasterPartitionTracker partitionTracker,
FailoverStrategy.Factory failoverStrategyFactory,
ExecutionDeploymentListener executionDeploymentListener,
ExecutionStateUpdateListener executionStateUpdateListener) throws JobExecutionException, JobException {
ExecutionStateUpdateListener executionStateUpdateListener,
long initializationTimestamp) throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");

Expand Down Expand Up @@ -187,7 +190,8 @@ public static ExecutionGraph buildGraph(
partitionTracker,
jobGraph.getScheduleMode(),
executionDeploymentListener,
executionStateUpdateListener);
executionStateUpdateListener,
initializationTimestamp);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public JobManagerRunnerImpl(
final HighAvailabilityServices haServices,
final LibraryCacheManager.ClassLoaderLease classLoaderLease,
final Executor executor,
final FatalErrorHandler fatalErrorHandler) throws Exception {
final FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception {

this.resultFuture = new CompletableFuture<>();
this.terminationFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -137,7 +138,7 @@ public JobManagerRunnerImpl(
this.leaderGatewayFuture = new CompletableFuture<>();

// now start the JobManager
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
}

//----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast

private final SchedulerNGFactory schedulerNGFactory;

private final long initializationTimestamp;

// --------- BackPressure --------

private final BackPressureStatsTracker backPressureStatsTracker;
Expand Down Expand Up @@ -224,7 +226,8 @@ public JobMaster(
ShuffleMaster<?> shuffleMaster,
PartitionTrackerFactory partitionTrackerFactory,
ExecutionDeploymentTracker executionDeploymentTracker,
ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory) throws Exception {
ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
long initializationTimestamp) throws Exception {

super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);

Expand Down Expand Up @@ -271,6 +274,7 @@ public void onUnknownDeploymentsOf(Collection<ExecutionAttemptID> executionAttem
this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
this.initializationTimestamp = initializationTimestamp;

final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
Expand Down Expand Up @@ -308,7 +312,8 @@ public void onUnknownDeploymentsOf(Collection<ExecutionAttemptID> executionAttem
this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
}

private SchedulerNG createScheduler(ExecutionDeploymentTracker executionDeploymentTracker, final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
private SchedulerNG createScheduler(ExecutionDeploymentTracker executionDeploymentTracker,
final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
return schedulerNGFactory.createInstance(
log,
jobGraph,
Expand All @@ -325,7 +330,8 @@ private SchedulerNG createScheduler(ExecutionDeploymentTracker executionDeployme
jobMasterConfiguration.getSlotRequestTimeout(),
shuffleMaster,
partitionTracker,
executionDeploymentTracker);
executionDeploymentTracker,
initializationTimestamp);
}

//----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {

builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));

final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
// guard against clock changes
final long guardedNetRuntime = Math.max(netRuntime, 0L);
builder.netRuntime(guardedNetRuntime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public DefaultJobMasterServiceFactory(
public JobMaster createJobMasterService(
JobGraph jobGraph,
OnCompletionActions jobCompletionActions,
ClassLoader userCodeClassloader) throws Exception {
ClassLoader userCodeClassloader,
long initializationTimestamp) throws Exception {

return new JobMaster(
rpcService,
Expand All @@ -110,6 +111,7 @@ public JobMaster createJobMasterService(
lookup
),
new DefaultExecutionDeploymentTracker(),
DefaultExecutionDeploymentReconciler::new);
DefaultExecutionDeploymentReconciler::new,
initializationTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ public interface JobMasterServiceFactory {
JobMasterService createJobMasterService(
JobGraph jobGraph,
OnCompletionActions jobCompletionActions,
ClassLoader userCodeClassloader) throws Exception;
ClassLoader userCodeClassloader,
long initializationTimestamp) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)

private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) {
final long now = System.currentTimeMillis();
final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
final long startTime = executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
final long endTime = executionGraph.getState().isGloballyTerminalState() ?
executionGraph.getStatusTimestamp(executionGraph.getState()) : -1L;
final long duration = (endTime > 0L ? endTime : now) - startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
final ExecutionVertexOperations executionVertexOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
final ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws Exception {

super(
log,
Expand All @@ -148,7 +149,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
partitionTracker,
executionVertexVersioner,
executionDeploymentTracker,
false);
false,
initializationTimestamp);

this.log = log;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public SchedulerNG createInstance(
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
final ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws Exception {

final DefaultSchedulerComponents schedulerComponents = createSchedulerComponents(
jobGraph.getScheduleMode(),
Expand Down Expand Up @@ -104,6 +105,7 @@ public SchedulerNG createInstance(
new DefaultExecutionVertexOperations(),
new ExecutionVertexVersioner(),
schedulerComponents.getAllocatorFactory(),
executionDeploymentTracker);
executionDeploymentTracker,
initializationTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public SchedulerBase(
final JobMasterPartitionTracker partitionTracker,
final ExecutionVertexVersioner executionVertexVersioner,
final ExecutionDeploymentTracker executionDeploymentTracker,
final boolean legacyScheduling) throws Exception {
final boolean legacyScheduling,
long initializationTimestamp) throws Exception {

this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
Expand Down Expand Up @@ -231,7 +232,7 @@ public SchedulerBase(
this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
this.legacyScheduling = legacyScheduling;

this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker));
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);

this.schedulingTopology = executionGraph.getSchedulingTopology();

Expand All @@ -247,9 +248,10 @@ private ExecutionGraph createAndRestoreExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws Exception {

ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker);
ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);

final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

Expand All @@ -271,7 +273,8 @@ private ExecutionGraph createExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker) throws JobExecutionException, JobException {
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws JobExecutionException, JobException {

ExecutionDeploymentListener executionDeploymentListener = new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {
Expand Down Expand Up @@ -303,7 +306,8 @@ private ExecutionGraph createExecutionGraph(
partitionTracker,
failoverStrategy,
executionDeploymentListener,
executionStateUpdateListener);
executionStateUpdateListener,
initializationTimestamp);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ SchedulerNG createInstance(
Time slotRequestTimeout,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker) throws Exception;
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public static Map<String, String> fromKeyValueJsonArray(String jsonString) {
public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
JobStatus status = job.getState();

long started = job.getStatusTimestamp(JobStatus.CREATED);
long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception
timeout,
log,
NettyShuffleMaster.INSTANCE,
NoOpJobMasterPartitionTracker.INSTANCE);
NoOpJobMasterPartitionTracker.INSTANCE,
System.currentTimeMillis());

assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private TestContext createTestContext() {
JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture = new CompletableFuture<>();
DispatcherJob dispatcherJob = DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
jobGraph.getJobID(), jobGraph.getName());
jobGraph.getJobID(), jobGraph.getName(), System.currentTimeMillis());

return new TestContext(
jobManagerRunnerCompletableFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public FailingJobManagerRunnerFactory(FlinkException testException) {
}

@Override
public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
throw testException;
}
}
Expand Down
Loading

0 comments on commit 12967c8

Please sign in to comment.