Skip to content

Commit

Permalink
[FLINK-21303][coordination] Remove LogicalSlot#getPhysicalSlotNumber
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Feb 8, 2021
1 parent 920be99 commit 7ea786a
Show file tree
Hide file tree
Showing 25 changed files with 5 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ public Offloaded(PermanentBlobKey serializedValueKey) {
/** The list of consumed intermediate result partitions. */
private final List<InputGateDeploymentDescriptor> inputGates;

/** Slot number to run the sub task in on the target machine. */
private final int targetSlotNumber;

/** Information to restore the task. This can be null if there is no state to restore. */
@Nullable private final JobManagerTaskRestore taskRestore;

Expand All @@ -142,7 +139,6 @@ public TaskDeploymentDescriptor(
AllocationID allocationId,
int subtaskIndex,
int attemptNumber,
int targetSlotNumber,
@Nullable JobManagerTaskRestore taskRestore,
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
Expand All @@ -161,10 +157,6 @@ public TaskDeploymentDescriptor(
Preconditions.checkArgument(0 <= attemptNumber, "The attempt number must be positive.");
this.attemptNumber = attemptNumber;

Preconditions.checkArgument(
0 <= targetSlotNumber, "The target slot number must be positive.");
this.targetSlotNumber = targetSlotNumber;

this.taskRestore = taskRestore;

this.producedPartitions = Preconditions.checkNotNull(resultPartitionDeploymentDescriptors);
Expand Down Expand Up @@ -234,15 +226,6 @@ public int getAttemptNumber() {
return attemptNumber;
}

/**
* Gets the number of the slot into which the task is to be deployed.
*
* @return The number of the target slot.
*/
public int getTargetSlotNumber() {
return targetSlotNumber;
}

public List<ResultPartitionDeploymentDescriptor> getProducedPartitions() {
return producedPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ private TaskDeploymentDescriptorFactory(

public TaskDeploymentDescriptor createDeploymentDescriptor(
AllocationID allocationID,
int targetSlotNumber,
@Nullable JobManagerTaskRestore taskRestore,
Collection<ResultPartitionDeploymentDescriptor> producedPartitions) {
return new TaskDeploymentDescriptor(
Expand All @@ -96,7 +95,6 @@ public TaskDeploymentDescriptor createDeploymentDescriptor(
allocationID,
subtaskIndex,
attemptNumber,
targetSlotNumber,
taskRestore,
new ArrayList<>(producedPartitions),
createInputGateDeploymentDescriptors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,6 @@ public void deploy() throws JobException {
TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
.createDeploymentDescriptor(
slot.getAllocationId(),
slot.getPhysicalSlotNumber(),
taskRestore,
producedPartitions.values());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,6 @@ default CompletableFuture<?> releaseSlot() {
*/
CompletableFuture<?> releaseSlot(@Nullable Throwable cause);

/**
* Gets the slot number on the TaskManager. Multiple logical slots can share the same slot
* number.
*
* @return slot number
*/
int getPhysicalSlotNumber();

/**
* Gets the allocation id of this slot. Multiple logical slots can share the same allocation id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
return releaseFuture;
}

@Override
public int getPhysicalSlotNumber() {
return slotContext.getPhysicalSlotNumber();
}

@Override
public AllocationID getAllocationId() {
return slotContext.getAllocationId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ public CompletableFuture<Acknowledge> submitTask(
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ public Task(
int attemptNumber,
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
int targetSlotNumber,
MemoryManager memManager,
IOManager ioManager,
ShuffleEnvironment<?, ?> shuffleEnvironment,
Expand All @@ -331,8 +330,6 @@ public Task(

Preconditions.checkArgument(0 <= subtaskIndex, "The subtask index must be positive.");
Preconditions.checkArgument(0 <= attemptNumber, "The attempt number must be positive.");
Preconditions.checkArgument(
0 <= targetSlotNumber, "The target slot number must be positive.");

this.taskInfo =
new TaskInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class TaskDeploymentDescriptorBuilder {
private int attemptNumber;
private List<ResultPartitionDeploymentDescriptor> producedPartitions;
private List<InputGateDeploymentDescriptor> inputGates;
private int targetSlotNumber;

@Nullable private JobManagerTaskRestore taskRestore;

Expand All @@ -74,7 +73,6 @@ private TaskDeploymentDescriptorBuilder(JobID jobId, String invokableClassName)
this.attemptNumber = 0;
this.producedPartitions = Collections.emptyList();
this.inputGates = Collections.emptyList();
this.targetSlotNumber = 0;
this.taskRestore = null;
}

Expand Down Expand Up @@ -127,11 +125,6 @@ public TaskDeploymentDescriptorBuilder setInputGates(
return this;
}

public TaskDeploymentDescriptorBuilder setTargetSlotNumber(int targetSlotNumber) {
this.targetSlotNumber = targetSlotNumber;
return this;
}

public TaskDeploymentDescriptorBuilder setTaskRestore(
@Nullable JobManagerTaskRestore taskRestore) {
this.taskRestore = taskRestore;
Expand All @@ -147,7 +140,6 @@ public TaskDeploymentDescriptor build() {
allocationId,
subtaskIndex,
attemptNumber,
targetSlotNumber,
taskRestore,
producedPartitions,
inputGates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public class TaskDeploymentDescriptorTest extends TestLogger {
new ArrayList<InputGateDeploymentDescriptor>(0);
private static final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
private static final List<URL> requiredClasspaths = new ArrayList<>(0);
private static final int targetSlotNumber = 47;
private static final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
private static final JobManagerTaskRestore taskRestore =
new JobManagerTaskRestore(1L, taskStateHandles);
Expand Down Expand Up @@ -123,7 +122,6 @@ public void testSerialization() throws Exception {
assertEquals(orig.getAllocationId(), copy.getAllocationId());
assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
assertEquals(
orig.getTaskRestore().getRestoreCheckpointId(),
copy.getTaskRestore().getRestoreCheckpointId());
Expand Down Expand Up @@ -165,7 +163,6 @@ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
allocationId,
indexInSubtaskGroup,
attemptNumber,
targetSlotNumber,
taskRestore,
producedResults,
inputGates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ public void testTddProducedPartitionsLazyScheduling() throws Exception {
TaskDeploymentDescriptor tdd =
tddFactory.createDeploymentDescriptor(
new AllocationID(),
0,
null,
Execution.registerProducedPartitions(
vertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public class TestingLogicalSlot implements LogicalSlot {

private final AtomicReference<Payload> payloadReference;

private final int slotNumber;

private final CompletableFuture<?> releaseFuture;

private final boolean automaticallyCompleteReleaseFuture;
Expand All @@ -55,7 +53,6 @@ public class TestingLogicalSlot implements LogicalSlot {
TestingLogicalSlot(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
int slotNumber,
AllocationID allocationId,
SlotRequestId slotRequestId,
boolean automaticallyCompleteReleaseFuture,
Expand All @@ -64,7 +61,6 @@ public class TestingLogicalSlot implements LogicalSlot {
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
this.payloadReference = new AtomicReference<>();
this.slotNumber = slotNumber;
this.allocationId = Preconditions.checkNotNull(allocationId);
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.releaseFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -121,11 +117,6 @@ public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
return releaseFuture;
}

@Override
public int getPhysicalSlotNumber() {
return slotNumber;
}

@Override
public AllocationID getAllocationId() {
return allocationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
public class TestingLogicalSlotBuilder {
private TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
private int slotNumber = 0;
private AllocationID allocationId = new AllocationID();
private SlotRequestId slotRequestId = new SlotRequestId();
private SlotOwner slotOwner = new DummySlotOwner();
Expand All @@ -46,11 +45,6 @@ public TestingLogicalSlotBuilder setTaskManagerLocation(
return this;
}

public TestingLogicalSlotBuilder setSlotNumber(int slotNumber) {
this.slotNumber = slotNumber;
return this;
}

public TestingLogicalSlotBuilder setAllocationId(AllocationID allocationId) {
this.allocationId = allocationId;
return this;
Expand All @@ -76,7 +70,6 @@ public TestingLogicalSlot createTestingLogicalSlot() {
return new TestingLogicalSlot(
taskManagerLocation,
taskManagerGateway,
slotNumber,
allocationId,
slotRequestId,
automaticallyCompleteReleaseFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,9 @@ public void testLogicalSlotAllocation() {
AllocationID allocationId = new AllocationID();
LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
int physicalSlotNumber = 3;
slotContextFuture.complete(
new TestingPhysicalSlot(
allocationId,
taskManagerLocation,
physicalSlotNumber,
taskManagerGateway,
RP));
allocationId, taskManagerLocation, 3, taskManagerGateway, RP));

assertThat(sharedSlot.isEmpty(), is(false));
assertThat(released.isDone(), is(false));
Expand All @@ -104,7 +99,6 @@ public void testLogicalSlotAllocation() {
assertThat(logicalSlot.getAllocationId(), is(allocationId));
assertThat(logicalSlot.getTaskManagerLocation(), is(taskManagerLocation));
assertThat(logicalSlot.getTaskManagerGateway(), is(taskManagerGateway));
assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlotNumber));
assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public void testAllocateLogicalSlot() {
assertThat(logicalSlot.getAllocationId(), equalTo(physicalSlot.getAllocationId()));
assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
assertThat(logicalSlot.getPayload(), nullValue());
assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlot.getPhysicalSlotNumber()));
assertThat(
logicalSlot.getTaskManagerLocation(),
equalTo(physicalSlot.getTaskManagerLocation()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
0);
Collections.emptyList());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,7 @@ private void internalTestPartitionRelease(
Collections.singletonList(taskResultPartitionDescriptor),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
0);
Collections.emptyList());

final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,7 @@ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(
producedPartitions,
inputGates,
Collections.emptyList(),
Collections.emptyList(),
0);
Collections.emptyList());
}

static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
Expand All @@ -711,8 +710,7 @@ static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
List<ResultPartitionDeploymentDescriptor> producedPartitions,
List<InputGateDeploymentDescriptor> inputGates,
Collection<PermanentBlobKey> requiredJarFiles,
Collection<URL> requiredClasspaths,
int targetSlotNumber)
Collection<URL> requiredClasspaths)
throws IOException {

JobInformation jobInformation =
Expand Down Expand Up @@ -746,7 +744,6 @@ static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
new AllocationID(),
subtaskIndex,
attemptNumber,
targetSlotNumber,
null,
producedPartitions,
inputGates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ private Task createTask(Class<? extends AbstractInvokable> invokableClass) throw
0,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0,
mock(MemoryManager.class),
mock(IOManager.class),
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ public Task build() throws Exception {
0,
resultPartitions,
inputGates,
0,
MemoryManagerBuilder.newBuilder().setMemorySize(1024 * 1024).build(),
mock(IOManager.class),
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ public static void main(String[] args) throws Exception {
0, // attemptNumber
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0, // targetSlotNumber
memoryManager,
ioManager,
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ private static Task createTask(
0,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0,
mock(MemoryManager.class),
mock(IOManager.class),
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ private Task createSystemExitTask(final String invokableClassName, StreamOperato
0,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0,
MemoryManagerBuilder.newBuilder().setMemorySize(32L * 1024L).build(),
new IOManagerAsync(),
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E
0,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0,
MemoryManagerBuilder.newBuilder().setMemorySize(32L * 1024L).build(),
new IOManagerAsync(),
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ private Task createTask(Class<? extends AbstractInvokable> invokableClass) throw
0,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0,
mock(MemoryManager.class),
mock(IOManager.class),
shuffleEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ private static Task createTask(
0,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
0,
mock(MemoryManager.class),
mock(IOManager.class),
shuffleEnvironment,
Expand Down

0 comments on commit 7ea786a

Please sign in to comment.