Skip to content

Commit

Permalink
[FLINK-20852][webui] Provide more detailed per subtask backpressure stats
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Jan 12, 2021
1 parent 512e7f3 commit f43fc04
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 33 deletions.
24 changes: 15 additions & 9 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -984,10 +984,10 @@
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
Expand Down Expand Up @@ -1081,10 +1081,10 @@
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
Expand Down Expand Up @@ -1296,10 +1296,10 @@
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
Expand Down Expand Up @@ -1397,10 +1397,10 @@
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
Expand Down Expand Up @@ -2049,6 +2049,12 @@
},
"ratio" : {
"type" : "number"
},
"idleRatio" : {
"type" : "number"
},
"busyRatio" : {
"type" : "number"
}
}
}
Expand Down Expand Up @@ -3183,4 +3189,4 @@
}
}
} ]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ export interface JobBackpressureSubtaskInterface {
subtask: number;
'backpressure-level': string;
ratio: number;
idleRatio: number;
busyRatio: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@
</tr>
<tr>
<th>SubTask</th>
<th>Ratio</th>
<th>Status</th>
<th>Backpressured / Idle / Busy</th>
<th>Backpressure Status</th>
</tr>
</thead>
<tbody>
<tr *ngFor="let subtask of listOfSubTaskBackpressure;trackBy:trackBackPressureBy;">
<td>{{ subtask['subtask'] }}</td>
<td>{{ subtask['ratio'] }}</td>
<td>{{ this.prettyPrint(subtask['ratio']) }} / {{ this.prettyPrint(subtask['idleRatio']) }} / {{ this.prettyPrint(subtask['busyRatio']) }}</td>
<td>
  <!-- Bind to this row's own level; the vertex-wide `backpressure` value would be identical in every subtask row. -->
  <flink-backpressure-badge [state]="subtask['backpressure-level']"></flink-backpressure-badge>
</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy
return node.subtask;
}

/**
 * Formats a ratio as a whole-number percentage for display.
 *
 * @param value Ratio to format; it is multiplied by 100 and rounded, so 0.42 yields "42%".
 * @returns The rounded percentage followed by "%", or "N/A" when the value is
 *   null/undefined, NaN, or not finite.
 */
prettyPrint(value: number): string {
  // Global isNaN(null) is false, so a null field would render as "0%", and
  // Infinity would render as "Infinity%". Treat both as unavailable instead.
  if (value == null || !Number.isFinite(Number(value))) {
    return 'N/A';
  }
  return `${Math.round(value * 100)}%`;
}

ngOnInit() {
this.jobService.jobWithVertex$
.pipe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,42 @@ private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
int subtaskIndex = entry.getKey();
ComponentMetricStore subtaskMetricStore = entry.getValue();
double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
double idleRatio = getIdleRatio(subtaskMetricStore);
double busyRatio = getBusyRatio(subtaskMetricStore);
result.add(
new SubtaskBackPressureInfo(
subtaskIndex,
getBackPressureLevel(backPressureRatio),
backPressureRatio));
backPressureRatio,
idleRatio,
busyRatio));
}
result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
return result;
}

private double getMaxBackPressureRatio(List<SubtaskBackPressureInfo> subtaskBackPressureInfos) {
return subtaskBackPressureInfos.stream()
.mapToDouble(backPressureInfo -> backPressureInfo.getRatio())
.mapToDouble(backPressureInfo -> backPressureInfo.getBackPressuredRatio())
.max()
.getAsDouble();
}

private double getBackPressureRatio(ComponentMetricStore metricStore) {
return Double.valueOf(metricStore.getMetric(MetricNames.TASK_BACK_PRESSURED_TIME, "0"))
/ 1_000;
return getMsPerSecondMetricAsRatio(metricStore, MetricNames.TASK_BACK_PRESSURED_TIME);
}

/**
 * Looks up {@link MetricNames#TASK_IDLE_TIME} in the given store and returns it as converted by
 * {@code getMsPerSecondMetricAsRatio}.
 */
private double getIdleRatio(ComponentMetricStore metricStore) {
    final String idleTimeMetric = MetricNames.TASK_IDLE_TIME;
    return getMsPerSecondMetricAsRatio(metricStore, idleTimeMetric);
}

/**
 * Looks up {@link MetricNames#TASK_BUSY_TIME} in the given store and returns it as converted by
 * {@code getMsPerSecondMetricAsRatio}.
 */
private double getBusyRatio(ComponentMetricStore metricStore) {
    final String busyTimeMetric = MetricNames.TASK_BUSY_TIME;
    return getMsPerSecondMetricAsRatio(metricStore, busyTimeMetric);
}

/**
 * Reads the named metric from the store and converts it to a ratio by dividing by 1,000.
 *
 * <p>{@code "0"} is passed to {@code getMetric} as its second argument; presumably this is the
 * fallback used when the metric is absent (confirm against {@code ComponentMetricStore}).
 *
 * @param metricStore store to read the metric from
 * @param metricName name of the metric to read
 * @return the parsed metric value divided by 1,000
 * @throws NumberFormatException if the stored value cannot be parsed as a double
 */
private double getMsPerSecondMetricAsRatio(
        ComponentMetricStore metricStore, String metricName) {
    // parseDouble returns a primitive directly; Double.valueOf created a boxed Double that was
    // immediately unboxed for the division.
    final String rawValue = metricStore.getMetric(metricName, "0");
    return Double.parseDouble(rawValue) / 1_000;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,37 @@ public static final class SubtaskBackPressureInfo {

public static final String FIELD_NAME_SUBTASK = "subtask";
public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
public static final String FIELD_NAME_RATIO = "ratio";
public static final String FIELD_NAME_BACK_PRESSURED_RATIO = "ratio";
public static final String FIELD_NAME_IDLE_RATIO = "idleRatio";
public static final String FIELD_NAME_BUSY_RATIO = "busyRatio";

@JsonProperty(FIELD_NAME_SUBTASK)
private final int subtask;

@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
private final VertexBackPressureLevel backpressureLevel;

@JsonProperty(FIELD_NAME_RATIO)
private final double ratio;
@JsonProperty(FIELD_NAME_BACK_PRESSURED_RATIO)
private final double backPressuredRatio;

@JsonProperty(FIELD_NAME_IDLE_RATIO)
private final double idleRatio;

@JsonProperty(FIELD_NAME_BUSY_RATIO)
private final double busyRatio;

public SubtaskBackPressureInfo(
@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
VertexBackPressureLevel backpressureLevel,
@JsonProperty(FIELD_NAME_RATIO) double ratio) {
@JsonProperty(FIELD_NAME_BACK_PRESSURED_RATIO) double backPressuredRatio,
@JsonProperty(FIELD_NAME_IDLE_RATIO) double idleRatio,
@JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio) {
this.subtask = subtask;
this.backpressureLevel = checkNotNull(backpressureLevel);
this.ratio = ratio;
this.backPressuredRatio = backPressuredRatio;
this.idleRatio = idleRatio;
this.busyRatio = busyRatio;
}

@Override
Expand All @@ -153,13 +165,16 @@ public boolean equals(Object o) {
}
SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o;
return subtask == that.subtask
&& ratio == that.ratio
&& backPressuredRatio == that.backPressuredRatio
&& idleRatio == that.idleRatio
&& busyRatio == that.busyRatio
&& Objects.equals(backpressureLevel, that.backpressureLevel);
}

@Override
public int hashCode() {
return Objects.hash(subtask, backpressureLevel, ratio);
return Objects.hash(
subtask, backpressureLevel, backPressuredRatio, idleRatio, busyRatio);
}

public int getSubtask() {
Expand All @@ -170,8 +185,16 @@ public VertexBackPressureLevel getBackpressureLevel() {
return backpressureLevel;
}

public double getRatio() {
return ratio;
public double getBackPressuredRatio() {
return backPressuredRatio;
}

/** Returns the value of the {@code idleRatio} field. */
public double getIdleRatio() {
    return this.idleRatio;
}

/** Returns the value of the {@code busyRatio} field. */
public double getBusyRatio() {
    return this.busyRatio;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.SubtaskBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureStatus;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
Expand Down Expand Up @@ -80,13 +81,17 @@ private static Collection<MetricDump> getMetricDumps() {
TEST_JOB_VERTEX_ID.toString(),
0);
dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
dumps.add(new GaugeDump(task0, MetricNames.TASK_BUSY_TIME, "0"));

TaskQueryScopeInfo task1 =
new TaskQueryScopeInfo(
TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
TEST_JOB_VERTEX_ID.toString(),
1);
dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));

// missing task2

Expand All @@ -96,6 +101,8 @@ private static Collection<MetricDump> getMetricDumps() {
TEST_JOB_VERTEX_ID.toString(),
3);
dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));

return dumps;
}
Expand Down Expand Up @@ -158,21 +165,31 @@ public void testGetBackPressure() throws Exception {

assertThat(
jobVertexBackPressureInfo.getSubtasks().stream()
.map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getRatio)
.map(SubtaskBackPressureInfo::getBackPressuredRatio)
.collect(Collectors.toList()),
contains(1.0, 0.5, 0.1));

assertThat(
jobVertexBackPressureInfo.getSubtasks().stream()
.map(
JobVertexBackPressureInfo.SubtaskBackPressureInfo
::getBackpressureLevel)
.map(SubtaskBackPressureInfo::getIdleRatio)
.collect(Collectors.toList()),
contains(0.0, 0.1, 0.2));

assertThat(
jobVertexBackPressureInfo.getSubtasks().stream()
.map(SubtaskBackPressureInfo::getBusyRatio)
.collect(Collectors.toList()),
contains(0.0, 0.9, 0.7));

assertThat(
jobVertexBackPressureInfo.getSubtasks().stream()
.map(SubtaskBackPressureInfo::getBackpressureLevel)
.collect(Collectors.toList()),
contains(HIGH, LOW, OK));

assertThat(
jobVertexBackPressureInfo.getSubtasks().stream()
.map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getSubtask)
.map(SubtaskBackPressureInfo::getSubtask)
.collect(Collectors.toList()),
contains(0, 1, 3));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ protected JobVertexBackPressureInfo getTestResponseInstance() throws Exception {
List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> subtaskList = new ArrayList<>();
subtaskList.add(
new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
0, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, 0.1));
0, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, 0.1, 0.5, 0.4));
subtaskList.add(
new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
1, JobVertexBackPressureInfo.VertexBackPressureLevel.OK, 0.4));
1, JobVertexBackPressureInfo.VertexBackPressureLevel.OK, 0.4, 0.3, 0.3));
subtaskList.add(
new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
2, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, 0.9));
2, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, 0.9, 0.0, 0.1));
return new JobVertexBackPressureInfo(
JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
Expand Down

0 comments on commit f43fc04

Please sign in to comment.