Skip to content

Commit

Permalink
[FLINK-28585][runtime] Ensure all the concurrent executions of Specul…
Browse files Browse the repository at this point in the history
…ativeExecutionVertex to share the same input splits

This allows speculative execution for sources tasks from jobs with InputFormatSource.

This closes apache#20322.
  • Loading branch information
zhuzhurk committed Jul 25, 2022
1 parent 2b9b985 commit 7611928
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,10 @@ public boolean tryAssignResource(final LogicalSlot logicalSlot) {
}
}

public InputSplit getNextInputSplit() {
public Optional<InputSplit> getNextInputSplit() {
final LogicalSlot slot = this.getAssignedResource();
final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
return this.vertex.getNextInputSplit(host);
return this.vertex.getNextInputSplit(host, getAttemptNumber());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,14 @@ public ConsumedPartitionGroup getConsumedPartitionGroup(int input) {
return allConsumedPartitions.get(input);
}

public InputSplit getNextInputSplit(String host) {
final int taskId = getParallelSubtaskIndex();
synchronized (inputSplits) {
final InputSplit nextInputSplit =
jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
if (nextInputSplit != null) {
inputSplits.add(nextInputSplit);
}
return nextInputSplit;
public Optional<InputSplit> getNextInputSplit(String host, int attemptNumber) {
final int subtaskIndex = getParallelSubtaskIndex();
final InputSplit nextInputSplit =
jobVertex.getSplitAssigner().getNextInputSplit(host, subtaskIndex);
if (nextInputSplit != null) {
inputSplits.add(nextInputSplit);
}
return Optional.ofNullable(nextInputSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand All @@ -27,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -46,6 +48,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {

private int originalAttemptNumber;

final Map<Integer, Integer> nextInputSplitIndexToConsumeByAttempts;

public SpeculativeExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
Expand All @@ -66,6 +70,7 @@ public SpeculativeExecutionVertex(
this.currentExecutions = new LinkedHashMap<>();
this.currentExecutions.put(currentExecution.getAttemptNumber(), currentExecution);
this.originalAttemptNumber = currentExecution.getAttemptNumber();
this.nextInputSplitIndexToConsumeByAttempts = new HashMap<>();
}

public boolean containsSources() {
Expand Down Expand Up @@ -155,6 +160,7 @@ public void resetForNewExecution() {
currentExecutions.clear();
currentExecutions.put(currentExecution.getAttemptNumber(), currentExecution);
originalAttemptNumber = currentExecution.getAttemptNumber();
nextInputSplitIndexToConsumeByAttempts.clear();
}

@Override
Expand All @@ -180,6 +186,7 @@ public void archiveFailedExecution(ExecutionAttemptID executionAttemptId) {

final Execution removedExecution =
this.currentExecutions.remove(executionAttemptId.getAttemptNumber());
nextInputSplitIndexToConsumeByAttempts.remove(executionAttemptId.getAttemptNumber());
checkNotNull(
removedExecution,
"Cannot remove execution %s which does not exist.",
Expand Down Expand Up @@ -238,6 +245,23 @@ private int getStatePriority(ExecutionState state) {
}
}

@Override
public Optional<InputSplit> getNextInputSplit(String host, int attemptNumber) {
final int index = nextInputSplitIndexToConsumeByAttempts.getOrDefault(attemptNumber, 0);
checkState(index <= inputSplits.size());

if (index < inputSplits.size()) {
nextInputSplitIndexToConsumeByAttempts.put(attemptNumber, index + 1);
return Optional.of(inputSplits.get(index));
} else {
final Optional<InputSplit> split = super.getNextInputSplit(host, attemptNumber);
if (split.isPresent()) {
nextInputSplitIndexToConsumeByAttempts.put(attemptNumber, index + 1);
}
return split;
}
}

@Override
void notifyPendingDeployment(Execution execution) {
getExecutionGraphAccessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,14 @@ public SerializedInputSplit requestNextInputSplit(
throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
}

final InputSplit nextInputSplit = execution.getNextInputSplit();
final Optional<InputSplit> optionalNextInputSplit = execution.getNextInputSplit();

if (nextInputSplit != null) {
final InputSplit nextInputSplit;
if (optionalNextInputSplit.isPresent()) {
nextInputSplit = optionalNextInputSplit.get();
log.debug("Send next input split {}.", nextInputSplit);
} else {
nextInputSplit = null;
log.debug("No more input splits available");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -33,8 +36,13 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -229,8 +237,47 @@ void testGetExecutionState() throws Exception {
}
}

private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
@Test
void testGetNextInputSplit() throws Exception {
final TestInputSource source = new TestInputSource();
final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
jobVertex.setInputSplitSource(source);

final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(jobVertex);

final int numExecutions = 3;
for (int i = 0; i < numExecutions - 1; ++i) {
ev.createNewSpeculativeExecution(0);
}
final List<Execution> executions = new ArrayList<>(ev.getCurrentExecutions());

final Map<Integer, List<InputSplit>> splitsOfAttempts = new HashMap<>();
final Random rand = new Random();
while (executions.size() > 0) {
final int index = rand.nextInt(executions.size());
final Execution execution = executions.get(index);
final Optional<InputSplit> split = execution.getNextInputSplit();
if (split.isPresent()) {
splitsOfAttempts
.computeIfAbsent(execution.getAttemptNumber(), k -> new ArrayList<>())
.add(split.get());
} else {
executions.remove(index);
}
}

assertThat(splitsOfAttempts).hasSize(numExecutions);
assertThat(splitsOfAttempts.get(0)).containsExactlyInAnyOrder(source.splits);
assertThat(splitsOfAttempts.get(1)).isEqualTo(splitsOfAttempts.get(0));
assertThat(splitsOfAttempts.get(2)).isEqualTo(splitsOfAttempts.get(0));
}

private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
return createSpeculativeExecutionVertex(ExecutionGraphTestUtils.createNoOpVertex(1));
}

private SpeculativeExecutionVertex createSpeculativeExecutionVertex(final JobVertex jobVertex)
throws Exception {
final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
return (SpeculativeExecutionVertex)
Expand All @@ -249,4 +296,27 @@ private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exce

return executionGraph;
}

private class TestInputSource extends GenericInputFormat<Integer> {
private GenericInputSplit[] splits;

public GenericInputSplit[] createInputSplits(int numSplitsHint) {
final int numSplits = numSplitsHint * 10;
splits = new GenericInputSplit[numSplits];
for (int i = 0; i < numSplits; ++i) {
splits[i] = new GenericInputSplit(i, numSplits);
}
return splits;
}

@Override
public boolean reachedEnd() throws IOException {
return false;
}

@Override
public Integer nextRecord(Integer reuse) throws IOException {
return null;
}
}
}

0 comments on commit 7611928

Please sign in to comment.