Skip to content

Commit

Permalink
[FLINK-15325][coordination] Ignores the input locations of a ConsumeP…
Browse files Browse the repository at this point in the history
…artitionGroup if the corresponding ConsumerVertexGroup is too large

This closes apache#21743.
  • Loading branch information
zhuzhurk committed Jan 30, 2023
1 parent c0bfb0b commit 76ebeb9
Show file tree
Hide file tree
Showing 13 changed files with 339 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
Expand Down Expand Up @@ -124,6 +125,15 @@ void enableCheckpointing(

Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults();

/**
* Gets the intermediate result partition by the given partition ID, or throw an exception if
* the partition is not found.
*
* @param id of the intermediate result partition
* @return intermediate result partition
*/
IntermediateResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID id);

/**
* Merges all accumulator results from the tasks previously executed in the Executions.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public class ExecutionVertex

public static final long NUM_BYTES_UNKNOWN = -1;

public static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;

// --------------------------------------------------------------------------------------------

final ExecutionJobVertex jobVertex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

Expand All @@ -34,9 +35,16 @@ class AvailableInputsLocationsRetriever implements InputsLocationsRetriever {
}

@Override
public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
public Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(
ExecutionVertexID executionVertexId) {
return inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
return inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
}

@Override
public Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(
ConsumedPartitionGroup consumedPartitionGroup) {
return inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(
consumedPartitionGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.concurrent.FutureUtils;
Expand All @@ -30,7 +31,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.runtime.executiongraph.ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -39,6 +39,10 @@
*/
public class DefaultPreferredLocationsRetriever implements PreferredLocationsRetriever {

static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;

static final int MAX_DISTINCT_CONSUMERS_TO_CONSIDER = 8;

private final StateLocationRetriever stateLocationRetriever;

private final InputsLocationsRetriever inputsLocationsRetriever;
Expand Down Expand Up @@ -84,11 +88,24 @@ private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations
CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =
CompletableFuture.completedFuture(Collections.emptyList());

final Collection<Collection<ExecutionVertexID>> allProducers =
inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
for (Collection<ExecutionVertexID> producers : allProducers) {
final Collection<ConsumedPartitionGroup> consumedPartitionGroups =
inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
// Ignore the location of a consumed partition group if it has too many distinct
// consumers compared to the consumed partition group size. This is to avoid tasks
// unevenly distributed on nodes when running batch jobs or running jobs in
// session/standalone mode.
if ((double) consumedPartitionGroup.getConsumerVertexGroup().size()
/ consumedPartitionGroup.size()
> MAX_DISTINCT_CONSUMERS_TO_CONSIDER) {
continue;
}

final Collection<CompletableFuture<TaskManagerLocation>> locationsFutures =
getInputLocationFutures(producersToIgnore, producers);
getInputLocationFutures(
producersToIgnore,
inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(
consumedPartitionGroup));

preferredLocations = combineLocations(preferredLocations, locationsFutures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
Expand Down Expand Up @@ -511,9 +512,16 @@ public Set<CoLocationGroup> getCoLocationGroups() {
}

@Override
public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
public Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(
ExecutionVertexID executionVertexId) {
return inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
return inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
}

@Override
public Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(
ConsumedPartitionGroup consumedPartitionGroup) {
return inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(
consumedPartitionGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,15 @@
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.IterableUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -47,26 +45,22 @@ public ExecutionGraphToInputsLocationsRetrieverAdapter(ExecutionGraph executionG
}

@Override
public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
public Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(
ExecutionVertexID executionVertexId) {
ExecutionVertex ev = getExecutionVertex(executionVertexId);

InternalExecutionGraphAccessor executionGraphAccessor = ev.getExecutionGraphAccessor();
return getExecutionVertex(executionVertexId).getAllConsumedPartitionGroups();
}

List<Collection<ExecutionVertexID>> resultPartitionProducers =
new ArrayList<>(ev.getNumberOfInputs());
for (ConsumedPartitionGroup consumedPartitions : ev.getAllConsumedPartitionGroups()) {
List<ExecutionVertexID> producers = new ArrayList<>(consumedPartitions.size());
for (IntermediateResultPartitionID consumedPartitionId : consumedPartitions) {
ExecutionVertex producer =
executionGraphAccessor
.getResultPartitionOrThrow(consumedPartitionId)
.getProducer();
producers.add(producer.getID());
}
resultPartitionProducers.add(producers);
}
return resultPartitionProducers;
@Override
public Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(
ConsumedPartitionGroup consumedPartitionGroup) {
return IterableUtils.toStream(consumedPartitionGroup)
.map(
partition ->
executionGraph
.getResultPartitionOrThrow(partition)
.getProducer()
.getID())
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,40 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Component to retrieve the inputs locations of a {@link Execution}. */
/** Component to retrieve the inputs locations of an {@link ExecutionVertex}. */
public interface InputsLocationsRetriever {

/**
* Get the producers of the result partitions consumed by an execution.
* Get the consumed result partition groups of an execution vertex.
*
* @param executionVertexId identifies the execution
* @return the producers of the result partitions group by job vertex id
* @param executionVertexId identifies the execution vertex
* @return the consumed result partition groups
*/
Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(
ExecutionVertexID executionVertexId);

/**
* Get the task manager location future for an execution.
* Get the producer execution vertices of a consumed result partition group.
*
* @param executionVertexId identifying the execution
* @param consumedPartitionGroup the consumed result partition group
* @return the ids of producer execution vertices
*/
Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(
ConsumedPartitionGroup consumedPartitionGroup);

/**
* Get the task manager location future for an execution vertex.
*
* @param executionVertexId identifying the execution vertex
* @return the task manager location future
*/
Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;

import org.junit.jupiter.api.Test;

import java.util.Collection;
Expand Down Expand Up @@ -68,15 +71,20 @@ void testInputLocationIfDone() {
}

@Test
void testConsumedResultPartitionsProducers() {
void testGetConsumedPartitionGroupAndProducers() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
Collection<Collection<ExecutionVertexID>> producers =
availableInputsLocationsRetriever.getConsumedResultPartitionsProducers(EV2);
assertThat(producers).hasSize(1);
Collection<ExecutionVertexID> resultProducers = producers.iterator().next();
assertThat(resultProducers).containsExactly(EV1);

ConsumedPartitionGroup consumedPartitionGroup =
Iterables.getOnlyElement(
(availableInputsLocationsRetriever.getConsumedPartitionGroups(EV2)));
assertThat(consumedPartitionGroup).hasSize(1);

Collection<ExecutionVertexID> producers =
availableInputsLocationsRetriever.getProducersOfConsumedPartitionGroup(
consumedPartitionGroup);
assertThat(producers).containsExactly(EV1);
}

private static TestingInputsLocationsRetriever getOriginalLocationRetriever() {
Expand Down
Loading

0 comments on commit 76ebeb9

Please sign in to comment.