Skip to content

Commit

Permalink
[FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to …
Browse files Browse the repository at this point in the history
…random JobID for non-HA setups

This closes apache#8539.
  • Loading branch information
knaufk authored and tillrohrmann committed Jun 7, 2019
1 parent f6e3eb1 commit 9770551
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;

@Nonnull
@Nullable
private final JobID jobId;

@Nullable
Expand All @@ -50,11 +50,11 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
@Nullable String hostname,
int restPort,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
@Nonnull JobID jobId,
@Nullable JobID jobId,
@Nullable String jobClassName) {
super(configDir, dynamicProperties, args, hostname, restPort);
this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.jobId = requireNonNull(jobId, "jobId");
this.jobId = jobId;
this.jobClassName = jobClassName;
}

Expand All @@ -63,7 +63,7 @@ SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointRestoreSettings;
}

@Nonnull
@Nullable
JobID getJobId() {
return jobId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.cli.Options;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Properties;

Expand All @@ -43,8 +44,6 @@
*/
public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory<StandaloneJobClusterConfiguration> {

static final JobID DEFAULT_JOB_ID = new JobID(0, 0);

private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
.longOpt("job-classname")
.required(false)
Expand Down Expand Up @@ -105,10 +104,11 @@ private int getRestPort(CommandLine commandLine) throws FlinkParseException {
}
}

@Nullable
private static JobID getJobId(CommandLine commandLine) throws FlinkParseException {
String jobId = commandLine.getOptionValue(JOB_ID_OPTION.getOpt());
if (jobId == null) {
return DEFAULT_JOB_ID;
return null;
}
try {
return JobID.fromHexString(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
Expand All @@ -35,6 +36,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -43,6 +46,8 @@
*/
public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {

public static final JobID ZERO_JOB_ID = new JobID(0, 0);

@Nonnull
private final JobID jobId;

Expand Down Expand Up @@ -97,14 +102,29 @@ public static void main(String[] args) {

StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
configuration,
clusterConfiguration.getJobId(),
resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration),
clusterConfiguration.getSavepointRestoreSettings(),
clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());

ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

@VisibleForTesting
@Nonnull
static JobID resolveJobIdForCluster(Optional<JobID> optionalJobID, Configuration configuration) {
return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration));
}

@Nonnull
private static JobID createJobIdForCluster(Configuration globalConfiguration) {
if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) {
return ZERO_JOB_ID;
} else {
return JobID.generate();
}
}

@VisibleForTesting
static void setDefaultExecutionModeIfNotConfigured(Configuration configuration) {
if (isNoExecutionModeConfigured(configuration)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
import java.util.Optional;
import java.util.Properties;

import static org.apache.flink.container.entrypoint.StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -91,7 +89,7 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti

assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));

assertThat(clusterConfiguration.getJobId(), is(equalTo(DEFAULT_JOB_ID)));
assertThat(clusterConfiguration.getJobId(), is(nullValue()));
}

@Test
Expand All @@ -106,7 +104,7 @@ public void testOnlyRequiredArguments() throws FlinkParseException {
assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
assertThat(clusterConfiguration.getHostname(), is(nullValue()));
assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getJobId(), is(not(nullValue())));
assertThat(clusterConfiguration.getJobId(), is(nullValue()));
assertThat(clusterConfiguration.getJobClassName(), is(nullValue()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint.ExecutionMode;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Optional;

import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsNot.not;

/**
* Tests for the {@link StandaloneJobClusterEntryPoint}.
Expand Down Expand Up @@ -59,11 +66,69 @@ public void testDontOverwriteExecutionMode() {
assertThat(getExecutionMode(configuration), equalTo(ExecutionMode.NORMAL));
}

@Test
public void configuredJobIDTakesPrecedenceWithHA() {
Optional<JobID> jobID = Optional.of(JobID.generate());

Configuration globalConfiguration = new Configuration();
enableHighAvailability(globalConfiguration);

JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
jobID,
globalConfiguration);

assertThat(jobIdForCluster, is(jobID.get()));
}

@Test
public void configuredJobIDTakesPrecedenceWithoutHA() {
Optional<JobID> jobID = Optional.of(JobID.generate());

Configuration globalConfiguration = new Configuration();

JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
jobID,
globalConfiguration);

assertThat(jobIdForCluster, is(jobID.get()));
}

@Test
public void jobIDdefaultsToZeroWithHA() {
Optional<JobID> jobID = Optional.empty();

Configuration globalConfiguration = new Configuration();
enableHighAvailability(globalConfiguration);

JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
jobID,
globalConfiguration);

assertThat(jobIdForCluster, is(ZERO_JOB_ID));
}

@Test
public void jobIDdefaultsToRandomJobIDWithoutHA() {
Optional<JobID> jobID = Optional.empty();

Configuration globalConfiguration = new Configuration();

JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
jobID,
globalConfiguration);

assertThat(jobIdForCluster, is(not(ZERO_JOB_ID)));
}

private static void setExecutionMode(Configuration configuration, ExecutionMode executionMode) {
configuration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
}

private static ExecutionMode getExecutionMode(Configuration configuration) {
return ExecutionMode.valueOf(configuration.getString(ClusterEntrypoint.EXECUTION_MODE));
}

private static void enableHighAvailability(final Configuration configuration) {
configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
}
}

0 comments on commit 9770551

Please sign in to comment.