Skip to content

Commit

Permalink
[FLINK-9499][rest] Support JSON request in JarHandlers
Browse files Browse the repository at this point in the history
This closes apache#6330.
  • Loading branch information
zentol committed Jul 19, 2018
1 parent 230f817 commit fefe866
Show file tree
Hide file tree
Showing 14 changed files with 662 additions and 28 deletions.
26 changes: 23 additions & 3 deletions docs/_includes/generated/rest_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,31 @@
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#1936993190">Request</button>
<div id="1936993190" class="collapse">
<button data-toggle="collapse" data-target="#315035146">Request</button>
<div id="315035146" class="collapse">
<pre>
<code>
{} </code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
"properties" : {
"entryClass" : {
"type" : "string"
},
"programArgs" : {
"type" : "string"
},
"parallelism" : {
"type" : "integer"
},
"allowNonRestoredState" : {
"type" : "boolean"
},
"savepointPath" : {
"type" : "string"
}
}
} </code>
</pre>
</div>
</td>
Expand Down
41 changes: 41 additions & 0 deletions flink-runtime-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ under the License.

<packaging>jar</packaging>

<properties>
<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
</properties>

<dependencies>

<!-- ===================================================
Expand Down Expand Up @@ -136,6 +141,7 @@ under the License.
</goals>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-program-jar</id>
<phase>process-test-classes</phase>
<goals>
Expand All @@ -153,6 +159,39 @@ under the License.
<finalName>test-program</finalName>
</configuration>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-parameter-program-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<includes>
<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
</includes>
<archive>
<manifest>
<mainClass>org.apache.flink.runtime.webmonitor.testutils.ParameterProgram</mainClass>
</manifest>
</archive>
<finalName>${test.parameterProgram.name}</finalName>
</configuration>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-parameter-program-jar-without-manifest</id>
<phase>process-test-classes</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<includes>
<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
</includes>
<finalName>${test.ParameterProgramNoManifest.name}</finalName>
</configuration>
</execution>
</executions>
</plugin>

Expand All @@ -162,6 +201,8 @@ under the License.
<configuration>
<systemPropertyVariables>
<targetDir>${project.build.directory}</targetDir>
<parameterJarName>${test.parameterProgram.name}</parameterJarName>
<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
</systemPropertyVariables>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.SupplierWithException;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import org.slf4j.Logger;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -61,7 +63,7 @@
* Handler to submit jobs uploaded via the Web UI.
*/
public class JarRunHandler extends
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {

private final Path jarDir;

Expand All @@ -74,7 +76,7 @@ public JarRunHandler(
final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final Path jarDir,
final Configuration configuration,
final Executor executor) {
Expand All @@ -87,15 +89,33 @@ public JarRunHandler(

@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
@Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
@Nonnull final DispatcherGateway gateway) throws RestHandlerException {

final JarRunRequestBody requestBody = request.getRequestBody();

final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
final Path jarFile = jarDir.resolve(pathParameter);

final String entryClass = emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class));
final List<String> programArgs = tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class));
final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT);
final String entryClass = fromRequestBodyOrQueryParameter(
emptyToNull(requestBody.getEntryClassName()),
() -> emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)),
null,
log);

final List<String> programArgs = tokenizeArguments(
fromRequestBodyOrQueryParameter(
emptyToNull(requestBody.getProgramArguments()),
() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
null,
log));

final int parallelism = fromRequestBodyOrQueryParameter(
requestBody.getParallelism(),
() -> getQueryParameter(request, ParallelismQueryParameter.class),
ExecutionConfig.PARALLELISM_DEFAULT,
log);

final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request);

final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
Expand Down Expand Up @@ -134,12 +154,22 @@ protected CompletableFuture<JarRunResponseBody> handleRequest(
});
}

private static SavepointRestoreSettings getSavepointRestoreSettings(
final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
private SavepointRestoreSettings getSavepointRestoreSettings(
final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws RestHandlerException {

final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
final JarRunRequestBody requestBody = request.getRequestBody();

final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
requestBody.getAllowNonRestoredState(),
() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
false,
log);
final String savepointPath = fromRequestBodyOrQueryParameter(
emptyToNull(requestBody.getSavepointPath()),
() -> emptyToNull(getQueryParameter(request, SavepointPathQueryParameter.class)),
null,
log);
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings = SavepointRestoreSettings.forPath(
Expand All @@ -151,6 +181,29 @@ private static SavepointRestoreSettings getSavepointRestoreSettings(
return savepointRestoreSettings;
}

/**
* Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
* if it is not null, otherwise returns the default value.
*/
private static <T> T fromRequestBodyOrQueryParameter(
T requestValue,
SupplierWithException<T, RestHandlerException> queryParameterExtractor,
T defaultValue,
Logger log) throws RestHandlerException {
if (requestValue != null) {
return requestValue;
} else {
T queryParameterValue = queryParameterExtractor.get();
if (queryParameterValue != null) {
log.warn("Configuring the job submission via query parameters is deprecated." +
" Please migrate to submitting a JSON request instead.");
return queryParameterValue;
} else {
return defaultValue;
}
}
}

private CompletableFuture<JobGraph> getJobGraphAsync(
final Path jarFile,
@Nullable final String entryClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* {@link MessageHeaders} for {@link JarRunHandler}.
*/
public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
public class JarRunHeaders implements MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {

private static final JarRunHeaders INSTANCE = new JarRunHeaders();

Expand All @@ -44,8 +43,8 @@ public HttpResponseStatus getResponseStatusCode() {
}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
public Class<JarRunRequestBody> getRequestClass() {
return JarRunRequestBody.class;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.RequestBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

/**
* {@link RequestBody} for running a jar.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class JarRunRequestBody implements RequestBody {

private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
private static final String FIELD_NAME_PARALLELISM = "parallelism";
private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";

@JsonProperty(FIELD_NAME_ENTRY_CLASS)
@Nullable
private String entryClassName;

@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
@Nullable
private String programArguments;

@JsonProperty(FIELD_NAME_PARALLELISM)
@Nullable
private Integer parallelism;

@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
@Nullable
private Boolean allowNonRestoredState;

@JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
@Nullable
private String savepointPath;

public JarRunRequestBody() {
this(null, null, null, null, null);
}

@JsonCreator
public JarRunRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
this.entryClassName = entryClassName;
this.programArguments = programArguments;
this.parallelism = parallelism;
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
}

@Nullable
@JsonIgnore
public String getEntryClassName() {
return entryClassName;
}

@Nullable
@JsonIgnore
public String getProgramArguments() {
return programArguments;
}

@Nullable
@JsonIgnore
public Integer getParallelism() {
return parallelism;
}

@Nullable
@JsonIgnore
public Boolean getAllowNonRestoredState() {
return allowNonRestoredState;
}

@Nullable
@JsonIgnore
public String getSavepointPath() {
return savepointPath;
}
}
Loading

0 comments on commit fefe866

Please sign in to comment.