Skip to content

Commit

Permalink
[FLINK-28589][runtime-web] enable expanded rows of other concurrent a…
Browse files Browse the repository at this point in the history
…ttempts for subtasks

This closes #20380.
  • Loading branch information
yangjunhan authored and zhuzhurk committed Aug 2, 2022
1 parent cb2f724 commit f6c5dc1
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ export interface JobBackpressure {
subtasks: JobBackpressureSubtask[];
}

export interface JobBackpressureSubtask {
export interface JobBackpressureSubtaskData {
subtask: number;
'attempt-number'?: number;
'backpressure-level': string;
ratio: number;
idleRatio: number;
busyRatio: number;
}

export interface JobBackpressureSubtask extends JobBackpressureSubtaskData {
'other-concurrent-attempts'?: JobBackpressureSubtaskData[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export interface JobVertexAggregated {
'status-duration': JobVertexStatusDuration<AggregatedStatistics>;
}

export interface JobVertexSubTask {
export interface JobVertexSubTaskData {
attempt: number;
duration: number;
'end-time': number;
Expand All @@ -78,6 +78,10 @@ export interface JobVertexSubTask {
'status-duration': JobVertexStatusDuration<number>;
}

export interface JobVertexSubTask extends JobVertexSubTaskData {
'other-concurrent-attempts'?: JobVertexSubTaskData[];
}

export interface JobVertexSubTaskDetail {
subtasks: JobVertexSubTask[];
aggregated: JobVertexAggregated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,18 @@
</thead>
<tbody>
<ng-template nz-virtual-scroll let-data>
<ng-container *ngIf="narrowLogData(data) as subtask">
<ng-container *ngIf="narrowType(data) as subtask">
<tr>
<td>{{ subtask['subtask'] }}</td>
<td
[nzShowExpand]="subtask['other-concurrent-attempts']?.length > 0"
[nzExpand]="expandSet.has(subtask['subtask'])"
(nzExpandChange)="onExpandChange(subtask, $event)"
>
{{ subtask['subtask'] }}
<span class="left-margin" *ngIf="subtask['attempt-number'] !== undefined">
[attempt-{{ subtask['attempt-number'] + 1 }}]
</span>
</td>
<td>
{{ this.prettyPrint(subtask['ratio']) }} /
{{ this.prettyPrint(subtask['idleRatio']) }} /
Expand All @@ -78,6 +87,34 @@
></flink-dynamic-host>
</td>
</tr>
<ng-container *ngIf="expandSet.has(subtask['subtask'])">
<ng-container
*ngFor="
let attempt of subtask['other-concurrent-attempts'];
trackBy: trackBySubtaskAttempt
"
>
<tr>
<td [nzIndentSize]="1">
{{ attempt['subtask'] }}
<ng-container *ngIf="attempt['attempt-number'] !== undefined">
&nbsp;[attempt-{{ attempt['attempt-number'] + 1 }}]
</ng-container>
</td>
<td>
{{ this.prettyPrint(attempt['ratio']) }} /
{{ this.prettyPrint(attempt['idleRatio']) }} /
{{ this.prettyPrint(attempt['busyRatio']) }}
</td>
<td>
<flink-dynamic-host
[data]="{ state: attempt['backpressure-level'] }"
[component]="stateBadgeComponent"
></flink-dynamic-host>
</td>
</tr>
</ng-container>
</ng-container>
</ng-container>
</ng-template>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,19 @@
@import "theme";

:host {
.left-margin {
margin-left: @margin-xss;
}

::ng-deep {
.ant-table-row-indent {
padding-left: 0 !important;

& + .ant-table-row-expand-icon {
margin-top: 2px;
}
}

.ant-table-cell {
font-size: @font-size-sm;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import { ChangeDetectionStrategy, ChangeDetectorRef, Component, Inject, OnDestro
import { of, Subject } from 'rxjs';
import { catchError, mergeMap, takeUntil, tap } from 'rxjs/operators';

import { JobBackpressure, JobBackpressureSubtask, NodesItemCorrect } from '@flink-runtime-web/interfaces';
import {
JobBackpressure,
JobBackpressureSubtask,
JobBackpressureSubtaskData,
NodesItemCorrect
} from '@flink-runtime-web/interfaces';
import {
JOB_OVERVIEW_MODULE_CONFIG,
JOB_OVERVIEW_MODULE_DEFAULT_CONFIG,
Expand All @@ -34,19 +39,23 @@ import { JobLocalService } from '../../job-local.service';
@Component({
selector: 'flink-job-overview-drawer-backpressure',
templateUrl: './job-overview-drawer-backpressure.component.html',
changeDetection: ChangeDetectionStrategy.OnPush,
styleUrls: ['./job-overview-drawer-backpressure.component.less']
styleUrls: ['./job-overview-drawer-backpressure.component.less'],
changeDetection: ChangeDetectionStrategy.OnPush
})
export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy {
public readonly trackBySubtask = (_: number, node: JobBackpressureSubtask): number => node.subtask;
readonly trackBySubtask = (_: number, node: JobBackpressureSubtask): number => node.subtask;
readonly trackBySubtaskAttempt = (_: number, node: JobBackpressureSubtaskData): string =>
`${node.subtask}-${node['attempt-number']}`;

expandSet = new Set<number>();
isLoading = true;
now = Date.now();
selectedVertex: NodesItemCorrect | null;
backpressure = {} as JobBackpressure;
listOfSubTaskBackpressure: JobBackpressureSubtask[] = [];
stateBadgeComponent: Type<unknown>;

public isLoading = true;
public now = Date.now();
public selectedVertex: NodesItemCorrect | null;
public backpressure = {} as JobBackpressure;
public listOfSubTaskBackpressure: JobBackpressureSubtask[] = [];
public stateBadgeComponent: Type<unknown>;
public readonly narrowLogData = typeDefinition<JobBackpressureSubtask>();
readonly narrowType = typeDefinition<JobBackpressureSubtask>();

private readonly destroy$ = new Subject<void>();

Expand All @@ -61,7 +70,7 @@ export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy
JOB_OVERVIEW_MODULE_DEFAULT_CONFIG.customComponents.backpressureBadgeComponent;
}

public ngOnInit(): void {
ngOnInit(): void {
this.jobLocalService
.jobWithVertexChanges()
.pipe(
Expand All @@ -87,12 +96,26 @@ export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy
});
}

public ngOnDestroy(): void {
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}

public prettyPrint(value: number): string {
collapseAll(): void {
this.expandSet.clear();
this.cdr.markForCheck();
}

onExpandChange(subtask: JobBackpressureSubtask, checked: boolean): void {
if (checked) {
this.expandSet.add(subtask.subtask);
} else {
this.expandSet.delete(subtask.subtask);
}
this.cdr.markForCheck();
}

prettyPrint(value: number): string {
if (isNaN(value)) {
return 'N/A';
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,60 @@
nzSize="small"
[nzLoading]="isLoading"
[nzData]="listOfTask"
[nzScroll]="{ x: '2170px', y: 'calc(100% - 36px)' }"
[nzScroll]="{ x: '2210px', y: 'calc(100% - 36px)' }"
[nzFrontPagination]="false"
[nzShowPagination]="false"
[nzVirtualItemSize]="virtualItemSize"
[nzVirtualMinBufferPx]="300"
[nzVirtualMaxBufferPx]="300"
[nzVirtualItemSize]="36"
[nzVirtualMinBufferPx]="720"
[nzVirtualMaxBufferPx]="720"
[nzVirtualForTrackBy]="trackBySubtask"
>
<thead>
<tr>
<th nzWidth="80px" nzLeft>ID</th>
<th [nzSortFn]="sortReadBytesFn" nzWidth="140px">Bytes Received</th>
<th [nzSortFn]="sortReadRecordsFn" nzWidth="150px">Records Received</th>
<th [nzSortFn]="sortWriteBytesFn" nzWidth="120px">Bytes Sent</th>
<th [nzSortFn]="sortWriteRecordsFn" nzWidth="120px">Records Sent</th>
<th [nzSortFn]="sortAttemptFn" nzWidth="100px">Attempt</th>
<th [nzSortFn]="sortHostFn" nzWidth="200px">Host</th>
<th [nzSortFn]="sortStartTimeFn" nzWidth="150px">Start Time</th>
<th [nzSortFn]="sortDurationFn" nzWidth="150px">Duration</th>
<th [nzSortFn]="sortEndTimeFn" nzWidth="150px">End Time</th>
<th nzWidth="120px" nzLeft>ID</th>
<th [nzSortFn]="sortReadBytesFn" (nzSortOrderChange)="collapseAll()" nzWidth="140px">
Bytes Received
</th>
<th [nzSortFn]="sortReadRecordsFn" (nzSortOrderChange)="collapseAll()" nzWidth="150px">
Records Received
</th>
<th [nzSortFn]="sortWriteBytesFn" (nzSortOrderChange)="collapseAll()" nzWidth="120px">
Bytes Sent
</th>
<th [nzSortFn]="sortWriteRecordsFn" (nzSortOrderChange)="collapseAll()" nzWidth="120px">
Records Sent
</th>
<th [nzSortFn]="sortAttemptFn" (nzSortOrderChange)="collapseAll()" nzWidth="100px">
Attempt
</th>
<th [nzSortFn]="sortHostFn" (nzSortOrderChange)="collapseAll()" nzWidth="200px">Host</th>
<th [nzSortFn]="sortStartTimeFn" (nzSortOrderChange)="collapseAll()" nzWidth="150px">
Start Time
</th>
<th [nzSortFn]="sortDurationFn" (nzSortOrderChange)="collapseAll()" nzWidth="150px">
Duration
</th>
<th [nzSortFn]="sortEndTimeFn" (nzSortOrderChange)="collapseAll()" nzWidth="150px">
End Time
</th>
<th nzWidth="320px">Accumulated Time (Backpressured/Idle/Busy)</th>
<th nzWidth="320px">Status Durations</th>
<th [nzSortFn]="sortStatusFn" nzWidth="120px" nzRight>Status</th>
<th [nzSortFn]="sortStatusFn" (nzSortOrderChange)="collapseAll()" nzWidth="120px" nzRight>
Status
</th>
<th nzWidth="50px" nzRight>More</th>
</tr>
</thead>
<tbody>
<ng-template nz-virtual-scroll let-data>
<ng-container *ngIf="narrowLogData(data) as subtask">
<ng-container *ngIf="narrowType(data) as subtask">
<tr>
<td nzLeft>
<td
nzLeft
[nzShowExpand]="subtask['other-concurrent-attempts']?.length > 0"
[nzExpand]="expandSet.has(subtask['subtask'])"
(nzExpandChange)="onExpandChange(subtask, $event)"
>
{{ subtask['subtask'] }}
</td>
<td>
Expand Down Expand Up @@ -115,6 +138,69 @@
></flink-dynamic-host>
</td>
</tr>
<ng-container *ngIf="expandSet.has(subtask['subtask'])">
<ng-container
*ngFor="
let attempt of subtask['other-concurrent-attempts'];
trackBy: trackBySubtaskAttempt
"
>
<tr>
<td nzLeft [nzIndentSize]="8">{{ subtask['subtask'] }}</td>
<td>
<span *ngIf="attempt['metrics']['read-bytes-complete']; else loadingTemplate">
{{ attempt['metrics']['read-bytes'] | humanizeBytes }}
</span>
</td>
<td>
<span *ngIf="attempt['metrics']['read-records-complete']; else loadingTemplate">
{{ attempt['metrics']['read-records'] | number: '1.0-0' }}
</span>
</td>
<td>
<span *ngIf="attempt['metrics']['write-bytes-complete']; else loadingTemplate">
{{ attempt['metrics']['write-bytes'] | humanizeBytes }}
</span>
</td>
<td>
<span
*ngIf="attempt['metrics']['write-records-complete']; else loadingTemplate"
>
{{ attempt['metrics']['write-records'] | number: '1.0-0' }}
</span>
</td>
<td>{{ attempt.attempt + 1 }}</td>
<td>{{ attempt.host }}</td>
<td>{{ attempt['start_time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td>
<td>{{ attempt.duration | humanizeDuration }}</td>
<td>{{ attempt['end-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td>
<td>
{{ attempt['metrics']['accumulated-backpressured-time'] | humanizeDuration }}
/
{{ attempt['metrics']['accumulated-idle-time'] | humanizeDuration }}
/
{{ attempt['metrics']['accumulated-busy-time'] | humanizeDuration }}
</td>
<td>
<ng-container *ngIf="!attempt['status-duration']; else badges">-</ng-container>
<ng-template #badges>
<flink-dynamic-host
*ngFor="let duration of convertStatusDuration(attempt['status-duration'])"
[data]="duration"
[component]="durationBadgeComponent"
></flink-dynamic-host>
</ng-template>
</td>
<td nzRight>
<flink-dynamic-host
[data]="{ state: attempt['status'] }"
[component]="stateBadgeComponent"
></flink-dynamic-host>
</td>
<td nzRight></td>
</tr>
</ng-container>
</ng-container>
</ng-container>
</ng-template>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
}

::ng-deep {
.ant-table-row-indent {
padding-left: 0 !important;

& + .ant-table-row-expand-icon {
margin-top: 2px;
}
}

.ant-table-cell {
font-size: @font-size-sm;
}
Expand Down
Loading

0 comments on commit f6c5dc1

Please sign in to comment.