Skip to content

Commit

Permalink
[FLINK-25022][rest] Run jars in separate threads
Browse files Browse the repository at this point in the history
Use a dedicated thread to run each jar, so that pooled threads can't keep references to user-code (e.g., in a ThreadLocal).
  • Loading branch information
zentol authored Dec 4, 2021
1 parent 9bbadb9 commit 1f212f2
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.concurrent;

import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;

/**
 * An {@link Executor} that runs every runnable in a separate thread.
 *
 * <p>Each call to {@link #execute(Runnable)} requests a fresh thread from the configured {@link
 * ThreadFactory} and starts it immediately. This class keeps no pool of its own and holds no
 * reference to the started thread.
 */
public final class SeparateThreadExecutor implements Executor {
    private final ThreadFactory threadFactory;

    /**
     * Creates an executor that obtains its threads from the given factory.
     *
     * @param threadFactory factory asked for one new thread per submitted runnable; must not be
     *     null (checked via {@code Preconditions.checkNotNull})
     */
    public SeparateThreadExecutor(ThreadFactory threadFactory) {
        this.threadFactory = Preconditions.checkNotNull(threadFactory);
    }

    /**
     * Runs the given command in a newly created thread.
     *
     * @param command the runnable to run; must not be null (checked via {@code
     *     Preconditions.checkNotNull})
     * @throws RejectedExecutionException if the thread factory declines to create a thread, i.e.
     *     returns {@code null}
     */
    @Override
    public void execute(@Nonnull Runnable command) {
        // Reject a null command up front instead of handing it to the factory, where it could
        // silently yield a thread that does nothing.
        Preconditions.checkNotNull(command);

        // ThreadFactory#newThread is allowed to return null when the request is rejected;
        // surface that as the rejection the Executor contract describes rather than an NPE.
        final Thread thread = threadFactory.newThread(command);
        if (thread == null) {
            throw new RejectedExecutionException(
                    "The thread factory did not create a thread for the submitted command.");
        }
        thread.start();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.runtime.webmonitor;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
Expand All @@ -36,6 +38,8 @@
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.SeparateThreadExecutor;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;

Expand All @@ -45,13 +49,18 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Container for the web submission handlers. */
public class WebSubmissionExtension implements WebMonitorExtension {

private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>
webSubmissionHandlers;

// for easier access during testing
private final JarUploadHandler jarUploadHandler;
private final JarRunHandler jarRunHandler;

public WebSubmissionExtension(
Configuration configuration,
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Expand All @@ -61,10 +70,38 @@ public WebSubmissionExtension(
Executor executor,
Time timeout)
throws Exception {
this(
configuration,
leaderRetriever,
responseHeaders,
localAddressFuture,
jarDir,
executor,
timeout,
() -> new DetachedApplicationRunner(true));
}

@VisibleForTesting
WebSubmissionExtension(
Configuration configuration,
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Map<String, String> responseHeaders,
CompletableFuture<String> localAddressFuture,
Path jarDir,
Executor executor,
Time timeout,
Supplier<ApplicationRunner> applicationRunnerSupplier)
throws Exception {

webSubmissionHandlers = new ArrayList<>();

final JarUploadHandler jarUploadHandler =
final Executor jarRunExecutor =
new SeparateThreadExecutor(
new ExecutorThreadFactory.Builder()
.setPoolName("flink-jar-runner")
.build());

jarUploadHandler =
new JarUploadHandler(
leaderRetriever,
timeout,
Expand All @@ -84,16 +121,16 @@ public WebSubmissionExtension(
configuration,
executor);

final JarRunHandler jarRunHandler =
jarRunHandler =
new JarRunHandler(
leaderRetriever,
timeout,
responseHeaders,
JarRunHeaders.getInstance(),
jarDir,
configuration,
executor,
() -> new DetachedApplicationRunner(true));
jarRunExecutor,
applicationRunnerSupplier);

final JarDeleteHandler jarDeleteHandler =
new JarDeleteHandler(
Expand All @@ -112,7 +149,7 @@ public WebSubmissionExtension(
JarPlanGetHeaders.getInstance(),
jarDir,
configuration,
executor);
jarRunExecutor);

final JarPlanHandler postJarPlanHandler =
new JarPlanHandler(
Expand Down Expand Up @@ -141,4 +178,14 @@ public CompletableFuture<Void> closeAsync() {
public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
return webSubmissionHandlers;
}

/**
 * Returns the {@link JarUploadHandler} held by this extension, so that tests can invoke it
 * directly.
 */
@VisibleForTesting
JarUploadHandler getJarUploadHandler() {
return jarUploadHandler;
}

/**
 * Returns the {@link JarRunHandler} held by this extension, so that tests can invoke it
 * directly.
 */
@VisibleForTesting
JarRunHandler getJarRunHandler() {
return jarRunHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
Expand All @@ -29,7 +30,8 @@
/** Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. */
abstract class JarMessageParameters extends MessageParameters {

final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
@VisibleForTesting
public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();

final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
Expand Down Expand Up @@ -82,7 +83,8 @@ public JarRunHandler(
}

@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
@VisibleForTesting
public CompletableFuture<JarRunResponseBody> handleRequest(
@Nonnull final HandlerRequest<JarRunRequestBody> request,
@Nonnull final DispatcherGateway gateway)
throws RestHandlerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand Down Expand Up @@ -68,7 +69,8 @@ public JarUploadHandler(
}

@Override
protected CompletableFuture<JarUploadResponseBody> handleRequest(
@VisibleForTesting
public CompletableFuture<JarUploadResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody> request,
@Nonnull final RestfulGateway gateway)
throws RestHandlerException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.util.concurrent.Executors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link WebSubmissionExtension}. */
class WebSubmissionExtensionTest {

    private static final String JAR_NAME = "output-test-program.jar";

    /**
     * Submits the same jar repeatedly through the {@link JarRunHandler} and checks that the
     * application runner was invoked from as many distinct threads as there were submissions.
     */
    @Test
    void applicationsRunInSeparateThreads(@TempDir Path tempDir) throws Exception {
        final Path uploadDir = Files.createDirectories(tempDir.resolve("uploadDir"));

        // The upload handler treats its input as a temporary file and moves it away, so hand it
        // a copy instead of the original test jar.
        final Path originalJar = Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME);
        final Path jarCopy = Files.copy(originalJar, tempDir.resolve("app.jar"));

        final DispatcherGateway gateway = new TestingDispatcherGateway.Builder().build();
        final ThreadCapturingApplicationRunner runner = new ThreadCapturingApplicationRunner();

        final WebSubmissionExtension extension =
                new WebSubmissionExtension(
                        new Configuration(),
                        () -> CompletableFuture.completedFuture(gateway),
                        Collections.emptyMap(),
                        new CompletableFuture<>(),
                        uploadDir,
                        Executors.directExecutor(),
                        Time.of(5, TimeUnit.SECONDS),
                        () -> runner);

        final String jarId = uploadJar(extension, jarCopy, gateway);

        final JarRunHandler runHandler = extension.getJarRunHandler();
        final HandlerRequest<JarRunRequestBody> runRequest = createRunRequest(jarId);

        // Submit the applications one after another, waiting for each to finish, then verify
        // that every submission was executed by its own thread.
        final int numApplications = 20;
        int submitted = 0;
        while (submitted < numApplications) {
            runHandler.handleRequest(runRequest, gateway).get();
            submitted++;
        }

        final int distinctThreads = runner.getThreads().size();
        assertThat(distinctThreads).isEqualTo(numApplications);
    }

    /** Builds a run request with an empty body that targets the jar with the given id. */
    private static HandlerRequest<JarRunRequestBody> createRunRequest(String jarId) {
        final JarRunMessageParameters runParameters = new JarRunMessageParameters();
        runParameters.jarIdPathParameter.resolve(jarId);
        return HandlerRequest.create(new JarRunRequestBody(), runParameters);
    }

    /**
     * Uploads the given jar through the extension's {@link JarUploadHandler} and returns the
     * file name reported in the upload response.
     */
    private static String uploadJar(
            WebSubmissionExtension extension, Path jarFile, DispatcherGateway dispatcherGateway)
            throws Exception {
        final HandlerRequest<EmptyRequestBody> uploadRequest =
                HandlerRequest.create(
                        EmptyRequestBody.getInstance(),
                        EmptyMessageParameters.getInstance(),
                        Collections.singletonList(jarFile.toFile()));

        return extension
                .getJarUploadHandler()
                .handleRequest(uploadRequest, dispatcherGateway)
                .get()
                .getFilename();
    }

    /** {@link ApplicationRunner} that remembers every thread it was invoked from. */
    private static class ThreadCapturingApplicationRunner implements ApplicationRunner {

        // Identity-based set: threads are distinguished by instance, not by equals().
        private final Set<Thread> threads = Collections.newSetFromMap(new IdentityHashMap<>());

        @Override
        public List<JobID> run(
                DispatcherGateway dispatcherGateway,
                PackagedProgram program,
                Configuration configuration) {
            final Thread invokingThread = Thread.currentThread();
            threads.add(invokingThread);
            return Collections.singletonList(new JobID());
        }

        public Collection<Thread> getThreads() {
            return threads;
        }
    }
}

0 comments on commit 1f212f2

Please sign in to comment.