Skip to content

Commit

Permalink
[FLINK-13386][web]: Add all operators watermark
Browse files Browse the repository at this point in the history
  • Loading branch information
vthinkxie authored and dawidwys committed Sep 27, 2019
1 parent 90b4022 commit c46ba1b
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export interface NodesItemInterface {

export interface NodesItemCorrectInterface extends NodesItemInterface {
detail: VerticesItemInterface | undefined;
lowWatermark?: number;
}

export interface NodesItemLinkInterface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import {
ViewChild
} from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';
import { LONG_MIN_VALUE } from 'config';
import { forkJoin, Observable, of, Subject } from 'rxjs';
import { catchError, filter, map, takeUntil } from 'rxjs/operators';
import { NodesItemCorrectInterface, NodesItemLinkInterface } from 'interfaces';
import { JobService } from 'services';
import { JobService, MetricsService } from 'services';
import { DagreComponent } from 'share/common/dagre/dagre.component';

@Component({
Expand Down Expand Up @@ -62,11 +63,52 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
}
}

mergeWithWatermarks(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
return forkJoin(
nodes.map(node => {
const listOfMetricId = [];
let lowWatermark = NaN;
for (let i = 0; i < node.parallelism; i++) {
listOfMetricId.push(`${i}.currentInputWatermark`);
}
return this.metricService.getMetrics(this.jobId, node.id, listOfMetricId).pipe(
map(metrics => {
let minValue = NaN;
const watermarks: { [index: string]: number } = {};
for (const key in metrics.values) {
const value = metrics.values[key];
const subtaskIndex = key.replace('.currentInputWatermark', '');
watermarks[subtaskIndex] = value;
if (isNaN(minValue) || value < minValue) {
minValue = value;
}
}
if (!isNaN(minValue) && minValue > LONG_MIN_VALUE) {
lowWatermark = minValue;
} else {
lowWatermark = NaN;
}
return { ...node, lowWatermark };
})
);
})
).pipe(catchError(() => of(nodes)));
}

refreshNodesWithWatermarks() {
this.mergeWithWatermarks(this.nodes).subscribe(nodes => {
nodes.forEach(node => {
this.dagreComponent.updateNode(node.id, node);
});
});
}

constructor(
private jobService: JobService,
private router: Router,
private activatedRoute: ActivatedRoute,
public elementRef: ElementRef,
private metricService: MetricsService,
private cdr: ChangeDetectorRef
) {}

Expand All @@ -82,11 +124,10 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
this.links = data.plan.links;
this.jobId = data.plan.jid;
this.dagreComponent.flush(this.nodes, this.links, true).then();
this.refreshNodesWithWatermarks();
} else {
this.nodes = data.plan.nodes;
this.nodes.forEach(node => {
this.dagreComponent.updateNode(node.id, node);
});
this.refreshNodesWithWatermarks();
}
this.cdr.markForCheck();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ <h4 class="content-wrap">
<xhtml:div class="detail">{{operator}}</xhtml:div>
<xhtml:div class="detail description">{{description}}</xhtml:div>
<xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
<xhtml:div class="node-label watermark" *ngIf="lowWatermark">Low Watermark <xhtml:br/> {{lowWatermark}}</xhtml:div>
<xhtml:div class="detail last" *ngIf="operatorStrategy">Operation: {{operatorStrategy}}</xhtml:div>
</h4>
</xhtml:div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@
&:first-child {
margin-top: 24px;
}

&.watermark {
font-weight: normal;;
font-weight: 12px;
color: @text-color-secondary;
}
}

.detail {
margin-bottom: 12px;
color: @text-color;

&.description {
color: @heading-color;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class NodeComponent {
operator: string | null;
operatorStrategy: string | null;
parallelism: number | null;
lowWatermark: number | null | undefined;
height = 0;
id: string;

Expand All @@ -45,6 +46,7 @@ export class NodeComponent {
this.operator = this.decodeHTML(value.operator);
this.operatorStrategy = this.decodeHTML(value.operator_strategy);
this.parallelism = value.parallelism;
this.lowWatermark = value.lowWatermark;
this.height = value.height || 0;
this.id = value.id;
if (description && description.length > 300) {
Expand Down

0 comments on commit c46ba1b

Please sign in to comment.