Skip to content

Commit

Permalink
[FLINK-17620] Rename StandaloneJobClusterEntryPoint to StandaloneAppl…
Browse files Browse the repository at this point in the history
…icationClusterEntryPoint

This PR renames the entrypoint but DOES NOT change the scripts as this would be
a breaking change for already existing deployments and such a change would
require a more thorough and more visible discussion with the community.

This closes apache#12087.
  • Loading branch information
kl0u committed May 12, 2020
1 parent de34fdd commit 6f72401
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import static java.util.Objects.requireNonNull;

/**
* Configuration for the {@link StandaloneJobClusterEntryPoint}.
* Configuration for the {@link StandaloneApplicationClusterEntryPoint}.
*/
final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {
final class StandaloneApplicationClusterConfiguration extends EntrypointClusterConfiguration {

@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;
Expand All @@ -43,7 +43,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
@Nullable
private final String jobClassName;

StandaloneJobClusterConfiguration(
StandaloneApplicationClusterConfiguration(
@Nonnull String configDir,
@Nonnull Properties dynamicProperties,
@Nonnull String[] args,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;

/**
* Parser factory which generates a {@link StandaloneJobClusterConfiguration} from a given
* Parser factory which generates a {@link StandaloneApplicationClusterConfiguration} from a given
* list of command line arguments.
*/
public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory<StandaloneJobClusterConfiguration> {
public class StandaloneApplicationClusterConfigurationParserFactory implements ParserResultFactory<StandaloneApplicationClusterConfiguration> {

private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
.longOpt("job-classname")
Expand Down Expand Up @@ -75,7 +75,7 @@ public Options getOptions() {
}

@Override
public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) throws FlinkParseException {
public StandaloneApplicationClusterConfiguration createResult(@Nonnull CommandLine commandLine) throws FlinkParseException {
final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final int restPort = getRestPort(commandLine);
Expand All @@ -84,7 +84,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
final JobID jobId = getJobId(commandLine);
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());

return new StandaloneJobClusterConfiguration(
return new StandaloneApplicationClusterConfiguration(
configDir,
dynamicProperties,
commandLine.getArgs(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,28 @@
* location.
*/
@Internal
public final class StandaloneJobClusterEntryPoint extends ApplicationClusterEntryPoint {
public final class StandaloneApplicationClusterEntryPoint extends ApplicationClusterEntryPoint {

private StandaloneJobClusterEntryPoint(
private StandaloneApplicationClusterEntryPoint(
final Configuration configuration,
final PackagedProgram program) {
super(configuration, program, StandaloneResourceManagerFactory.getInstance());
}

public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneJobClusterEntryPoint.class.getSimpleName(), args);
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneApplicationClusterEntryPoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);

final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
final CommandLineParser<StandaloneApplicationClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneApplicationClusterConfigurationParserFactory());

StandaloneJobClusterConfiguration clusterConfiguration = null;
StandaloneApplicationClusterConfiguration clusterConfiguration = null;
try {
clusterConfiguration = commandLineParser.parse(args);
} catch (Exception e) {
LOG.error("Could not parse command line arguments {}.", args, e);
commandLineParser.printHelp(StandaloneJobClusterEntryPoint.class.getSimpleName());
commandLineParser.printHelp(StandaloneApplicationClusterEntryPoint.class.getSimpleName());
System.exit(1);
}

Expand All @@ -90,21 +90,21 @@ public static void main(String[] args) {
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);

StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, program);
StandaloneApplicationClusterEntryPoint entrypoint = new StandaloneApplicationClusterEntryPoint(configuration, program);

ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

@VisibleForTesting
static Configuration loadConfigurationFromClusterConfig(StandaloneJobClusterConfiguration clusterConfiguration) {
static Configuration loadConfigurationFromClusterConfig(StandaloneApplicationClusterConfiguration clusterConfiguration) {
Configuration configuration = loadConfiguration(clusterConfiguration);
setStaticJobId(clusterConfiguration, configuration);
SavepointRestoreSettings.toConfiguration(clusterConfiguration.getSavepointRestoreSettings(), configuration);
return configuration;
}

private static PackagedProgram getPackagedProgram(
final StandaloneJobClusterConfiguration clusterConfiguration) throws IOException, FlinkException {
final StandaloneApplicationClusterConfiguration clusterConfiguration) throws IOException, FlinkException {
final PackagedProgramRetriever programRetriever = getPackagedProgramRetriever(
clusterConfiguration.getArgs(),
clusterConfiguration.getJobClassName());
Expand All @@ -122,7 +122,7 @@ private static PackagedProgramRetriever getPackagedProgramRetriever(
return retrieverBuilder.build();
}

private static void setStaticJobId(StandaloneJobClusterConfiguration clusterConfiguration, Configuration configuration) {
private static void setStaticJobId(StandaloneApplicationClusterConfiguration clusterConfiguration, Configuration configuration) {
final JobID jobId = clusterConfiguration.getJobId();
if (jobId != null) {
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import static org.junit.Assert.fail;

/**
* Tests for the {@link StandaloneJobClusterConfigurationParserFactory}.
* Tests for the {@link StandaloneApplicationClusterConfigurationParserFactory}.
*/
public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger {
public class StandaloneApplicationClusterConfigurationParserFactoryTest extends TestLogger {

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
Expand All @@ -68,7 +68,7 @@ public void createEmptyFlinkConfiguration() throws IOException {
confFile.createNewFile();
}

private static final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
private static final CommandLineParser<StandaloneApplicationClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneApplicationClusterConfigurationParserFactory());
private static final String JOB_CLASS_NAME = "foobar";

@Test
Expand All @@ -90,11 +90,11 @@ public void testEntrypointClusterConfigurationToConfigurationParsing() throws Fl
String.format("-D%s=%s", key, value),
arg1, arg2};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final StandaloneApplicationClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));

final Configuration configuration = StandaloneJobClusterEntryPoint
final Configuration configuration = StandaloneApplicationClusterEntryPoint
.loadConfigurationFromClusterConfig(clusterConfiguration);

final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
Expand All @@ -113,8 +113,8 @@ public void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing
"--job-id", jobID.toHexString()
};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final Configuration configuration = StandaloneJobClusterEntryPoint
final StandaloneApplicationClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final Configuration configuration = StandaloneApplicationClusterEntryPoint
.loadConfigurationFromClusterConfig(clusterConfiguration);

final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
Expand All @@ -131,7 +131,7 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
final String arg2 = "arg2";
final String[] args = {"--configDir", confDirPath, "--webui-port", String.valueOf(restPort), "--job-classname", JOB_CLASS_NAME, String.format("-D%s=%s", key, value), arg1, arg2};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final StandaloneApplicationClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
Expand All @@ -151,7 +151,7 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
public void testOnlyRequiredArguments() throws FlinkParseException {
final String[] args = {"--configDir", confDirPath};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final StandaloneApplicationClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getDynamicProperties(), is(equalTo(new Properties())));
Expand All @@ -174,9 +174,9 @@ public void testMissingRequiredArgument() throws FlinkParseException {
public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
final String restorePath = "foobar";
final String[] args = {"-c", confDirPath, "-j", JOB_CLASS_NAME, "-s", restorePath, "-n"};
final StandaloneJobClusterConfiguration standaloneJobClusterConfiguration = commandLineParser.parse(args);
final StandaloneApplicationClusterConfiguration standaloneApplicationClusterConfiguration = commandLineParser.parse(args);

final SavepointRestoreSettings savepointRestoreSettings = standaloneJobClusterConfiguration.getSavepointRestoreSettings();
final SavepointRestoreSettings savepointRestoreSettings = standaloneApplicationClusterConfiguration.getSavepointRestoreSettings();

assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(restorePath)));
Expand All @@ -188,9 +188,9 @@ public void testSetJobIdManually() throws FlinkParseException {
final JobID jobId = new JobID();
final String[] args = {"--configDir", confDirPath, "--job-classname", "foobar", "--job-id", jobId.toString()};

final StandaloneJobClusterConfiguration standaloneJobClusterConfiguration = commandLineParser.parse(args);
final StandaloneApplicationClusterConfiguration standaloneApplicationClusterConfiguration = commandLineParser.parse(args);

assertThat(standaloneJobClusterConfiguration.getJobId(), is(equalTo(jobId)));
assertThat(standaloneApplicationClusterConfiguration.getJobId(), is(equalTo(jobId)));
}

@Test
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testShortOptions() throws FlinkParseException {
"-s", savepointRestorePath,
"-n"};

final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
final StandaloneApplicationClusterConfiguration clusterConfiguration = commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/bin/flink-console.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case $SERVICE in
;;

(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;

(*)
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/bin/flink-daemon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case $DAEMON in
;;

(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;

(*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ JOB_ID="00000000000000000000000000000000"

function ha_cleanup() {
stop_watchdogs
kill_all 'StandaloneJobClusterEntryPoint'
kill_all 'StandaloneApplicationClusterEntryPoint'
}

on_exit ha_cleanup
Expand Down Expand Up @@ -128,7 +128,7 @@ function run_ha_test() {
wait_job_running ${JOB_ID}

# start the watchdog that keeps the number of JMs stable
start_ha_jm_watchdog 1 "StandaloneJobClusterEntryPoint" run_job ${PARALLELISM} ${BACKEND} ${ASYNC} ${INCREM}
start_ha_jm_watchdog 1 "StandaloneApplicationClusterEntryPoint" run_job ${PARALLELISM} ${BACKEND} ${ASYNC} ${INCREM}

# start the watchdog that keeps the number of TMs stable
start_ha_tm_watchdog ${JOB_ID} ${neededTaskmanagers}
Expand All @@ -139,7 +139,7 @@ function run_ha_test() {
for (( c=1; c<=${JM_KILLS}; c++ )); do
# kill the JM and wait for watchdog to
# create a new one which will take over
kill_single 'StandaloneJobClusterEntryPoint'
kill_single 'StandaloneApplicationClusterEntryPoint'
# let the job start and take some checkpoints
wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonejob-${c}"
done
Expand Down

0 comments on commit 6f72401

Please sign in to comment.