From c2891c5d467ad4a4dccde733e519e1a7250eb990 Mon Sep 17 00:00:00 2001 From: Kurt Young Date: Mon, 26 Sep 2016 10:59:16 +0800 Subject: [PATCH] [FLINK-4657] Implement HighAvailabilityServices based on ZooKeeper [FLINK-4657] Implement a few rpc calls for JobMaster [FLINK-4657][cluster management] Address review comments [FLINK-4657][cluster management] Throw exception when error occurred when request input split --- .../HighAvailabilityServices.java | 7 +- .../highavailability/NonHaServices.java | 4 +- .../highavailability/ZookeeperHaServices.java | 90 +++++++++ .../jobmanager/SubmittedJobGraphStore.java | 2 - .../runtime/jobmaster/JobManagerRunner.java | 18 +- .../flink/runtime/jobmaster/JobMaster.java | 189 +++++++++++++++++- .../runtime/jobmaster/JobMasterGateway.java | 53 ++++- .../jobmaster/message/NextInputSplit.java | 39 ++++ .../resourcemanager/ResourceManager.java | 6 +- .../flink/runtime/util/ZooKeeperUtils.java | 85 ++++++-- .../runtime/taskmanager/TaskManager.scala | 4 +- .../TestingHighAvailabilityServices.java | 20 +- .../jobmaster/JobManagerRunnerMockTest.java | 7 +- .../slotmanager/SlotProtocolTest.java | 2 +- 14 files changed, 462 insertions(+), 64 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index d67e9279b9223..a26886aadbfe6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -32,6 +32,7 @@ *
 *     <li>JobManager leader election and leader retrieval</li>
 *     <li>Persistence for checkpoint metadata</li>
 *     <li>Registering the latest completed checkpoint(s)</li>
+ *     <li>Persistence for submitted job graph</li>
 * </ul>
  • * */ public interface HighAvailabilityServices { @@ -48,12 +49,10 @@ public interface HighAvailabilityServices { * @return * @throws Exception */ - LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception; + LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception; /** * Gets the leader election service for the cluster's resource manager. - * @return - * @throws Exception */ LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; @@ -62,7 +61,7 @@ public interface HighAvailabilityServices { * * @param jobID The identifier of the job running the election. */ - LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception; + LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception; /** * Gets the checkpoint recovery factory for the job manager diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index a2c9cc4ee22a5..2c6295c52b298 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -79,7 +79,7 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti } @Override - public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0)); } @@ -89,7 +89,7 @@ public LeaderElectionService getResourceManagerLeaderElectionService() throws Ex } @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { return new StandaloneLeaderElectionService(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java new file mode 100644 index 0000000000000..d25965d27a5d5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.util.ZooKeeperUtils; + +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation of the {@link HighAvailabilityServices} with zookeeper. + */ +public class ZookeeperHaServices implements HighAvailabilityServices { + + private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager"; + + /** The ZooKeeper client to use */ + private final CuratorFramework client; + + /** The executor to run ZooKeeper callbacks on */ + private final Executor executor; + + /** The runtime configuration */ + private final Configuration configuration; + + public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) { + this.client = checkNotNull(client); + this.executor = checkNotNull(executor); + this.configuration = checkNotNull(configuration); + } + + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX); + } + + @Override + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathSuffixForJob(jobID)); + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX); + } + + @Override + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID)); + } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor); + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor); + } + + private static String getPathSuffixForJob(final JobID jobID) { + return String.format("/job-managers/%s", jobID); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java index 55c2e7998b84b..6e91f80ebeb54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java @@ -88,7 +88,5 @@ interface SubmittedJobGraphListener { * @param jobId The {@link JobID} of the removed job graph */ void onRemovedJobGraph(JobID jobId); - } - } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 6944d85d960d1..a096932af6fa8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -21,20 +21,18 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.RpcService; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.UUID; -import java.util.concurrent.Executor; /** * The runner for the job manager. It deals with job level leader election and make underlying job manager @@ -52,11 +50,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { private final OnCompletionActions toNotify; - /** The execution context which is used to execute futures */ - private final Executor executionContext; - - // TODO: use this to decide whether the job is finished by other - private final CheckpointRecoveryFactory checkpointRecoveryFactory; + /** Used to check whether a job needs to be run */ + private final SubmittedJobGraphStore submittedJobGraphStore; /** Leader election for this job */ private final LeaderElectionService leaderElectionService; @@ -87,9 +82,8 @@ public JobManagerRunner( { this.jobGraph = jobGraph; this.toNotify = toNotify; - this.executionContext = rpcService.getExecutor(); - this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory(); - this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID()); + this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore(); + this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID()); this.jobManager = new JobMaster( jobGraph, configuration, rpcService, haServices, @@ -271,7 +265,7 @@ public void handleError(Exception exception) { @VisibleForTesting boolean isJobFinishedByOthers() { - // TODO + // TODO: Fix return false; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 99e2ef8c62cc3..21dd1bd826659 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -24,6 +24,8 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -35,20 +37,32 @@ import 
org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmaster.message.NextInputSplit; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -61,13 +75,16 @@ import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.slf4j.Logger; +import org.apache.flink.runtime.util.SerializedThrowable; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.SerializedValue; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.duration.FiniteDuration; +import org.slf4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; @@ -493,9 +510,12 @@ public void suspendJob(final Throwable cause) { * @return Acknowledge the task execution state update */ @RpcMethod - public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) { - System.out.println("TaskExecutionState: " + taskExecutionState); - return Acknowledge.get(); + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + if (taskExecutionState == null) { + return false; + } else { + return executionGraph.updateState(taskExecutionState); + } } //----------------------------------------------------------------------------------------------
 @@ -513,6 +533,163 @@ public void run() { }); } + @RpcMethod + public NextInputSplit requestNextInputSplit( + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt) throws Exception + { + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + // can happen when JobManager had already unregistered this execution upon on task failure, + // but TaskManager get some delay to aware of that situation + if (log.isDebugEnabled()) { + log.debug("Can not find Execution for attempt {}.", executionAttempt); + } + // but we should TaskManager be aware of this + throw new Exception("Can not find Execution for attempt " + executionAttempt); + } + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex == null) { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + throw new Exception("Cannot find execution vertex for vertex ID " + vertexID); + } + + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner == null) { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + throw new Exception("No InputSplitAssigner for vertex ID " + vertexID); + } + + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + if (log.isDebugEnabled()) { + log.debug("Send next input split {}.", nextInputSplit); + } + + try { + final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + return new NextInputSplit(serializedInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + IOException reason = new IOException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex); + vertex.fail(reason); + throw reason; + } + } + + @RpcMethod + public ExecutionState requestPartitionState( + final ResultPartitionID resultPartitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID intermediateDataSetId) throws PartitionProducerDisposedException { + + final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId()); + if (execution != null) { + return execution.getState(); + } + else { + final IntermediateResult intermediateResult = + executionGraph.getAllIntermediateResults().get(intermediateDataSetId); + + if (intermediateResult != null) { + // Try to find the producing execution + Execution producerExecution = intermediateResult + .getPartitionById(resultPartitionId.getPartitionId()) + .getProducer() + .getCurrentExecutionAttempt(); + + if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) { + return producerExecution.getState(); + } else { + throw new PartitionProducerDisposedException(resultPartitionId); + } + } else { + throw new IllegalArgumentException("Intermediate data set with ID " + + intermediateDataSetId + " not found."); + } + } + } + + @RpcMethod + public Acknowledge scheduleOrUpdateConsumers(ResultPartitionID partitionID) throws ExecutionGraphException { + executionGraph.scheduleOrUpdateConsumers(partitionID); + return Acknowledge.get(); + } + + //---------------------------------------------------------------------------------------------- + // Internal methods + 
//---------------------------------------------------------------------------------------------- + + // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread + private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { + jobInfo.setLastActive() + val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { + self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } + }(context.dispatcher) + } else { + self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } + */ + + if (newJobStatus == JobStatus.FINISHED) { + try { + final Map> accumulatorResults = + executionGraph.getAccumulatorsSerialized(); + final SerializedJobExecutionResult result = new SerializedJobExecutionResult( + jobID, 0, accumulatorResults // TODO get correct job duration + ); + jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader)); + } catch (Exception e) { + log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); + final JobExecutionException exception = new JobExecutionException( + jobID, "Failed to retrieve accumulator results.", e); + // TODO should we also notify client? + jobCompletionActions.jobFailed(exception); + } + } + else if (newJobStatus == JobStatus.CANCELED) { + final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); + final JobExecutionException exception = new JobExecutionException( + jobID, "Job was cancelled.", unpackedError); + // TODO should we also notify client? + jobCompletionActions.jobFailed(exception); + } + else if (newJobStatus == JobStatus.FAILED) { + final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); + final JobExecutionException exception = new JobExecutionException( + jobID, "Job execution failed.", unpackedError); + // TODO should we also notify client? + jobCompletionActions.jobFailed(exception); + } + else { + final JobExecutionException exception = new JobExecutionException( + jobID, newJobStatus + " is not a terminal state."); + // TODO should we also notify client? 
+ jobCompletionActions.jobFailed(exception); + throw new RuntimeException(exception); + } + } + } + private void notifyOfNewResourceManagerLeader( final String resourceManagerAddress, final UUID resourceManagerLeaderId) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 6587ccb43bdfe..3b6fd73f2a898 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -19,6 +19,15 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.message.NextInputSplit; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -47,7 +56,47 @@ public interface JobMasterGateway extends RpcGateway { * Updates the task execution state for a given task. * * @param taskExecutionState New task execution state for a given task - * @return Future acknowledge of the task execution state update + * @return Future flag of the task execution state update result */ - Future updateTaskExecutionState(TaskExecutionState taskExecutionState); + Future updateTaskExecutionState(TaskExecutionState taskExecutionState); + + /** + * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender + * as a {@link NextInputSplit} message. + * + * @param vertexID The job vertex id + * @param executionAttempt The execution attempt id + * @return The future of the input split. If there is no further input split, will return an empty object. + * @throws Exception if some error occurred or information mismatch. + */ + Future requestNextInputSplit( + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt) throws Exception; + + /** + * Requests the current state of the partition. + * The state of a partition is currently bound to the state of the producing execution. + * + * @param partitionId The partition ID of the partition to request the state of. + * @param taskExecutionId The execution attempt ID of the task requesting the partition state. + * @param taskResultId The input gate ID of the task requesting the partition state. + * @return The future of the partition state + */ + Future requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId); + + /** + * Notifies the JobManager about available data for a produced partition. + *

    + * There is a call to this method for each {@link ExecutionVertex} instance once per produced + * {@link ResultPartition} instance, either when first producing data (for pipelined executions) + * or when all data has been produced (for staged executions). + *

    + * The JobManager then can decide when to schedule the partition consumers of the given session. + * + * @param partitionID The partition which has already produced data + */ + Future scheduleOrUpdateConsumers(final ResultPartitionID partitionID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java new file mode 100644 index 0000000000000..fe511eda2edcc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.message; + +import java.io.Serializable; + +/** + * Contains the next input split for a task. + */ +public class NextInputSplit implements Serializable { + + private static final long serialVersionUID = -1355784074565856240L; + + private final byte[] splitData; + + public NextInputSplit(final byte[] splitData) { + this.splitData = splitData; + } + + public byte[] getSplitData() { + return splitData; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index f695de41c2d70..f45afa38d8020 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -129,7 +129,7 @@ public void shutDown() { try { leaderElectionService.stop(); for (JobID jobID : jobMasterGateways.keySet()) { - highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop(); } super.shutDown(); } catch (Throwable e) { @@ -179,7 +179,7 @@ public JobMasterGateway call() throws Exception { final LeaderConnectionInfo jobMasterLeaderInfo; try { jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( - highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); throw new Exception("Failed to retrieve JobMasterLeaderRetriever"); @@ -203,7 +203,7 @@ public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable t if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) { JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); try { - LeaderRetrievalService jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID); + LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID); jobMasterLeaderRetriever.start(jobMasterLeaderListener); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 70ac6c85434ee..81609c24ce892 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -161,13 +161,45 @@ public static String getZooKeeperEnsemble(Configuration flinkConf) * @return {@link ZooKeeperLeaderRetrievalService} instance. */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( - Configuration configuration) { - CuratorFramework client = startCuratorFramework(configuration); + Configuration configuration) throws Exception { + final CuratorFramework client = startCuratorFramework(configuration); + return createLeaderRetrievalService(client, configuration); + } + + /** + * Creates a {@link ZooKeeperLeaderRetrievalService} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @return {@link ZooKeeperLeaderRetrievalService} instance. + * @throws Exception + */ + public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( + final CuratorFramework client, + final Configuration configuration) throws Exception + { + return createLeaderRetrievalService(client, configuration, ""); + } + + /** + * Creates a {@link ZooKeeperLeaderRetrievalService} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @param pathSuffix The path suffix which we want to append + * @return {@link ZooKeeperLeaderRetrievalService} instance. + * @throws Exception + */ + public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( + final CuratorFramework client, + final Configuration configuration, + final String pathSuffix) throws Exception + { String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, - ConfigConstants.ZOOKEEPER_LEADER_PATH); + configuration, + ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, + ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderRetrievalService(client, leaderPath); } @@ -180,7 +212,7 @@ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( * @return {@link ZooKeeperLeaderElectionService} instance. 
*/ public static ZooKeeperLeaderElectionService createLeaderElectionService( - Configuration configuration) { + Configuration configuration) throws Exception { CuratorFramework client = startCuratorFramework(configuration); @@ -196,18 +228,35 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService( */ public static ZooKeeperLeaderElectionService createLeaderElectionService( CuratorFramework client, - Configuration configuration) { + Configuration configuration) throws Exception { - String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH, - ConfigConstants.ZOOKEEPER_LATCH_PATH); - String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, - ConfigConstants.ZOOKEEPER_LEADER_PATH); + return createLeaderElectionService(client, configuration, ""); + } + + /** + * Creates a {@link ZooKeeperLeaderElectionService} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @param pathSuffix The path suffix which we want to append + * @return {@link ZooKeeperLeaderElectionService} instance. + * @throws Exception + */ + public static ZooKeeperLeaderElectionService createLeaderElectionService( + final CuratorFramework client, + final Configuration configuration, + final String pathSuffix) throws Exception + { + final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( + configuration, + ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH, + ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix; + final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( + configuration, + ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, + ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index e453ec2cc27cf..c21792c14cd6c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -925,12 +925,12 @@ class TaskManager( val partitionStateChecker = new ActorGatewayPartitionProducerStateChecker( jobManagerGateway, - new FiniteDuration(config.getTimeout.getSize, config.getTimeout.getUnit)) + new FiniteDuration(config.getTimeout().toMilliseconds, TimeUnit.MILLISECONDS)) val resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier( context.dispatcher, jobManagerGateway, - new FiniteDuration(config.getTimeout.getSize, config.getTimeout.getUnit)) + new FiniteDuration(config.getTimeout().toMilliseconds, TimeUnit.MILLISECONDS)) connectionUtils = Some( (checkpointResponder, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 1a5450dd80093..faf69cc7e9118 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -36,7 +36,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private ConcurrentHashMap jobMasterLeaderRetrievers = new ConcurrentHashMap<>(); - private volatile LeaderElectionService jobMasterLeaderElectionService; + private ConcurrentHashMap jobManagerLeaderElectionServices = new ConcurrentHashMap<>(); private volatile LeaderElectionService resourceManagerLeaderElectionService; @@ -56,8 +56,8 @@ public void setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobM this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever); } - public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { - this.jobMasterLeaderElectionService = leaderElectionService; + public void setJobMasterLeaderElectionService(JobID jobID, LeaderElectionService leaderElectionService) { + this.jobManagerLeaderElectionServices.put(jobID, leaderElectionService); } public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) { @@ -87,7 +87,7 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti } @Override - public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service; @@ -97,24 +97,24 @@ public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Ex } @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { - LeaderElectionService service = jobMasterLeaderElectionService; + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + LeaderElectionService service = resourceManagerLeaderElectionService; if (service != null) { return service; } else { - throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); + throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); } } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { - LeaderElectionService service = resourceManagerLeaderElectionService; + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID); if (service != null) { return service; } else { - throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); + throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index bfe5f55f153fa..89807011ec7ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.RpcService; import org.junit.After; @@ -72,8 +73,11 @@ public void setUp() throws Exception { leaderElectionService = mock(LeaderElectionService.class); when(leaderElectionService.hasLeadership()).thenReturn(true); + SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class); + HighAvailabilityServices haServices = mock(HighAvailabilityServices.class); - when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService); + when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService); + when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore); runner = PowerMockito.spy(new JobManagerRunner( new JobGraph("test"), @@ -127,7 +131,6 @@ public void testShutdownBeforeGrantLeadership() throws Exception { public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception { runner.start(); - when(runner.isJobFinishedByOthers()).thenReturn(true); runner.grantLeadership(UUID.randomUUID()); // runner should shutdown automatic and informed the job completion diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index e3018c9265f90..805ea714360fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -237,7 +237,7 @@ private static TestingLeaderElectionService configureHA( testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService(); - testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService); + testingHA.setJobMasterLeaderElectionService(jobID, jmLeaderElectionService); final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID); testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService);
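
For trying the new ZookeeperHaServices out of tree, a minimal wiring sketch could look like the following. The quorum configuration key, the single-thread executor, and the class name are illustrative assumptions and are not part of this patch; only the constructor and accessors shown in the diff above are relied upon.

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.util.ZooKeeperUtils;

public class ZookeeperHaServicesSketch {

	public static void main(String[] args) throws Exception {
		// Assumed config key; it must point at a reachable ZooKeeper ensemble.
		Configuration configuration = new Configuration();
		configuration.setString("high-availability.zookeeper.quorum", "localhost:2181");

		// The Curator client and the callback executor are owned by the caller.
		CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
		Executor executor = Executors.newSingleThreadExecutor();

		ZookeeperHaServices haServices = new ZookeeperHaServices(client, executor, configuration);

		// Leader election for a specific job resolves to a job-scoped ZooKeeper path
		// (the "/job-managers/<jobID>" suffix introduced by this patch).
		LeaderElectionService electionService =
			haServices.getJobManagerLeaderElectionService(new JobID());

		// Submitted job graphs are persisted in ZooKeeper as well, which is what
		// JobManagerRunner uses to decide whether a job still needs to be run.
		SubmittedJobGraphStore graphStore = haServices.getSubmittedJobGraphStore();

		client.close();
	}
}

A single haServices instance can back both the ResourceManager and the per-job JobManager leader election, since the path suffixes keep their ZooKeeper nodes apart.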