[FLINK-29543][rest] Add flinkConfiguration field to jar run/plan requests
czy006 committed Nov 15, 2022
1 parent 7e51db9 commit 962a347
Showing 14 changed files with 320 additions and 28 deletions.
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -473,6 +473,12 @@
"entryClass" : {
"type" : "string"
},
"flinkConfiguration" : {
"type" : "object",
"additionalProperties" : {
"type" : "string"
}
},
"jobId" : {
"type" : "any"
},
@@ -564,6 +570,12 @@
"entryClass" : {
"type" : "string"
},
"flinkConfiguration" : {
"type" : "object",
"additionalProperties" : {
"type" : "string"
}
},
"jobId" : {
"type" : "any"
},
8 changes: 8 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
@@ -2136,6 +2136,10 @@ components:
properties:
entryClass:
type: string
flinkConfiguration:
type: object
additionalProperties:
type: string
jobId:
$ref: '#/components/schemas/JobID'
parallelism:
@@ -2154,6 +2158,10 @@
type: boolean
entryClass:
type: string
flinkConfiguration:
type: object
additionalProperties:
type: string
jobId:
$ref: '#/components/schemas/JobID'
parallelism:
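Both generated API documents above describe the new field the same way: flinkConfiguration is a flat string-to-string map nested in the jar run and jar plan request bodies. As an illustration only, a client could exercise it like the following sketch; the host, port, and jar id are placeholders, while entryClass, parallelism, and flinkConfiguration come from the schema.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JarRunWithConfigurationExample {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; /jars/:jarid/run is the existing v1 dispatcher route.
        String url = "http://localhost:8081/v1/jars/example_job.jar/run";

        // "flinkConfiguration" is a plain string-to-string map, matching the
        // "additionalProperties: type: string" schema above.
        String body =
                "{"
                        + "\"entryClass\": \"org.example.WordCount\","
                        + "\"parallelism\": 2,"
                        + "\"flinkConfiguration\": {"
                        + "\"pipeline.name\": \"wordcount-via-rest\","
                        + "\"parallelism.default\": \"2\""
                        + "}"
                        + "}";

        HttpResponse<String> response =
                HttpClient.newHttpClient()
                        .send(
                                HttpRequest.newBuilder(URI.create(url))
                                        .header("Content-Type", "application/json")
                                        .POST(HttpRequest.BodyPublishers.ofString(body))
                                        .build(),
                                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}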
@@ -98,13 +98,15 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
@Nonnull final RestfulGateway gateway)
throws RestHandlerException {
final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
final Configuration effectiveConfiguration = new Configuration(this.configuration);
context.applyToConfiguration(effectiveConfiguration, request);

return CompletableFuture.supplyAsync(
() -> {
try (PackagedProgram packagedProgram =
context.toPackagedProgram(configuration)) {
context.toPackagedProgram(effectiveConfiguration)) {
final JobGraph jobGraph =
context.toJobGraph(packagedProgram, configuration, true);
context.toJobGraph(packagedProgram, effectiveConfiguration, true);
return planGenerator.apply(jobGraph);
}
},
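The plan handler now copies its base configuration and overlays the per-request options before building the PackagedProgram and JobGraph, so request values win without mutating the shared configuration. A minimal standalone sketch of that layering (the class and method names here are illustrative, not the handler's own):

import org.apache.flink.configuration.Configuration;

import java.util.Map;

public final class EffectiveConfigurationSketch {

    // Copy the cluster-level configuration, then overlay the per-request options;
    // later writes win, and the shared base configuration is left untouched.
    static Configuration effectiveConfiguration(
            Configuration base, Map<String, String> requestOptions) {
        Configuration effective = new Configuration(base);
        effective.addAll(Configuration.fromMap(requestOptions));
        return effective;
    }
}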
@@ -18,6 +18,7 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RequestBody;

@@ -28,22 +29,32 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

/** {@link RequestBody} for querying the plan from a jar. */
@JsonInclude(JsonInclude.Include.NON_NULL)
public class JarPlanRequestBody extends JarRequestBody {
JarPlanRequestBody() {
super(null, null, null, null, null);
@VisibleForTesting
public JarPlanRequestBody() {
super(null, null, null, null, null, null);
}

@JsonCreator
JarPlanRequestBody(
public JarPlanRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST)
List<String> programArgumentsList,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) {
super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
@Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
Map<String, String> flinkConfiguration) {
super(
entryClassName,
programArguments,
programArgumentsList,
parallelism,
jobId,
flinkConfiguration);
}
}
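Every constructor argument stays nullable, so a plan request can carry nothing but configuration overrides. A hypothetical usage sketch (the key and value below are illustrative):

import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody;

import java.util.Collections;

public class JarPlanRequestBodyExample {
    public static JarPlanRequestBody configOnlyPlanRequest() {
        // Entry class, program arguments, parallelism and job id are left unset;
        // only the new flinkConfiguration map is supplied.
        return new JarPlanRequestBody(
                null, // entryClass
                null, // programArgs
                null, // programArgsList
                null, // parallelism
                null, // jobId
                Collections.singletonMap("pipeline.auto-watermark-interval", "100 ms"));
    }
}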
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
@@ -33,6 +34,8 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Base class for {@link RequestBody} for running a jar or querying the plan. */
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -42,6 +45,7 @@ public abstract class JarRequestBody implements RequestBody {
static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = "programArgsList";
static final String FIELD_NAME_PARALLELISM = "parallelism";
static final String FIELD_NAME_FLINK_CONFIGURATION = "flinkConfiguration";
static final String FIELD_NAME_JOB_ID = "jobId";

@JsonProperty(FIELD_NAME_ENTRY_CLASS)
@@ -66,8 +70,12 @@ public abstract class JarRequestBody implements RequestBody {
@Nullable
private JobID jobId;

@JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
@Nullable
private Map<String, String> flinkConfiguration;

JarRequestBody() {
this(null, null, null, null, null);
this(null, null, null, null, null, null);
}

@JsonCreator
Expand All @@ -77,12 +85,15 @@ public abstract class JarRequestBody implements RequestBody {
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST)
List<String> programArgumentsList,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) {
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
@Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
Map<String, String> flinkConfiguration) {
this.entryClassName = entryClassName;
this.programArguments = programArguments;
this.programArgumentsList = programArgumentsList;
this.parallelism = parallelism;
this.jobId = jobId;
this.flinkConfiguration = flinkConfiguration;
}

@Nullable
@@ -114,4 +125,11 @@ public Integer getParallelism() {
public JobID getJobId() {
return jobId;
}

@JsonIgnore
public Configuration getFlinkConfiguration() {
return Optional.ofNullable(flinkConfiguration)
.map(Configuration::fromMap)
.orElse(new Configuration());
}
}
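The new @JsonIgnore accessor converts the raw map into a Configuration and substitutes an empty one when the field was omitted, so handler code never has to null-check. A sketch of that behaviour in isolation (class and method names here are illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import java.util.Map;
import java.util.Optional;

public final class FlinkConfigurationAccessorSketch {

    // Null map -> empty Configuration; a populated map is converted key by key.
    static Configuration toConfiguration(Map<String, String> flinkConfiguration) {
        return Optional.ofNullable(flinkConfiguration)
                .map(Configuration::fromMap)
                .orElseGet(Configuration::new);
    }

    public static void main(String[] args) {
        Configuration fromRequest = toConfiguration(Map.of("pipeline.name", "demo"));
        Configuration fromEmptyRequest = toConfiguration(null);

        System.out.println(fromRequest.get(PipelineOptions.NAME));      // demo
        System.out.println(fromEmptyRequest.get(PipelineOptions.NAME)); // null
    }
}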
@@ -97,9 +97,10 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
effectiveConfiguration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);

final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
context.applyToConfiguration(effectiveConfiguration);
context.applyToConfiguration(effectiveConfiguration, request);
SavepointRestoreSettings.toConfiguration(
getSavepointRestoreSettings(request), effectiveConfiguration);
getSavepointRestoreSettings(request, effectiveConfiguration),
effectiveConfiguration);

final PackagedProgram program = context.toPackagedProgram(effectiveConfiguration);

@@ -126,28 +127,34 @@
}

private SavepointRestoreSettings getSavepointRestoreSettings(
final @Nonnull HandlerRequest<JarRunRequestBody> request) throws RestHandlerException {
final @Nonnull HandlerRequest<JarRunRequestBody> request,
final Configuration effectiveConfiguration)
throws RestHandlerException {

final JarRunRequestBody requestBody = request.getRequestBody();

final boolean allowNonRestoredState =
fromRequestBodyOrQueryParameter(
requestBody.getAllowNonRestoredState(),
() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
false,
effectiveConfiguration.get(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE),
log);
final String savepointPath =
fromRequestBodyOrQueryParameter(
emptyToNull(requestBody.getSavepointPath()),
requestBody.getSavepointPath(),
() ->
emptyToNull(
getQueryParameter(
request, SavepointPathQueryParameter.class)),
null,
effectiveConfiguration.get(SavepointConfigOptions.SAVEPOINT_PATH),
log);
final RestoreMode restoreMode =
Optional.ofNullable(requestBody.getRestoreMode())
.orElseGet(SavepointConfigOptions.RESTORE_MODE::defaultValue);
.orElseGet(
() ->
effectiveConfiguration.get(
SavepointConfigOptions.RESTORE_MODE));
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings =
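With this change each savepoint-related setting is resolved with the same precedence: value from the request body, then the query parameter, then the effective configuration (which may itself have been overridden by the request's flinkConfiguration) instead of the static option default. A self-contained sketch of that precedence rule; the resolve helper below is illustrative and not the handler's fromRequestBodyOrQueryParameter utility:

import java.util.Optional;
import java.util.function.Supplier;

public final class SettingPrecedenceSketch {

    // Request body wins over the query parameter, which wins over the configuration value.
    static <T> T resolve(T fromBody, Supplier<T> fromQueryParameter, T fromConfiguration) {
        return Optional.ofNullable(fromBody)
                .orElseGet(
                        () ->
                                Optional.ofNullable(fromQueryParameter.get())
                                        .orElse(fromConfiguration));
    }

    public static void main(String[] args) {
        // Neither body nor query parameter set -> the configured savepoint path is used.
        System.out.println(resolve(null, () -> null, "s3://bucket/savepoints/savepoint-1"));

        // Body value present -> it wins regardless of the other sources.
        System.out.println(resolve(Boolean.TRUE, () -> Boolean.FALSE, Boolean.FALSE));
    }
}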
@@ -30,6 +30,7 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

/** {@link RequestBody} for running a jar. */
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -51,7 +52,7 @@ public class JarRunRequestBody extends JarRequestBody {
private RestoreMode restoreMode;

public JarRunRequestBody() {
this(null, null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null, null);
}

@JsonCreator
Expand All @@ -65,8 +66,16 @@ public JarRunRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode) {
super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode,
@Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
Map<String, String> flinkConfiguration) {
super(
entryClassName,
programArguments,
programArgumentsList,
parallelism,
jobId,
flinkConfiguration);
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
this.restoreMode = restoreMode;
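At the object level, the run request gains the same trailing flinkConfiguration parameter, which the base class exposes as a Configuration. A hedged sketch of constructing such a request and reading the overrides back; the entry class and configuration values are illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;

import java.util.Map;

public class JarRunRequestBodyExample {
    public static void main(String[] args) {
        JarRunRequestBody runRequest =
                new JarRunRequestBody(
                        "org.example.WordCount", // entryClass
                        null, // programArgs
                        null, // programArgsList
                        null, // parallelism
                        null, // jobId
                        null, // allowNonRestoredState
                        null, // savepointPath
                        null, // restoreMode
                        Map.of("parallelism.default", "4", "pipeline.name", "wordcount"));

        // The base class exposes the map as a Configuration; an omitted field yields an empty one.
        Configuration overrides = runRequest.getFlinkConfiguration();
        System.out.println(overrides.get(CoreOptions.DEFAULT_PARALLELISM)); // 4
    }
}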
@@ -97,6 +97,8 @@ public static <R extends JarRequestBody> JarHandlerContext fromRequest(
throws RestHandlerException {
final JarRequestBody requestBody = request.getRequestBody();

Configuration configuration = requestBody.getFlinkConfiguration();

final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
Path jarFile = jarDir.resolve(pathParameter);

@@ -116,7 +118,7 @@ public static <R extends JarRequestBody> JarHandlerContext fromRequest(
fromRequestBodyOrQueryParameter(
requestBody.getParallelism(),
() -> getQueryParameter(request, ParallelismQueryParameter.class),
CoreOptions.DEFAULT_PARALLELISM.defaultValue(),
configuration.get(CoreOptions.DEFAULT_PARALLELISM),
log);

JobID jobId =
@@ -129,8 +131,14 @@ public static <R extends JarRequestBody> JarHandlerContext fromRequest(
return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism, jobId);
}

public void applyToConfiguration(final Configuration configuration) {
public void applyToConfiguration(
final Configuration configuration,
final HandlerRequest<? extends JarRequestBody> request) {
checkNotNull(configuration);
checkNotNull(request);

Configuration restFlinkConfig = request.getRequestBody().getFlinkConfiguration();
configuration.addAll(restFlinkConfig);

if (jobId != null) {
configuration.set(
@@ -184,6 +192,26 @@ public PackagedProgram toPackagedProgram(Configuration configuration) {
throw new CompletionException(e);
}
}

@VisibleForTesting
String getEntryClass() {
return entryClass;
}

@VisibleForTesting
List<String> getProgramArgs() {
return programArgs;
}

@VisibleForTesting
int getParallelism() {
return parallelism;
}

@VisibleForTesting
JobID getJobId() {
return jobId;
}
}

/** Parse program arguments in jar run or plan request. */
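Two behavioural consequences of the JarHandlerContext changes: the parallelism fallback now reads parallelism.default from the request-supplied configuration rather than the option's static default, and applyToConfiguration merges the request configuration into the effective one before the job-specific settings are written. A simplified sketch of the parallelism fallback only (the query-parameter layer used by the real handler is omitted, and the helper name is illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

import java.util.Map;

public final class ParallelismFallbackSketch {

    // An explicit request value wins; otherwise the request's flinkConfiguration decides,
    // and only if the key is absent there does the option default (1) apply.
    static int resolveParallelism(
            Integer fromRequestBody, Map<String, String> flinkConfiguration) {
        if (fromRequestBody != null) {
            return fromRequestBody;
        }
        return Configuration.fromMap(flinkConfiguration).get(CoreOptions.DEFAULT_PARALLELISM);
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(8, Map.of("parallelism.default", "4")));    // 8
        System.out.println(resolveParallelism(null, Map.of("parallelism.default", "4"))); // 4
        System.out.println(resolveParallelism(null, Map.of()));                           // 1
    }
}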
@@ -192,6 +192,18 @@ void testConfigurationViaJsonRequestFailWithProgArgsAsStringAndList() throws Exc
.isEqualTo(HttpResponseStatus.BAD_REQUEST));
}

@Test
void testConfigurationViaConfiguration() throws Exception {
final REQB requestBody = getJarRequestWithConfiguration();
handleRequest(
createRequest(
requestBody,
getUnresolvedJarMessageParameters(),
getUnresolvedJarMessageParameters(),
jarWithManifest));
validateGraphWithFlinkConfig(LAST_SUBMITTED_JOB_GRAPH_REFERENCE.get());
}

@Test
void testProvideJobId() throws Exception {
JobID jobId = new JobID();
@@ -306,6 +318,8 @@ private static <X> List<String> getValuesAsString(MessageQueryParameter<X> param

abstract REQB getJarRequestBodyWithJobId(JobID jobId);

abstract REQB getJarRequestWithConfiguration();

abstract void handleRequest(HandlerRequest<REQB> request) throws Exception;

JobGraph validateDefaultGraph() {
@@ -323,11 +337,13 @@ JobGraph validateGraph() {
return jobGraph;
}

abstract void validateGraphWithFlinkConfig(JobGraph jobGraph);

private static Optional<JobGraph> getLastSubmittedJobGraphAndReset() {
return Optional.ofNullable(LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null));
}

private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
ExecutionConfig executionConfig;
try {
executionConfig =
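The shared parameter test gains two hooks that each concrete handler test must implement: one builds a request body carrying a flinkConfiguration map, the other asserts that the submitted JobGraph reflects it. The sketch below shows one plausible shape for those hooks in a run-handler test; the chosen option, its value, and the class name are illustrative, not taken from the actual subclasses:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;

import java.util.Collections;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class JarRunConfigurationHooksSketch {

    // Hypothetical override shipped with the test request.
    static final Map<String, String> FLINK_CONFIGURATION =
            Collections.singletonMap("parallelism.default", "4");

    JarRunRequestBody getJarRequestWithConfiguration() {
        return new JarRunRequestBody(
                null, null, null, null, null, null, null, null, FLINK_CONFIGURATION);
    }

    void validateGraphWithFlinkConfig(JobGraph jobGraph) throws Exception {
        // Deserialize the graph's ExecutionConfig and check that the override took effect.
        ExecutionConfig executionConfig =
                jobGraph.getSerializedExecutionConfig()
                        .deserializeValue(getClass().getClassLoader());
        assertThat(executionConfig.getParallelism()).isEqualTo(4);
    }
}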