From f6c5dc1b32ad6e6b524e549eff2b7d9d2b7d9970 Mon Sep 17 00:00:00 2001 From: yangjunhan Date: Wed, 27 Jul 2022 17:06:28 +0800 Subject: [PATCH] [FLINK-28589][runtime-web] enable expanded rows of other concurrent attempts for subtasks This closes #20380. --- .../src/app/interfaces/job-backpressure.ts | 7 +- .../src/app/interfaces/job-vertex.ts | 6 +- ...verview-drawer-backpressure.component.html | 41 +++++- ...verview-drawer-backpressure.component.less | 12 ++ ...-overview-drawer-backpressure.component.ts | 51 ++++++-- ...ob-overview-drawer-subtasks.component.html | 120 +++++++++++++++--- ...ob-overview-drawer-subtasks.component.less | 8 ++ .../job-overview-drawer-subtasks.component.ts | 23 +++- 8 files changed, 229 insertions(+), 39 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-backpressure.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-backpressure.ts index b84eb009b8a4c..0806e65746357 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-backpressure.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-backpressure.ts @@ -23,10 +23,15 @@ export interface JobBackpressure { subtasks: JobBackpressureSubtask[]; } -export interface JobBackpressureSubtask { +export interface JobBackpressureSubtaskData { subtask: number; + 'attempt-number'?: number; 'backpressure-level': string; ratio: number; idleRatio: number; busyRatio: number; } + +export interface JobBackpressureSubtask extends JobBackpressureSubtaskData { + 'other-concurrent-attempts'?: JobBackpressureSubtaskData[]; +} diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-vertex.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-vertex.ts index c82a634164874..746b9d79d6796 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-vertex.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-vertex.ts @@ -65,7 +65,7 @@ export interface JobVertexAggregated { 'status-duration': JobVertexStatusDuration; } -export interface JobVertexSubTask { +export interface JobVertexSubTaskData { attempt: number; duration: number; 'end-time': number; @@ -78,6 +78,10 @@ export interface JobVertexSubTask { 'status-duration': JobVertexStatusDuration; } +export interface JobVertexSubTask extends JobVertexSubTaskData { + 'other-concurrent-attempts'?: JobVertexSubTaskData[]; +} + export interface JobVertexSubTaskDetail { subtasks: JobVertexSubTask[]; aggregated: JobVertexAggregated; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.html index ada1d09210617..963e553927ea6 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.html @@ -63,9 +63,18 @@ - + - {{ subtask['subtask'] }} + + {{ subtask['subtask'] }} + + [attempt-{{ subtask['attempt-number'] + 1 }}] + + {{ this.prettyPrint(subtask['ratio']) }} / {{ this.prettyPrint(subtask['idleRatio']) }} / @@ -78,6 +87,34 @@ > + + + + + {{ attempt['subtask'] }} + +  [attempt-{{ attempt['attempt-number'] + 1 }}] + + + + {{ this.prettyPrint(attempt['ratio']) }} / + {{ this.prettyPrint(attempt['idleRatio']) }} / + {{ this.prettyPrint(attempt['busyRatio']) }} + + + + + + + diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.less index 5aeeb9da463d2..f88834094c6f6 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.less +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.less @@ -19,7 +19,19 @@ @import "theme"; :host { + .left-margin { + margin-left: @margin-xss; + } + ::ng-deep { + .ant-table-row-indent { + padding-left: 0 !important; + + & + .ant-table-row-expand-icon { + margin-top: 2px; + } + } + .ant-table-cell { font-size: @font-size-sm; } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.ts index c1ad5d226fe16..d97adf331868b 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/backpressure/job-overview-drawer-backpressure.component.ts @@ -20,7 +20,12 @@ import { ChangeDetectionStrategy, ChangeDetectorRef, Component, Inject, OnDestro import { of, Subject } from 'rxjs'; import { catchError, mergeMap, takeUntil, tap } from 'rxjs/operators'; -import { JobBackpressure, JobBackpressureSubtask, NodesItemCorrect } from '@flink-runtime-web/interfaces'; +import { + JobBackpressure, + JobBackpressureSubtask, + JobBackpressureSubtaskData, + NodesItemCorrect +} from '@flink-runtime-web/interfaces'; import { JOB_OVERVIEW_MODULE_CONFIG, JOB_OVERVIEW_MODULE_DEFAULT_CONFIG, @@ -34,19 +39,23 @@ import { JobLocalService } from '../../job-local.service'; @Component({ selector: 'flink-job-overview-drawer-backpressure', templateUrl: './job-overview-drawer-backpressure.component.html', - changeDetection: ChangeDetectionStrategy.OnPush, - styleUrls: ['./job-overview-drawer-backpressure.component.less'] + styleUrls: ['./job-overview-drawer-backpressure.component.less'], + changeDetection: ChangeDetectionStrategy.OnPush }) export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy { - public readonly trackBySubtask = (_: number, node: JobBackpressureSubtask): number => node.subtask; + readonly trackBySubtask = (_: number, node: JobBackpressureSubtask): number => node.subtask; + readonly trackBySubtaskAttempt = (_: number, node: JobBackpressureSubtaskData): string => + `${node.subtask}-${node['attempt-number']}`; + + expandSet = new Set(); + isLoading = true; + now = Date.now(); + selectedVertex: NodesItemCorrect | null; + backpressure = {} as JobBackpressure; + listOfSubTaskBackpressure: JobBackpressureSubtask[] = []; + stateBadgeComponent: Type; - public isLoading = true; - public now = Date.now(); - public selectedVertex: NodesItemCorrect | null; - public backpressure = {} as JobBackpressure; - public listOfSubTaskBackpressure: JobBackpressureSubtask[] = []; - public stateBadgeComponent: Type; - public readonly narrowLogData = typeDefinition(); + readonly narrowType = typeDefinition(); private readonly destroy$ = new Subject(); @@ -61,7 +70,7 @@ export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy JOB_OVERVIEW_MODULE_DEFAULT_CONFIG.customComponents.backpressureBadgeComponent; } - public ngOnInit(): void { + ngOnInit(): void { this.jobLocalService .jobWithVertexChanges() .pipe( @@ -87,12 +96,26 @@ export class JobOverviewDrawerBackpressureComponent implements OnInit, OnDestroy }); } - public ngOnDestroy(): void { + ngOnDestroy(): void { this.destroy$.next(); this.destroy$.complete(); } - public prettyPrint(value: number): string { + collapseAll(): void { + this.expandSet.clear(); + this.cdr.markForCheck(); + } + + onExpandChange(subtask: JobBackpressureSubtask, checked: boolean): void { + if (checked) { + this.expandSet.add(subtask.subtask); + } else { + this.expandSet.delete(subtask.subtask); + } + this.cdr.markForCheck(); + } + + prettyPrint(value: number): string { if (isNaN(value)) { return 'N/A'; } else { diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.html index 19af4d599213f..3b91405109124 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.html @@ -27,37 +27,60 @@ nzSize="small" [nzLoading]="isLoading" [nzData]="listOfTask" - [nzScroll]="{ x: '2170px', y: 'calc(100% - 36px)' }" + [nzScroll]="{ x: '2210px', y: 'calc(100% - 36px)' }" [nzFrontPagination]="false" [nzShowPagination]="false" - [nzVirtualItemSize]="virtualItemSize" - [nzVirtualMinBufferPx]="300" - [nzVirtualMaxBufferPx]="300" + [nzVirtualItemSize]="36" + [nzVirtualMinBufferPx]="720" + [nzVirtualMaxBufferPx]="720" [nzVirtualForTrackBy]="trackBySubtask" > - ID - Bytes Received - Records Received - Bytes Sent - Records Sent - Attempt - Host - Start Time - Duration - End Time + ID + + Bytes Received + + + Records Received + + + Bytes Sent + + + Records Sent + + + Attempt + + Host + + Start Time + + + Duration + + + End Time + Accumulated Time (Backpressured/Idle/Busy) Status Durations - Status + + Status + More - + - + {{ subtask['subtask'] }} @@ -115,6 +138,69 @@ > + + + + {{ subtask['subtask'] }} + + + {{ attempt['metrics']['read-bytes'] | humanizeBytes }} + + + + + {{ attempt['metrics']['read-records'] | number: '1.0-0' }} + + + + + {{ attempt['metrics']['write-bytes'] | humanizeBytes }} + + + + + {{ attempt['metrics']['write-records'] | number: '1.0-0' }} + + + {{ attempt.attempt + 1 }} + {{ attempt.host }} + {{ attempt['start_time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }} + {{ attempt.duration | humanizeDuration }} + {{ attempt['end-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }} + + {{ attempt['metrics']['accumulated-backpressured-time'] | humanizeDuration }} + / + {{ attempt['metrics']['accumulated-idle-time'] | humanizeDuration }} + / + {{ attempt['metrics']['accumulated-busy-time'] | humanizeDuration }} + + + - + + + + + + + + + + + diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.less index d17ec2fae0af6..46fbaf8b9adad 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.less +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.less @@ -27,6 +27,14 @@ } ::ng-deep { + .ant-table-row-indent { + padding-left: 0 !important; + + & + .ant-table-row-expand-icon { + margin-top: 2px; + } + } + .ant-table-cell { font-size: @font-size-sm; } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.ts index 9e955355fd3a9..e866b0d9d6625 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/subtasks/job-overview-drawer-subtasks.component.ts @@ -24,7 +24,8 @@ import { JobVertexAggregated, JobVertexStatus, JobVertexStatusDuration, - JobVertexSubTask + JobVertexSubTask, + JobVertexSubTaskData } from '@flink-runtime-web/interfaces'; import { JOB_OVERVIEW_MODULE_CONFIG, @@ -49,6 +50,7 @@ function createSortFn(selector: (item: JobVertexSubTask) => number | string): Nz }) export class JobOverviewDrawerSubtasksComponent implements OnInit, OnDestroy { readonly trackBySubtask = (_: number, node: JobVertexSubTask): number => node.subtask; + readonly trackBySubtaskAttempt = (_: number, node: JobVertexSubTaskData): string => `${node.subtask}-${node.attempt}`; readonly sortReadBytesFn = createSortFn(item => item.metrics?.['read-bytes']); readonly sortReadRecordsFn = createSortFn(item => item.metrics?.['read-records']); @@ -61,15 +63,14 @@ export class JobOverviewDrawerSubtasksComponent implements OnInit, OnDestroy { readonly sortEndTimeFn = createSortFn(item => item['end-time']); readonly sortStatusFn = createSortFn(item => item.status); + expandSet = new Set(); listOfTask: JobVertexSubTask[] = []; aggregated?: JobVertexAggregated; isLoading = true; - virtualItemSize = 36; actionComponent: Type; durationBadgeComponent: Type; stateBadgeComponent: Type; - readonly narrowLogData = typeDefinition(); - + readonly narrowType = typeDefinition(); private readonly destroy$ = new Subject(); constructor( @@ -111,6 +112,20 @@ export class JobOverviewDrawerSubtasksComponent implements OnInit, OnDestroy { this.destroy$.complete(); } + collapseAll(): void { + this.expandSet.clear(); + this.cdr.markForCheck(); + } + + onExpandChange(subtask: JobVertexSubTask, checked: boolean): void { + if (checked) { + this.expandSet.add(subtask.subtask); + } else { + this.expandSet.delete(subtask.subtask); + } + this.cdr.markForCheck(); + } + convertStatusDuration(statusDuration: JobVertexStatusDuration): Array<{ state: string; duration: number }> { const orderedKeys = [ JobVertexStatus.CREATED,