Skip to content

Commit

Permalink
[FLINK-18519][REST] Send exception to client when app fails to execute
Browse files Browse the repository at this point in the history
This closes apache#12845.
  • Loading branch information
kl0u committed Jul 8, 2020
1 parent 2210aff commit 78b00f6
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 5 deletions.
21 changes: 21 additions & 0 deletions flink-runtime-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ under the License.
<properties>
<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
<test.ParameterProgramWithEagerSink.name>parameter-program-with-eager-sink</test.ParameterProgramWithEagerSink.name>
</properties>

<dependencies>
Expand Down Expand Up @@ -194,6 +195,25 @@ under the License.
<finalName>${test.parameterProgram.name}</finalName>
</configuration>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-parameter-program-jar-with-eager-sink</id>
<phase>process-test-classes</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<includes>
<include>org/apache/flink/runtime/webmonitor/handlers/utils/EagerSinkProgram.java</include>
</includes>
<archive>
<manifest>
<mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.EagerSinkProgram</mainClass>
</manifest>
</archive>
<finalName>${test.ParameterProgramWithEagerSink.name}</finalName>
</configuration>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-parameter-program-jar-without-manifest</id>
Expand Down Expand Up @@ -279,6 +299,7 @@ under the License.
<targetDir>${project.build.directory}</targetDir>
<parameterJarName>${test.parameterProgram.name}</parameterJarName>
<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
<parameterJarWithEagerSinkName>${test.ParameterProgramWithEagerSink.name}</parameterJarWithEagerSinkName>
</systemPropertyVariables>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
Expand All @@ -34,6 +33,8 @@
import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import javax.annotation.Nonnull;

import java.nio.file.Path;
Expand Down Expand Up @@ -97,9 +98,13 @@ protected CompletableFuture<JarRunResponseBody> handleRequest(

return CompletableFuture
.supplyAsync(() -> applicationRunner.run(gateway, program, effectiveConfiguration), executor)
.thenApply(jobIds -> {
if (jobIds.isEmpty()) {
throw new CompletionException(new ProgramInvocationException("No jobs submitted."));
.handle((jobIds, throwable) -> {
if (throwable != null) {
throw new CompletionException(
new RestHandlerException("Could not execute application.", HttpResponseStatus.BAD_REQUEST, throwable));
} else if (jobIds.isEmpty()) {
throw new CompletionException(
new RestHandlerException("No jobs included in application.", HttpResponseStatus.BAD_REQUEST));
}
return new JarRunResponseBody(jobIds.get(0));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ static List<String> getProgramArgsList(ProgramArgsParType programArgsParType) {
? Arrays.asList(PROG_ARGS) : null;
}

private static <REQB extends JarRequestBody, M extends JarMessageParameters>
protected static <REQB extends JarRequestBody, M extends JarMessageParameters>
HandlerRequest<REQB, M> createRequest(
REQB requestBody, M parameters, M unresolvedMessageParameters, Path jar)
throws HandlerRequestException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,46 @@
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
import org.apache.flink.util.ExceptionUtils;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
* Tests for the parameter handling of the {@link JarRunHandler}.
Expand All @@ -57,6 +73,8 @@ public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRe

private static JarRunHandler handler;

private static Path jarWithEagerSink;

@BeforeClass
public static void setup() throws Exception {
init();
Expand All @@ -65,6 +83,12 @@ public static void setup() throws Exception {
final Map<String, String> responseHeaders = Collections.emptyMap();
final Executor executor = TestingUtils.defaultExecutor();

final Path jarLocation = Paths.get(System.getProperty("targetDir"));
final String parameterProgramWithEagerSink = "parameter-program-with-eager-sink.jar";
jarWithEagerSink = Files.copy(
jarLocation.resolve(parameterProgramWithEagerSink),
jarDir.resolve("program-with-eager-sink.jar"));

handler = new JarRunHandler(
gatewayRetriever,
timeout,
Expand Down Expand Up @@ -156,6 +180,38 @@ JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) {
return new JarRunRequestBody(null, null, null, null, jobId, null, null);
}

@Test
public void testRestHandlerExceptionThrownWithEagerSinks() throws Exception {
final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest(
getDefaultJarRequestBody(),
getUnresolvedJarMessageParameters(),
getUnresolvedJarMessageParameters(),
jarWithEagerSink
);

try {
handler.handleRequest(request, restfulGateway).get();
} catch (final ExecutionException e) {
final Throwable throwable = ExceptionUtils.stripCompletionException(e.getCause());
assertThat(throwable, instanceOf(RestHandlerException.class));

final RestHandlerException restHandlerException = (RestHandlerException) throwable;
assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST));

final Optional<ProgramInvocationException> invocationException =
ExceptionUtils.findThrowable(restHandlerException, ProgramInvocationException.class);

if (!invocationException.isPresent()) {
fail();
}

final String exceptionMsg = invocationException.get().getMessage();
assertThat(exceptionMsg, containsString("Job was submitted in detached mode."));
return;
}
fail("The test should have failed.");
}

@Override
void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers.utils;

import org.apache.flink.api.java.ExecutionEnvironment;

/**
* Javadoc.
*/
public class EagerSinkProgram {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello", "world").print();
}
}

0 comments on commit 78b00f6

Please sign in to comment.