Skip to content

Commit

Permalink
[FLINK-12607][rest] Expose maxParallelism of jobs and vertices
Browse files Browse the repository at this point in the history
  • Loading branch information
bytesandwich committed Mar 1, 2021
1 parent 2edd0f6 commit f5e537e
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 5 deletions.
9 changes: 9 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,9 @@
"jid" : {
"type" : "any"
},
"maxParallelism" : {
"type" : "integer"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -1511,6 +1514,9 @@
"id" : {
"type" : "any"
},
"maxParallelism" : {
"type" : "integer"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
Expand Down Expand Up @@ -3429,6 +3435,9 @@
"id" : {
"type" : "any"
},
"maxParallelism" : {
"type" : "integer"
},
"name" : {
"type" : "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
@Internal
public class ArchivedExecutionConfig implements Serializable {

private static final long serialVersionUID = 2126156250920316528L;
private static final long serialVersionUID = 2907040336948181163L;

private final String executionMode;
private final String restartStrategyDescription;
private final int parallelism;
private final int maxParallelism;
private final boolean objectReuseEnabled;
private final Map<String, String> globalJobParameters;

Expand All @@ -47,6 +48,7 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
} else {
restartStrategyDescription = "default";
}
maxParallelism = ec.getMaxParallelism();
parallelism = ec.getParallelism();
objectReuseEnabled = ec.isObjectReuseEnabled();
if (ec.getGlobalJobParameters() != null && ec.getGlobalJobParameters().toMap() != null) {
Expand All @@ -59,11 +61,13 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
public ArchivedExecutionConfig(
String executionMode,
String restartStrategyDescription,
int maxParallelism,
int parallelism,
boolean objectReuseEnabled,
Map<String, String> globalJobParameters) {
this.executionMode = executionMode;
this.restartStrategyDescription = restartStrategyDescription;
this.maxParallelism = maxParallelism;
this.parallelism = parallelism;
this.objectReuseEnabled = objectReuseEnabled;
this.globalJobParameters = globalJobParameters;
Expand All @@ -77,6 +81,10 @@ public String getRestartStrategyDescription() {
return restartStrategyDescription;
}

public int getMaxParallelism() {
return maxParallelism;
}

public int getParallelism() {
return parallelism;
}
Expand Down
9 changes: 9 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,9 @@
"duration" : {
"type" : "integer"
},
"maxParallelism" : {
"type" : "integer"
},
"now" : {
"type" : "integer"
},
Expand All @@ -729,6 +732,9 @@
"name" : {
"type" : "string"
},
"maxParallelism" : {
"type" : "integer"
},
"parallelism" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1887,6 +1893,9 @@
"parallelism" : {
"type" : "integer"
},
"maxParallelism" : {
"type" : "integer"
},
"now" : {
"type" : "integer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export interface JobDetailInterface {
'start-time': number;
'end-time': number;
duration: number;
maxParallelism: number;
now: number;
timestamps: TimestampsStatus;
vertices: VerticesItemInterface[];
Expand Down Expand Up @@ -80,6 +81,7 @@ export interface VerticesItemInterface {
id: string;
name: string;
parallelism: number;
maxParallelism: number;
status: string;
'start-time': number;
'end-time': number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ private static JobDetailsInfo createJobDetailsInfo(
startTime,
endTime,
duration,
executionGraph.getArchivedExecutionConfig().getMaxParallelism(),
now,
timestamps,
jobVertexInfos,
Expand Down Expand Up @@ -223,6 +224,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
return new JobDetailsInfo.JobVertexDetailsInfo(
ejv.getJobVertexId(),
ejv.getName(),
ejv.getMaxParallelism(),
ejv.getParallelism(),
jobVertexState,
startTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
jobVertex.getJobVertexId(),
jobVertex.getName(),
jobVertex.getParallelism(),
jobVertex.getMaxParallelism(),
now,
subtasks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class JobVertexDetailsInfo implements ResponseBody {
public static final String FIELD_NAME_VERTEX_ID = "id";
public static final String FIELD_NAME_VERTEX_NAME = "name";
public static final String FIELD_NAME_PARALLELISM = "parallelism";
public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";
public static final String FIELD_NAME_NOW = "now";
public static final String FIELD_NAME_SUBTASKS = "subtasks";

Expand All @@ -54,6 +55,9 @@ public class JobVertexDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_PARALLELISM)
private final int parallelism;

@JsonProperty(FIELD_NAME_MAX_PARALLELISM)
private final int maxParallelism;

@JsonProperty(FIELD_NAME_NOW)
private final long now;

Expand All @@ -67,11 +71,13 @@ public JobVertexDetailsInfo(
JobVertexID id,
@JsonProperty(FIELD_NAME_VERTEX_NAME) String name,
@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism,
@JsonProperty(FIELD_NAME_NOW) long now,
@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskExecutionAttemptDetailsInfo> subtasks) {
this.id = checkNotNull(id);
this.name = checkNotNull(name);
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
this.now = now;
this.subtasks = checkNotNull(subtasks);
}
Expand All @@ -95,12 +101,13 @@ public boolean equals(Object o) {
return Objects.equals(id, that.id)
&& Objects.equals(name, that.name)
&& parallelism == that.parallelism
&& maxParallelism == that.maxParallelism
&& now == that.now
&& Objects.equals(subtasks, that.subtasks);
}

@Override
public int hashCode() {
return Objects.hash(id, name, parallelism, now, subtasks);
return Objects.hash(id, name, parallelism, maxParallelism, now, subtasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class JobDetailsInfo implements ResponseBody {

public static final String FIELD_NAME_DURATION = "duration";

public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";

// TODO: For what do we need this???
public static final String FIELD_NAME_NOW = "now";

Expand Down Expand Up @@ -92,6 +94,9 @@ public class JobDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_DURATION)
private final long duration;

@JsonProperty(FIELD_NAME_MAX_PARALLELISM)
private final long maxParallelism;

@JsonProperty(FIELD_NAME_NOW)
private final long now;

Expand All @@ -118,6 +123,7 @@ public JobDetailsInfo(
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) long maxParallelism,
@JsonProperty(FIELD_NAME_NOW) long now,
@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<JobStatus, Long> timestamps,
@JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS)
Expand All @@ -133,6 +139,7 @@ public JobDetailsInfo(
this.startTime = startTime;
this.endTime = endTime;
this.duration = duration;
this.maxParallelism = maxParallelism;
this.now = now;
this.timestamps = Preconditions.checkNotNull(timestamps);
this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos);
Expand All @@ -153,6 +160,7 @@ public boolean equals(Object o) {
&& startTime == that.startTime
&& endTime == that.endTime
&& duration == that.duration
&& maxParallelism == that.maxParallelism
&& now == that.now
&& Objects.equals(jobId, that.jobId)
&& Objects.equals(name, that.name)
Expand All @@ -173,6 +181,7 @@ public int hashCode() {
startTime,
endTime,
duration,
maxParallelism,
now,
timestamps,
jobVertexInfos,
Expand Down Expand Up @@ -210,6 +219,11 @@ public long getEndTime() {
return endTime;
}

@JsonIgnore
public long getMaxParallelism() {
return maxParallelism;
}

@JsonIgnore
public long getDuration() {
return duration;
Expand Down Expand Up @@ -251,6 +265,8 @@ public static final class JobVertexDetailsInfo {

public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";

public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";

public static final String FIELD_NAME_PARALLELISM = "parallelism";

public static final String FIELD_NAME_JOB_VERTEX_STATE = "status";
Expand All @@ -272,6 +288,9 @@ public static final class JobVertexDetailsInfo {
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
private final String name;

@JsonProperty(FIELD_NAME_MAX_PARALLELISM)
private final int maxParallelism;

@JsonProperty(FIELD_NAME_PARALLELISM)
private final int parallelism;

Expand Down Expand Up @@ -299,6 +318,7 @@ public JobVertexDetailsInfo(
@JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
JobVertexID jobVertexID,
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism,
@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
@JsonProperty(FIELD_NAME_JOB_VERTEX_STATE) ExecutionState executionState,
@JsonProperty(FIELD_NAME_JOB_VERTEX_START_TIME) long startTime,
Expand All @@ -309,6 +329,7 @@ public JobVertexDetailsInfo(
@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) {
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.name = Preconditions.checkNotNull(name);
this.maxParallelism = maxParallelism;
this.parallelism = parallelism;
this.executionState = Preconditions.checkNotNull(executionState);
this.startTime = startTime;
Expand All @@ -328,6 +349,11 @@ public String getName() {
return name;
}

@JsonIgnore
public int getMaxParallelism() {
return maxParallelism;
}

@JsonIgnore
public int getParallelism() {
return parallelism;
Expand Down Expand Up @@ -372,7 +398,8 @@ public boolean equals(Object o) {
return false;
}
JobVertexDetailsInfo that = (JobVertexDetailsInfo) o;
return parallelism == that.parallelism
return maxParallelism == that.maxParallelism
&& parallelism == that.parallelism
&& startTime == that.startTime
&& endTime == that.endTime
&& duration == that.duration
Expand All @@ -388,6 +415,7 @@ public int hashCode() {
return Objects.hash(
jobVertexID,
name,
maxParallelism,
parallelism,
executionState,
startTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class ArchivedExecutionConfigBuilder {
private String executionMode;
private String restartStrategyDescription;
private int maxParallelism;
private int parallelism;
private boolean objectReuseEnabled;
private Map<String, String> globalJobParameters;
Expand All @@ -48,6 +49,11 @@ public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
return this;
}

public ArchivedExecutionConfigBuilder setMaxParallelism(int parallelism) {
this.maxParallelism = maxParallelism;
return this;
}

public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
this.objectReuseEnabled = objectReuseEnabled;
return this;
Expand All @@ -63,6 +69,7 @@ public ArchivedExecutionConfig build() {
return new ArchivedExecutionConfig(
executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(),
restartStrategyDescription != null ? restartStrategyDescription : "default",
maxParallelism,
parallelism,
objectReuseEnabled,
globalJobParameters != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ protected JobVertexDetailsInfo getTestResponseInstance() throws Exception {
jobVertexMetrics,
"taskmanagerId3"));

int parallelism = 1 + (random.nextInt() / 3);
return new JobVertexDetailsInfo(
new JobVertexID(),
"jobVertex" + random.nextLong(),
random.nextInt(),
parallelism,
2 * parallelism,
System.currentTimeMillis(),
vertexTaskDetailList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected JobDetailsInfo getTestResponseInstance() throws Exception {
1L,
2L,
1L,
8888L,
1984L,
timestamps,
jobVertexInfos,
Expand All @@ -96,10 +97,12 @@ private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random ra
tasksPerState.put(executionState, random.nextInt());
}

int parallelism = 1 + (random.nextInt() / 3);
return new JobDetailsInfo.JobVertexDetailsInfo(
new JobVertexID(),
"jobVertex" + random.nextLong(),
random.nextInt(),
2 * parallelism,
parallelism,
ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
random.nextLong(),
random.nextLong(),
Expand Down

0 comments on commit f5e537e

Please sign in to comment.