forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-9488] Add container entry point StandaloneJobClusterEntryPoint
The StandaloneJobClusterEntryPoint is the basic entry point for containers. It is started with the user code jar in its classpath and the classname of the user program. The entrypoint will then load this user program via the classname and execute its main method. This will generate a JobGraph which is then used to start the MiniDispatcher. This closes apache#6315.
- Loading branch information
1 parent
ab9bd87
commit 8f467c1
Showing
11 changed files
with
580 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http:https://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<project xmlns="http:https://maven.apache.org/POM/4.0.0" xmlns:xsi="http:https://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http:https://maven.apache.org/POM/4.0.0 http:https://maven.apache.org/maven-v4_0_0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-parent</artifactId> | ||
<version>1.6-SNAPSHOT</version> | ||
<relativePath>..</relativePath> | ||
</parent> | ||
|
||
<artifactId>flink-container_${scala.binary.version}</artifactId> | ||
<name>flink-container</name> | ||
<packaging>jar</packaging> | ||
|
||
<dependencies> | ||
|
||
<!-- set all Flink dependencies to provided, so they and their transitive --> | ||
<!-- dependencies do not get promoted to direct dependencies during shading --> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-runtime_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-clients_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- test dependencies --> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-test-utils-junit</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
</dependencies> | ||
</project> |
43 changes: 43 additions & 0 deletions
43
...rc/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.container.entrypoint; | ||
|
||
import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration; | ||
|
||
import javax.annotation.Nonnull; | ||
|
||
import java.util.Properties; | ||
|
||
/** | ||
* Configuration for the {@link StandaloneJobClusterEntryPoint}. | ||
*/ | ||
final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration { | ||
@Nonnull | ||
private final String jobClassName; | ||
|
||
public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) { | ||
super(configDir, dynamicProperties, args, restPort); | ||
this.jobClassName = jobClassName; | ||
} | ||
|
||
@Nonnull | ||
String getJobClassName() { | ||
return jobClassName; | ||
} | ||
} |
75 changes: 75 additions & 0 deletions
75
...org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.container.entrypoint; | ||
|
||
import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; | ||
|
||
import org.apache.commons.cli.CommandLine; | ||
import org.apache.commons.cli.Option; | ||
import org.apache.commons.cli.Options; | ||
|
||
import javax.annotation.Nonnull; | ||
|
||
import java.util.Properties; | ||
|
||
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION; | ||
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; | ||
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION; | ||
|
||
/** | ||
* Parser factory which generates a {@link StandaloneJobClusterConfiguration} from a given | ||
* list of command line arguments. | ||
*/ | ||
public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory<StandaloneJobClusterConfiguration> { | ||
|
||
private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") | ||
.longOpt("job-classname") | ||
.required(true) | ||
.hasArg(true) | ||
.argName("job class name") | ||
.desc("Class name of the job to run.") | ||
.build(); | ||
|
||
@Override | ||
public Options getOptions() { | ||
final Options options = new Options(); | ||
options.addOption(CONFIG_DIR_OPTION); | ||
options.addOption(REST_PORT_OPTION); | ||
options.addOption(JOB_CLASS_NAME_OPTION); | ||
options.addOption(DYNAMIC_PROPERTY_OPTION); | ||
|
||
return options; | ||
} | ||
|
||
@Override | ||
public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) { | ||
final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); | ||
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); | ||
final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1"); | ||
final int restPort = Integer.parseInt(restPortString); | ||
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt()); | ||
|
||
return new StandaloneJobClusterConfiguration( | ||
configDir, | ||
dynamicProperties, | ||
commandLine.getArgs(), | ||
restPort, | ||
jobClassName); | ||
} | ||
} |
156 changes: 156 additions & 0 deletions
156
...r/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.container.entrypoint; | ||
|
||
import org.apache.flink.client.program.PackagedProgram; | ||
import org.apache.flink.client.program.PackagedProgramUtils; | ||
import org.apache.flink.client.program.ProgramInvocationException; | ||
import org.apache.flink.configuration.Configuration; | ||
import org.apache.flink.configuration.CoreOptions; | ||
import org.apache.flink.runtime.clusterframework.ApplicationStatus; | ||
import org.apache.flink.runtime.clusterframework.types.ResourceID; | ||
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; | ||
import org.apache.flink.runtime.entrypoint.ClusterInformation; | ||
import org.apache.flink.runtime.entrypoint.FlinkParseException; | ||
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; | ||
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; | ||
import org.apache.flink.runtime.heartbeat.HeartbeatServices; | ||
import org.apache.flink.runtime.highavailability.HighAvailabilityServices; | ||
import org.apache.flink.runtime.jobgraph.JobGraph; | ||
import org.apache.flink.runtime.metrics.MetricRegistry; | ||
import org.apache.flink.runtime.resourcemanager.ResourceManager; | ||
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; | ||
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; | ||
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; | ||
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; | ||
import org.apache.flink.runtime.rpc.FatalErrorHandler; | ||
import org.apache.flink.runtime.rpc.RpcService; | ||
import org.apache.flink.runtime.util.EnvironmentInformation; | ||
import org.apache.flink.runtime.util.JvmShutdownSafeguard; | ||
import org.apache.flink.runtime.util.SignalHandler; | ||
import org.apache.flink.util.FlinkException; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* {@link JobClusterEntrypoint} which is started with a job in a predefined | ||
* location. | ||
*/ | ||
public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { | ||
|
||
private static final String[] EMPTY_ARGS = new String[0]; | ||
|
||
@Nonnull | ||
private final String jobClassName; | ||
|
||
StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) { | ||
super(configuration); | ||
this.jobClassName = jobClassName; | ||
} | ||
|
||
@Override | ||
protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException { | ||
final PackagedProgram packagedProgram = createPackagedProgram(); | ||
final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); | ||
try { | ||
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism); | ||
jobGraph.setAllowQueuedScheduling(true); | ||
|
||
return jobGraph; | ||
} catch (Exception e) { | ||
throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e); | ||
} | ||
} | ||
|
||
private PackagedProgram createPackagedProgram() throws FlinkException { | ||
try { | ||
final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName); | ||
return new PackagedProgram(mainClass, EMPTY_ARGS); | ||
} catch (ClassNotFoundException | ProgramInvocationException e) { | ||
throw new FlinkException("Could not load the provied entrypoint class.", e); | ||
} | ||
} | ||
|
||
@Override | ||
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) { | ||
terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true)); | ||
} | ||
|
||
@Override | ||
protected ResourceManager<?> createResourceManager( | ||
Configuration configuration, | ||
ResourceID resourceId, | ||
RpcService rpcService, | ||
HighAvailabilityServices highAvailabilityServices, | ||
HeartbeatServices heartbeatServices, | ||
MetricRegistry metricRegistry, | ||
FatalErrorHandler fatalErrorHandler, | ||
ClusterInformation clusterInformation, | ||
@Nullable String webInterfaceUrl) throws Exception { | ||
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); | ||
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); | ||
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( | ||
resourceManagerRuntimeServicesConfiguration, | ||
highAvailabilityServices, | ||
rpcService.getScheduledExecutor()); | ||
|
||
return new StandaloneResourceManager( | ||
rpcService, | ||
ResourceManager.RESOURCE_MANAGER_NAME, | ||
resourceId, | ||
resourceManagerConfiguration, | ||
highAvailabilityServices, | ||
heartbeatServices, | ||
resourceManagerRuntimeServices.getSlotManager(), | ||
metricRegistry, | ||
resourceManagerRuntimeServices.getJobLeaderIdService(), | ||
clusterInformation, | ||
fatalErrorHandler); | ||
} | ||
|
||
public static void main(String[] args) { | ||
// startup checks and logging | ||
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneJobClusterEntryPoint.class.getSimpleName(), args); | ||
SignalHandler.register(LOG); | ||
JvmShutdownSafeguard.installAsShutdownHook(LOG); | ||
|
||
final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); | ||
StandaloneJobClusterConfiguration clusterConfiguration = null; | ||
|
||
try { | ||
clusterConfiguration = commandLineParser.parse(args); | ||
} catch (FlinkParseException e) { | ||
LOG.error("Could not parse command line arguments {}.", args, e); | ||
commandLineParser.printHelp(); | ||
System.exit(1); | ||
} | ||
|
||
Configuration configuration = loadConfiguration(clusterConfiguration); | ||
|
||
configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString()); | ||
|
||
StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName()); | ||
|
||
entrypoint.startCluster(); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http:https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
|
||
# Convenience file for local debugging of the JobManager/TaskManager. | ||
log4j.rootLogger=OFF, console | ||
log4j.appender.console=org.apache.log4j.ConsoleAppender | ||
log4j.appender.console.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n | ||
|
||
log4j.logger.org.apache.flink.mesos=DEBUG | ||
log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO |
Oops, something went wrong.