Skip to content

Commit

Permalink
[FLINK-14816] Add thread dump feature for taskmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
lamberken authored and tillrohrmann committed May 1, 2020
1 parent 42a4dfd commit 0294409
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { TaskManagerService } from 'services';
})
export class TaskManagerStatusComponent implements OnInit, OnDestroy {
@Input() isLoading = true;
listOfNavigation = [{ path: 'metrics', title: 'Metrics' }, { path: 'log-list', title: 'Log' }];
listOfNavigation = [{ path: 'metrics', title: 'Metrics' }, { path: 'log-list', title: 'Log' }, { path: 'thread-dump', title: 'Thread Dump' }];
taskManagerDetail: TaskManagerDetailInterface;
private destroy$ = new Subject();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { TaskManagerLogListComponent } from './log-list/task-manager-log-list.co
import { TaskManagerComponent } from './task-manager.component';
import { TaskManagerListComponent } from './list/task-manager-list.component';
import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component';
import { TaskManagerThreadDumpComponent } from './thread-dump/task-manager-thread-dump.component';

const routes: Routes = [
{
Expand All @@ -47,6 +48,13 @@ const routes: Routes = [
path: 'log-list'
}
},
{
path: 'thread-dump',
component: TaskManagerThreadDumpComponent,
data: {
path: 'thread-dump'
}
},
{
path: 'log-list/:logName',
component: TaskManagerLogDetailComponent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TaskManagerListComponent } from './list/task-manager-list.component';
import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component';
import { TaskManagerComponent } from './task-manager.component';
import { TaskManagerStatusComponent } from './status/task-manager-status.component';
import { TaskManagerThreadDumpComponent } from './thread-dump/task-manager-thread-dump.component';

@NgModule({
imports: [CommonModule, ShareModule, TaskManagerRoutingModule],
Expand All @@ -36,7 +37,8 @@ import { TaskManagerStatusComponent } from './status/task-manager-status.compone
TaskManagerComponent,
TaskManagerStatusComponent,
TaskManagerLogListComponent,
TaskManagerLogDetailComponent
TaskManagerLogDetailComponent,
TaskManagerThreadDumpComponent
]
})
export class TaskManagerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!-- Editor bound to the component's `dump` field, which holds the thread dump text. -->
<flink-monaco-editor [value]="dump"></flink-monaco-editor>
<!-- `reload` events call the component's reload(); downloadHref/downloadName are built from the task manager id (taskmanagers/<id>/thread-dump). -->
<flink-refresh-download [downloadHref]="'taskmanagers/'+taskManagerDetail?.id+'/thread-dump'" [downloadName]="'taskmanager_'+taskManagerDetail?.id+'_thread_dump'" (reload)="reload()"></flink-refresh-download>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// The ~"..." escape passes the calc() expression through to the generated CSS
// verbatim instead of letting Less evaluate the subtraction at compile time.
// NOTE(review): 160px presumably accounts for the page chrome around the editor - confirm.
flink-monaco-editor {
height: calc(~"100vh - 160px");
}

// NOTE(review): position: relative presumably anchors an absolutely positioned
// child (the refresh/download control?) - confirm against that component's styles.
:host {
position: relative;
display: block;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ChangeDetectorRef, Component, OnDestroy, OnInit, ViewChild, ChangeDetectionStrategy } from '@angular/core';
import { TaskManagerService } from 'services';
import { Subject } from 'rxjs';
import { first, takeUntil } from 'rxjs/operators';
import { MonacoEditorComponent } from 'share/common/monaco-editor/monaco-editor.component';
import { TaskManagerDetailInterface } from 'interfaces';

/**
 * Displays the thread dump of a task manager.
 *
 * On init the component takes the first task manager detail emitted by
 * `TaskManagerService.taskManagerDetail$`, stores it and loads the thread dump
 * for that task manager. `reload()` can be called again to fetch a fresh dump.
 *
 * All subscriptions are bound to `destroy$`, so a response that arrives after
 * the component has been destroyed is dropped instead of touching the
 * (already torn down) editor and change detector.
 */
@Component({
  selector: 'flink-task-manager-thread-dump',
  templateUrl: './task-manager-thread-dump.component.html',
  styleUrls: ['./task-manager-thread-dump.component.less'],
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class TaskManagerThreadDumpComponent implements OnInit, OnDestroy {
  @ViewChild(MonacoEditorComponent) monacoEditorComponent: MonacoEditorComponent;
  /** Text of the most recently loaded thread dump; empty until the first load succeeds. */
  dump = '';
  /** Detail of the task manager whose threads are dumped; unset until `ngOnInit` receives it. */
  taskManagerDetail: TaskManagerDetailInterface;
  /** Emits once in `ngOnDestroy` to cancel every pending subscription of this component. */
  private destroy$ = new Subject<void>();

  /**
   * Requests the thread dump of the current task manager and shows it in the editor.
   * Does nothing while no task manager detail is available.
   */
  reload() {
    if (this.taskManagerDetail) {
      this.taskManagerService
        .loadThreadDump(this.taskManagerDetail.id)
        .pipe(takeUntil(this.destroy$))
        .subscribe(
          data => {
            this.monacoEditorComponent.layout();
            this.dump = data;
            // OnPush: the view only refreshes when explicitly marked dirty.
            this.cdr.markForCheck();
          },
          () => {
            // A failed request keeps the previously shown dump.
            this.cdr.markForCheck();
          }
        );
    }
  }

  constructor(private taskManagerService: TaskManagerService, private cdr: ChangeDetectorRef) {}

  ngOnInit() {
    this.taskManagerService.taskManagerDetail$.pipe(first(), takeUntil(this.destroy$)).subscribe(data => {
      this.taskManagerDetail = data;
      this.reload();
      this.cdr.markForCheck();
    });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,15 @@ export class TaskManagerService {
);
}

/**
 * Requests the thread dump of one task manager as plain text.
 * The request carries a `Cache-Control: no-cache` header.
 *
 * @param taskManagerId id of the task manager whose thread dump is requested
 */
loadThreadDump(taskManagerId: string) {
  const url = `${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`;
  const headers = new HttpHeaders().append('Cache-Control', 'no-cache');
  // The options literal stays inline so that `responseType` keeps its literal type 'text'.
  return this.httpClient.get(url, { responseType: 'text', headers });
}

constructor(private httpClient: HttpClient) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.taskmanager;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * REST handler that serves a thread dump of a {@link TaskExecutor} as a file.
 *
 * <p>The dump is requested through the {@link ResourceManagerGateway} using
 * {@link FileType#THREAD_DUMP}.
 */
public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    public TaskManagerThreadDumpFileHandler(
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(
            leaderRetriever,
            timeout,
            responseHeaders,
            untypedResponseMessageHeaders,
            resourceManagerGatewayRetriever,
            transientBlobService,
            cacheEntryDuration);
    }

    /**
     * Asks the resource manager to have the task manager upload its thread dump.
     *
     * <p>Only the task manager id ({@code f0}) of the tuple is used; the file name
     * component is ignored because the file type is fixed.
     */
    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> taskManagerIdAndFileName) {
        final ResourceID taskManagerId = taskManagerIdAndFileName.f0;
        return resourceManagerGateway.requestTaskManagerFileUploadByType(taskManagerId, FileType.THREAD_DUMP, timeout);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;

/**
 * Message headers describing the REST endpoint handled by {@link TaskManagerThreadDumpFileHandler}:
 * a {@code GET} on {@code /taskmanagers/:<task manager id>/thread-dump} with an empty request body.
 */
public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {

    /** Endpoint URL template; the path parameter name comes from {@link TaskManagerIdPathParameter#KEY}. */
    private static final String URL = String.format("/taskmanagers/:%s/thread-dump", TaskManagerIdPathParameter.KEY);

    /** The single shared instance. */
    private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders();

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private TaskManagerThreadDumpFileHeaders() {}

    /**
     * Returns the shared headers instance.
     */
    public static TaskManagerThreadDumpFileHeaders getInstance() {
        return INSTANCE;
    }

    @Override
    public HttpMethodWrapper getHttpMethod() {
        return HttpMethodWrapper.GET;
    }

    @Override
    public String getTargetRestEndpointURL() {
        return URL;
    }

    @Override
    public Class<EmptyRequestBody> getRequestClass() {
        return EmptyRequestBody.class;
    }

    @Override
    public TaskManagerMessageParameters getUnresolvedMessageParameters() {
        return new TaskManagerMessageParameters();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public enum FileType {
* The stdout file type for taskmanager.
*/
STDOUT,

/**
* Thread dump for taskmanager.
*/
THREAD_DUMP,
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -945,6 +946,8 @@ public CompletableFuture<TransientBlobKey> requestFileUploadByType(FileType file
case STDOUT:
filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
break;
case THREAD_DUMP:
return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType.toString());
default:
filePath = null;
}
Expand Down Expand Up @@ -1702,7 +1705,6 @@ private CompletableFuture<TransientBlobKey> putTransientBlobStream(InputStream i
return CompletableFuture.completedFuture(transientBlobKey);
}


// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * JVM diagnostics utilities built on {@link java.lang.management.ManagementFactory}.
 */
public final class JvmUtils {

    /**
     * Creates a textual thread dump of the current JVM.
     *
     * <p>The dump contains one section per live thread with its complete stack trace and,
     * where the JVM supports it, the monitors and ownable synchronizers the thread holds.
     * Each section is encoded as UTF-8.
     *
     * <p>The sections are formatted by this class rather than by
     * {@link java.lang.management.ThreadInfo#toString()}, because the latter cuts every
     * stack trace off after a small fixed number of frames.
     *
     * @return the thread dump stream of current JVM
     */
    public static InputStream threadDumpStream() {
        ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();

        // Asking for lock details on a JVM that cannot provide them throws
        // UnsupportedOperationException, so only request what is supported.
        boolean withMonitors = threadMxBean.isObjectMonitorUsageSupported();
        boolean withSynchronizers = threadMxBean.isSynchronizerUsageSupported();

        List<InputStream> streams = Arrays
            .stream(threadMxBean.dumpAllThreads(withMonitors, withSynchronizers))
            .map(JvmUtils::formatThreadInfo)
            .map((v) -> v.getBytes(StandardCharsets.UTF_8))
            .map(ByteArrayInputStream::new)
            .collect(Collectors.toList());

        return new SequenceInputStream(Collections.enumeration(streams));
    }

    /**
     * Renders one thread in the layout of {@link java.lang.management.ThreadInfo#toString()}
     * (header line, stack frames with lock annotations, locked synchronizers), but without
     * limiting the number of stack frames.
     */
    private static String formatThreadInfo(java.lang.management.ThreadInfo threadInfo) {
        StringBuilder sb = new StringBuilder()
            .append('"').append(threadInfo.getThreadName()).append('"')
            .append(" Id=").append(threadInfo.getThreadId())
            .append(' ').append(threadInfo.getThreadState());

        if (threadInfo.getLockName() != null) {
            sb.append(" on ").append(threadInfo.getLockName());
        }
        if (threadInfo.getLockOwnerName() != null) {
            sb.append(" owned by \"").append(threadInfo.getLockOwnerName())
                .append("\" Id=").append(threadInfo.getLockOwnerId());
        }
        if (threadInfo.isSuspended()) {
            sb.append(" (suspended)");
        }
        if (threadInfo.isInNative()) {
            sb.append(" (in native)");
        }
        sb.append('\n');

        StackTraceElement[] stackTrace = threadInfo.getStackTrace();
        java.lang.management.MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
        for (int depth = 0; depth < stackTrace.length; depth++) {
            sb.append("\tat ").append(stackTrace[depth]).append('\n');

            // The lock a thread is blocked or waiting on belongs to its top-most frame.
            if (depth == 0 && threadInfo.getLockInfo() != null) {
                switch (threadInfo.getThreadState()) {
                    case BLOCKED:
                        sb.append("\t-  blocked on ").append(threadInfo.getLockInfo()).append('\n');
                        break;
                    case WAITING:
                    case TIMED_WAITING:
                        sb.append("\t-  waiting on ").append(threadInfo.getLockInfo()).append('\n');
                        break;
                    default:
                        break;
                }
            }

            // Monitors are listed below the frame in which they were acquired.
            for (java.lang.management.MonitorInfo monitor : lockedMonitors) {
                if (monitor.getLockedStackDepth() == depth) {
                    sb.append("\t-  locked ").append(monitor).append('\n');
                }
            }
        }

        java.lang.management.LockInfo[] lockedSynchronizers = threadInfo.getLockedSynchronizers();
        if (lockedSynchronizers.length > 0) {
            sb.append("\n\tNumber of locked synchronizers = ").append(lockedSynchronizers.length).append('\n');
            for (java.lang.management.LockInfo lock : lockedSynchronizers) {
                sb.append("\t- ").append(lock).append('\n');
            }
        }

        // Blank line separating this thread's section from the next one.
        sb.append('\n');
        return sb.toString();
    }

    /**
     * Private default constructor to avoid instantiation.
     */
    private JvmUtils() {}

}
Loading

0 comments on commit 0294409

Please sign in to comment.