Skip to content

Commit

Permalink
[FLINK-16180][runtime] Replace the nullable vertexExecution in Schedu…
Browse files Browse the repository at this point in the history
…ledUnit with a non-null executionVertexId
  • Loading branch information
zhuzhurk committed Feb 28, 2020
1 parent 8f6fcd2 commit 9a3eb5b
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@

package org.apache.flink.runtime.jobmanager.scheduler;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

/**
* ScheduledUnit contains the information necessary to allocate a slot for the given
* {@link JobVertexID}.
* ScheduledUnit contains the information necessary to allocate a slot for the given task.
*/
public class ScheduledUnit {

@Nullable
private final Execution vertexExecution;

private final JobVertexID jobVertexId;
private final ExecutionVertexID executionVertexId;

@Nullable
private final SlotSharingGroupId slotSharingGroupId;
Expand All @@ -44,18 +42,16 @@ public class ScheduledUnit {

// --------------------------------------------------------------------------------------------

@VisibleForTesting
public ScheduledUnit(Execution task) {
this(
Preconditions.checkNotNull(task),
task.getVertex().getJobvertexId(),
null,
task,
null);
}

public ScheduledUnit(Execution task, @Nullable SlotSharingGroupId slotSharingGroupId) {
this(
Preconditions.checkNotNull(task),
task.getVertex().getJobvertexId(),
task,
slotSharingGroupId,
null);
}
Expand All @@ -65,31 +61,28 @@ public ScheduledUnit(
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable CoLocationConstraint coLocationConstraint) {
this(
Preconditions.checkNotNull(task),
task.getVertex().getJobvertexId(),
Preconditions.checkNotNull(task).getVertex().getID(),
slotSharingGroupId,
coLocationConstraint);
}

@VisibleForTesting
public ScheduledUnit(
JobVertexID jobVertexId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable CoLocationConstraint coLocationConstraint) {
this(
null,
jobVertexId,
new ExecutionVertexID(jobVertexId, 0),
slotSharingGroupId,
coLocationConstraint);
}

public ScheduledUnit(
@Nullable Execution task,
JobVertexID jobVertexId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable CoLocationConstraint coLocationConstraint) {
ExecutionVertexID executionVertexId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable CoLocationConstraint coLocationConstraint) {

this.vertexExecution = task;
this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
this.executionVertexId = Preconditions.checkNotNull(executionVertexId);
this.slotSharingGroupId = slotSharingGroupId;
this.coLocationConstraint = coLocationConstraint;

Expand All @@ -98,12 +91,11 @@ public ScheduledUnit(
// --------------------------------------------------------------------------------------------

public JobVertexID getJobVertexId() {
return jobVertexId;
return executionVertexId.getJobVertexId();
}

@Nullable
public Execution getTaskToExecute() {
return vertexExecution;
public int getSubtaskIndex() {
return executionVertexId.getSubtaskIndex();
}

@Nullable
Expand All @@ -120,7 +112,7 @@ public CoLocationConstraint getCoLocationConstraint() {

@Override
public String toString() {
return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + slotSharingGroupId +
return "{task=" + executionVertexId + ", sharingUnit=" + slotSharingGroupId +
", locationConstraint=" + coLocationConstraint + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private CompletableFuture<LogicalSlot> allocateSlotInternal(
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute());
log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getJobVertexId());

componentMainThreadExecutor.assertRunningInMainThread();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public List<SlotExecutionVertexAssignment> allocateSlotsFor(
slotProviderStrategy.allocateSlot(
slotRequestId,
new ScheduledUnit(
executionVertexId.getJobVertexId(),
executionVertexId,
slotSharingGroupId,
schedulingRequirements.getCoLocationConstraint()),
SlotProfile.priorAllocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
SlotProfile slotProfile,
Time allocationTimeout) {
JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
final JobVertexID vertexId = task.getJobVertexId();
final int subtask = task.getSubtaskIndex();

CompletableFuture<LogicalSlot>[] forTask = slotFutures.get(vertexId);
if (forTask != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
public class DummyScheduledUnit extends ScheduledUnit {
public DummyScheduledUnit() {
super(
null,
new JobVertexID(),
null,
null);
Expand Down

0 comments on commit 9a3eb5b

Please sign in to comment.