Skip to content

Commit

Permalink
[FLINK-12763][runtime] Requests slots with ResourceProfiles that are …
Browse files Browse the repository at this point in the history
…converted from ResourceSpecs.
  • Loading branch information
xintongsong authored and StephanEwen committed Jul 10, 2019
1 parent b5f2303 commit d5520a3
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public String toString() {
'}';
}

static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());

return new ResourceProfile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;

Expand Down Expand Up @@ -46,6 +47,13 @@ public interface AccessExecutionJobVertex {
*/
int getMaxParallelism();

/**
* Returns the resource profile for this job vertex.
*
* @return resource profile for this job vertex.
*/
ResourceProfile getResourceProfile();

/**
* Returns the {@link JobVertexID} for this job vertex.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;

Expand All @@ -38,6 +39,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser

private final int maxParallelism;

private final ResourceProfile resourceProfile;

private final StringifiedAccumulatorResult[] archivedUserAccumulators;

public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
Expand All @@ -52,6 +55,7 @@ public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
this.name = jobVertex.getJobVertex().getName();
this.parallelism = jobVertex.getParallelism();
this.maxParallelism = jobVertex.getMaxParallelism();
this.resourceProfile = jobVertex.getResourceProfile();
}

public ArchivedExecutionJobVertex(
Expand All @@ -60,12 +64,14 @@ public ArchivedExecutionJobVertex(
String name,
int parallelism,
int maxParallelism,
ResourceProfile resourceProfile,
StringifiedAccumulatorResult[] archivedUserAccumulators) {
this.taskVertices = taskVertices;
this.id = id;
this.name = name;
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
this.resourceProfile = resourceProfile;
this.archivedUserAccumulators = archivedUserAccumulators;
}

Expand All @@ -88,6 +94,11 @@ public int getMaxParallelism() {
return maxParallelism;
}

@Override
public ResourceProfile getResourceProfile() {
return resourceProfile;
}

@Override
public JobVertexID getJobVertexId() {
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
Expand Down Expand Up @@ -577,7 +576,7 @@ private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
slotRequestId,
toSchedule,
new SlotProfile(
ResourceProfile.UNKNOWN,
vertex.getResourceProfile(),
preferredLocations,
previousAllocationIDs,
allPreviousExecutionGraphAllocationIds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable

private int maxParallelism;

private final ResourceProfile resourceProfile;

/**
* Either store a serialized task information, which is for all sub tasks the same,
* or the permanent blob key of the offloaded task information BLOB containing
Expand Down Expand Up @@ -193,6 +196,7 @@ public ExecutionJobVertex(
}

this.parallelism = numTaskVertices;
this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), 0);

this.taskVertices = new ExecutionVertex[numTaskVertices];
this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
Expand Down Expand Up @@ -334,6 +338,11 @@ public int getMaxParallelism() {
return maxParallelism;
}

@Override
public ResourceProfile getResourceProfile() {
return resourceProfile;
}

public boolean isMaxParallelismConfigured() {
return maxParallelismConfigured;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
Expand Down Expand Up @@ -227,6 +228,10 @@ public int getMaxParallelism() {
return this.jobVertex.getMaxParallelism();
}

public ResourceProfile getResourceProfile() {
return this.jobVertex.getResourceProfile();
}

@Override
public int getParallelSubtaskIndex() {
return this.subTaskIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
Expand Down Expand Up @@ -108,6 +109,7 @@ public void testHandleRequest() throws Exception {
"test",
1,
1,
ResourceProfile.UNKNOWN,
emptyAccumulators);

// Change some fields so we can make it different from other sub tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.handler.legacy.utils;

import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -78,6 +79,7 @@ public ArchivedExecutionJobVertex build() {
name != null ? name : "task_" + RANDOM.nextInt(),
parallelism,
maxParallelism,
ResourceProfile.UNKNOWN,
archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
);
}
Expand Down

0 comments on commit d5520a3

Please sign in to comment.