[FLINK-30233] execution vertex supports mark hybrid result partition finished.
reswqa committed Dec 22, 2022
1 parent bf1cb5f commit ca8f5cf
Showing 26 changed files with 195 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -98,6 +98,12 @@
<td>Integer</td>
<td>The size of the IO thread pool to run blocking operations for all spawned JobMasters. This includes recovery and completion of checkpoints. Increase this value if you experience slow checkpoint operations when running many jobs. If no value is specified, then Flink defaults to the number of available CPU cores.</td>
</tr>
+ <tr>
+ <td><h5>jobmanager.partition.hybrid.only-consume-finished-partition</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Controls whether the scheduler only allows downstream task consume finished partition. Note that this option is allowed only when <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>, and if you also enable speculative execution(<code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.speculative.enabled</code> has been set to true),this option can only be set to true.</td>
+ </tr>
<tr>
<td><h5>jobmanager.resource-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -86,6 +86,12 @@
<td>Duration</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to a negative value to disable the resource timeout.</td>
</tr>
+ <tr>
+ <td><h5>jobmanager.partition.hybrid.only-consume-finished-partition</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Controls whether the scheduler only allows downstream task consume finished partition. Note that this option is allowed only when <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>, and if you also enable speculative execution(<code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.speculative.enabled</code> has been set to true),this option can only be set to true.</td>
+ </tr>
<tr>
<td><h5>jobmanager.scheduler</h5></td>
<td style="word-wrap: break-word;">Default</td>
@@ -158,6 +158,12 @@
<td>MemorySize</td>
<td>Total Process Memory size for the JobManager. This includes all the memory that a JobManager JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this should be set to the container memory. See also 'jobmanager.memory.flink.size' for Total Flink Memory size configuration.</td>
</tr>
+ <tr>
+ <td><h5>jobmanager.partition.hybrid.only-consume-finished-partition</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Controls whether the scheduler only allows downstream task consume finished partition. Note that this option is allowed only when <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>, and if you also enable speculative execution(<code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.speculative.enabled</code> has been set to true),this option can only be set to true.</td>
+ </tr>
<tr>
<td><h5>jobmanager.resource-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -8,7 +8,7 @@ Method <org.apache.flink.runtime.blob.BlobInputStream.read([B, int, int)> calls
Method <org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(java.io.InputStream, java.security.MessageDigest, org.apache.flink.runtime.blob.BlobKey$BlobType)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobOutputStream.java:155)
Method <org.apache.flink.runtime.blob.BlobUtils.checkAndDeleteCorruptedBlobs(java.nio.file.Path, org.slf4j.Logger)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (BlobUtils.java:514)
Method <org.apache.flink.runtime.blob.FileSystemBlobStore.get(java.lang.String, java.io.File, org.apache.flink.runtime.blob.BlobKey)> calls method <org.apache.flink.runtime.blob.BlobKey.getHash()> in (FileSystemBlobStore.java:124)
- Method <org.apache.flink.runtime.executiongraph.Execution.finishPartitionsAndUpdateConsumers()> calls method <org.apache.flink.runtime.executiongraph.ExecutionVertex.finishAllBlockingPartitions()> in (Execution.java:978)
+ Method <org.apache.flink.runtime.executiongraph.Execution.finishPartitionsAndUpdateConsumers()> calls method <org.apache.flink.runtime.executiongraph.ExecutionVertex.finishPartitionsIfNeeded()> in (Execution.java:978)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantDispatcherLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:83)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantJobMasterLeadership(org.apache.flink.api.common.JobID)> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:95)
Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.grantResourceManagerLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.grantLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:109)
@@ -25,4 +25,4 @@ Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoade
Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String, java.lang.Runnable)> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:294)
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.getTransactionCoordinatorId()> calls method <org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.getTransactionCoordinatorId()> in (FlinkKafkaProducer.java:1327)
Method <org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (SourceOperatorStreamTask.java:96)
- Method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.isIdle()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isDefaultActionAvailable()> in (MailboxExecutorImpl.java:63)
+ Method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.isIdle()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isDefaultActionAvailable()> in (MailboxExecutorImpl.java:63)
@@ -650,6 +650,26 @@ public enum SchedulerType {
.withDescription(
"The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.");

+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
+ key("jobmanager.partition.hybrid.only-consume-finished-partition")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Controls whether the scheduler only allows downstream task consume finished partition. "
+ + "Note that this option is allowed only when %s has been set to %s, "
+ + "and if you also enable speculative execution(%s has been set to true),"
+ + "this option can only be set to true.",
+ code(SCHEDULER.key()),
+ code(SchedulerType.AdaptiveBatch.name()),
+ code(SPECULATIVE_ENABLED.key()))
+ .build());
+
// ---------------------------------------------------------------------------------------------

private JobManagerOptions() {
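
The two constraints stated in the option description above can be read as a small validation rule. The following is a stand-alone sketch, not Flink's implementation: `HybridOptionCheck`, its `SchedulerType` enum, and `validate` are hypothetical names, and treating an unset option as always valid is an assumption, since the excerpt does not show how the scheduler resolves a missing value.

```java
import java.util.Optional;

/** Stand-alone illustration; none of these names are Flink API. */
final class HybridOptionCheck {

    /** Hypothetical stand-in for the values of jobmanager.scheduler seen in this diff. */
    public enum SchedulerType { Default, AdaptiveBatch }

    /**
     * Checks the two documented constraints of
     * jobmanager.partition.hybrid.only-consume-finished-partition.
     *
     * @param onlyConsumeFinished configured value, empty when the option is unset
     */
    public static void validate(
            SchedulerType scheduler,
            boolean speculativeEnabled,
            Optional<Boolean> onlyConsumeFinished) {
        if (!onlyConsumeFinished.isPresent()) {
            // Assumption: an unset option is never a configuration error.
            return;
        }
        if (scheduler != SchedulerType.AdaptiveBatch) {
            throw new IllegalArgumentException(
                    "option is only allowed with the AdaptiveBatch scheduler");
        }
        if (speculativeEnabled && !onlyConsumeFinished.get()) {
            throw new IllegalArgumentException(
                    "with speculative execution the option can only be true");
        }
    }

    public static void main(String[] args) {
        // Accepted: AdaptiveBatch, speculative execution on, option set to true.
        validate(SchedulerType.AdaptiveBatch, true, Optional.of(true));
        try {
            // Rejected: speculative execution on, option explicitly false.
            validate(SchedulerType.AdaptiveBatch, true, Optional.of(false));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```
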
@@ -295,6 +295,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG

private final List<JobStatusHook> jobStatusHooks;

+ private final MarkPartitionFinishedStrategy markPartitionFinishedStrategy;
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -318,7 +320,8 @@ public DefaultExecutionGraph(
VertexParallelismStore vertexParallelismStore,
boolean isDynamic,
ExecutionJobVertex.Factory executionJobVertexFactory,
- List<JobStatusHook> jobStatusHooks)
+ List<JobStatusHook> jobStatusHooks,
+ MarkPartitionFinishedStrategy markPartitionFinishedStrategy)
throws IOException {

this.executionGraphId = new ExecutionGraphID();
@@ -390,6 +393,8 @@ public DefaultExecutionGraph(

this.jobStatusHooks = checkNotNull(jobStatusHooks);

+ this.markPartitionFinishedStrategy = markPartitionFinishedStrategy;
+
LOG.info(
"Created execution graph {} for job {}.",
executionGraphId,
@@ -1684,4 +1689,9 @@ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
IntermediateDataSetID intermediateDataSetID) {
return partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetID);
}
+
+ @Override
+ public MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
+ return markPartitionFinishedStrategy;
+ }
}
@@ -92,7 +92,8 @@ public static DefaultExecutionGraph buildGraph(
VertexParallelismStore vertexParallelismStore,
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
boolean isDynamicGraph,
- ExecutionJobVertex.Factory executionJobVertexFactory)
+ ExecutionJobVertex.Factory executionJobVertexFactory,
+ MarkPartitionFinishedStrategy markPartitionFinishedStrategy)
throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");
@@ -139,7 +140,8 @@ public static DefaultExecutionGraph buildGraph(
vertexParallelismStore,
isDynamicGraph,
executionJobVertexFactory,
- jobGraph.getJobStatusHooks());
+ jobGraph.getJobStatusHooks(),
+ markPartitionFinishedStrategy);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
@@ -975,7 +975,7 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met

private void finishPartitionsAndUpdateConsumers() {
final List<IntermediateResultPartition> finishedPartitions =
- getVertex().finishAllBlockingPartitions();
+ getVertex().finishPartitionsIfNeeded();

for (IntermediateResultPartition partition : finishedPartitions) {
updatePartitionConsumers(partition);
@@ -477,28 +477,34 @@ void cachePartitionInfo(PartitionInfo partitionInfo) {
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
}

- /** Returns all blocking result partitions whose receivers can be scheduled/updated. */
+ /**
+ * Mark partition finished if needed.
+ *
+ * @return list of finished partitions.
+ */
@VisibleForTesting
- public List<IntermediateResultPartition> finishAllBlockingPartitions() {
- List<IntermediateResultPartition> finishedBlockingPartitions = null;
-
+ public List<IntermediateResultPartition> finishPartitionsIfNeeded() {
+ List<IntermediateResultPartition> finishedPartitions = null;
+ MarkPartitionFinishedStrategy markPartitionFinishedStrategy =
+ getExecutionGraphAccessor().getMarkPartitionFinishedStrategy();
for (IntermediateResultPartition partition : resultPartitions.values()) {
- if (!partition.getResultType().canBePipelinedConsumed()) {
+ if (markPartitionFinishedStrategy.needMarkPartitionFinished(
+ partition.getResultType())) {

partition.markFinished();

- if (finishedBlockingPartitions == null) {
- finishedBlockingPartitions = new LinkedList<>();
+ if (finishedPartitions == null) {
+ finishedPartitions = new LinkedList<>();
}

- finishedBlockingPartitions.add(partition);
+ finishedPartitions.add(partition);
}
}

- if (finishedBlockingPartitions == null) {
+ if (finishedPartitions == null) {
return Collections.emptyList();
} else {
- return finishedBlockingPartitions;
+ return finishedPartitions;
}
}

@@ -194,10 +194,10 @@ private EdgeManager getEdgeManager() {
}

void markFinished() {
- // Sanity check that this is only called on blocking partitions.
- if (getResultType().canBePipelinedConsumed()) {
+ // Sanity check that this is only called on not must be pipelined partitions.
+ if (getResultType().mustBePipelinedConsumed()) {
throw new IllegalStateException(
- "Tried to mark a non-blocking result partition as finished");
+ "Tried to mark a must-be-pipelined result partition as finished");
}

// Sanity check to make sure a result partition cannot be marked as finished twice.
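
The switch from `canBePipelinedConsumed()` to `mustBePipelinedConsumed()` in the guard above matters only for partition types that may be consumed either way. The sketch below illustrates this with a reduced stand-in for `ResultPartitionType`. The enum constants, the `ConsumingConstraint` mapping, and the class name `PipelineConstraintSketch` are assumptions made for illustration, not code taken from this commit.

```java
/** Simplified stand-in for ResultPartitionType; the constraint mapping is an assumption. */
final class PipelineConstraintSketch {

    public enum ConsumingConstraint { BLOCKING, CAN_BE_PIPELINED, MUST_BE_PIPELINED }

    public enum PartitionType {
        BLOCKING(ConsumingConstraint.BLOCKING),
        HYBRID(ConsumingConstraint.CAN_BE_PIPELINED),
        PIPELINED(ConsumingConstraint.MUST_BE_PIPELINED);

        private final ConsumingConstraint constraint;

        PartitionType(ConsumingConstraint constraint) {
            this.constraint = constraint;
        }

        /** True for every type that may be read while it is still being produced. */
        public boolean canBePipelinedConsumed() {
            return constraint != ConsumingConstraint.BLOCKING;
        }

        /** True only for types that cannot be read after the producer has finished. */
        public boolean mustBePipelinedConsumed() {
            return constraint == ConsumingConstraint.MUST_BE_PIPELINED;
        }
    }

    /** Guard before the change: only blocking partitions may be marked finished. */
    public static boolean oldGuardAllowsFinish(PartitionType type) {
        return !type.canBePipelinedConsumed();
    }

    /** Guard after the change: everything except must-be-pipelined partitions. */
    public static boolean newGuardAllowsFinish(PartitionType type) {
        return !type.mustBePipelinedConsumed();
    }

    public static void main(String[] args) {
        for (PartitionType type : PartitionType.values()) {
            System.out.println(
                    type
                            + ": old guard allows finish = " + oldGuardAllowsFinish(type)
                            + ", new guard allows finish = " + newGuardAllowsFinish(type));
        }
    }
}
```

Under these assumptions the two guards agree on blocking and pipelined partitions and differ only for the hybrid one, which the new guard allows to be marked finished.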
@@ -120,4 +120,6 @@ void notifySchedulerNgAboutInternalTaskFailure(
/** Get the shuffle descriptors of the cluster partitions ordered by partition number. */
List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
IntermediateDataSetID intermediateResultPartition);
+
+ MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy();
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

/** This strategy is used to decide whether partition needs to be marked as finished. */
public interface MarkPartitionFinishedStrategy {
/**
* Decide whether partition needs to be marked as finished.
*
* @param resultPartitionType type of result partition.
* @return whether this partition needs to be marked as finished.
*/
boolean needMarkPartitionFinished(ResultPartitionType resultPartitionType);
}
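
To show how such a strategy might be plugged into the partition-finishing loop, here is a minimal, self-contained sketch. It does not use Flink classes: `PartitionType` is a reduced stand-in for `ResultPartitionType`, `Strategy` copies the shape of the interface above, and the two strategy constants `BLOCKING_ONLY` and `BLOCKING_AND_HYBRID` are hypothetical. Which strategy each scheduler actually installs is not shown in this excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone illustration of a pluggable mark-finished strategy; not Flink code. */
final class MarkFinishedStrategySketch {

    /** Hypothetical, reduced stand-in for ResultPartitionType. */
    public enum PartitionType { BLOCKING, HYBRID, PIPELINED }

    /** Same shape as MarkPartitionFinishedStrategy, but over the stand-in enum. */
    public interface Strategy {
        boolean needMarkPartitionFinished(PartitionType type);
    }

    /** Assumed legacy behaviour: only blocking partitions are marked finished. */
    public static final Strategy BLOCKING_ONLY = type -> type == PartitionType.BLOCKING;

    /** Assumed behaviour when consumers may only read finished partitions. */
    public static final Strategy BLOCKING_AND_HYBRID = type -> type != PartitionType.PIPELINED;

    /** Mirrors the loop in finishPartitionsIfNeeded: collect what the strategy selects. */
    public static List<PartitionType> finishPartitionsIfNeeded(
            List<PartitionType> partitions, Strategy strategy) {
        List<PartitionType> finished = new ArrayList<>();
        for (PartitionType partition : partitions) {
            if (strategy.needMarkPartitionFinished(partition)) {
                finished.add(partition);
            }
        }
        return finished;
    }

    public static void main(String[] args) {
        List<PartitionType> produced = Arrays.asList(PartitionType.values());
        System.out.println(finishPartitionsIfNeeded(produced, BLOCKING_ONLY));
        System.out.println(finishPartitionsIfNeeded(produced, BLOCKING_AND_HYBRID));
    }
}
```

Passing the decision in as a strategy keeps the vertex code free of scheduler-specific configuration: the loop stays the same and only the predicate changes.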
@@ -33,6 +33,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+ import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -140,6 +141,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
ExecutionStateUpdateListener executionStateUpdateListener,
+ MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
Logger log)
throws Exception {
ExecutionDeploymentListener executionDeploymentListener =
@@ -175,7 +177,8 @@ public ExecutionGraph createAndRestoreExecutionGraph(
vertexParallelismStore,
checkpointStatsTrackerFactory,
isDynamicGraph,
- executionJobVertexFactory);
+ executionJobVertexFactory,
+ markPartitionFinishedStrategy);

final CheckpointCoordinator checkpointCoordinator =
newExecutionGraph.getCheckpointCoordinator();
@@ -24,6 +24,7 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+ import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.jobgraph.JobGraph;

@@ -61,6 +62,7 @@ ExecutionGraph createAndRestoreExecutionGraph(
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
ExecutionStateUpdateListener executionStateUpdateListener,
+ MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
Logger log)
throws Exception;
}