Skip to content

Commit

Permalink
[FLINK-16661] Consolidating code in ApplicationClusterEntrypoints
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Apr 30, 2020
1 parent 3c4cd5c commit 7f49ec9
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.application;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Base class for cluster entry points targeting executing applications in "Application Mode".
* The lifecycle of the enrtypoint is bound to that of the specific application being executed,
* and the {@code main()} method of the application is run on the cluster.
*/
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {

private final PackagedProgram program;

private final ResourceManagerFactory<?> resourceManagerFactory;

protected ApplicationClusterEntryPoint(
final Configuration configuration,
final PackagedProgram program,
final ResourceManagerFactory<?> resourceManagerFactory) {
super(configuration);
this.program = checkNotNull(program);
this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
return new DefaultDispatcherResourceManagerComponentFactory(
new DefaultDispatcherRunnerFactory(
ApplicationDispatcherLeaderProcessFactoryFactory
.create(configuration, SessionDispatcherFactory.INSTANCE, program)),
resourceManagerFactory,
JobRestEndpointFactory.INSTANCE);
}

@Override
protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
final Configuration configuration,
final ScheduledExecutor scheduledExecutor) {
return new MemoryArchivedExecutionGraphStore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
Expand All @@ -30,20 +31,11 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
Expand All @@ -55,41 +47,21 @@
import java.net.URL;
import java.util.Optional;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory;

/**
* {@link JobClusterEntrypoint} which is started with a job in a predefined
* An {@link ApplicationClusterEntryPoint} which is started with a job in a predefined
* location.
*/
public final class StandaloneJobClusterEntryPoint extends ClusterEntrypoint {
@Internal
public final class StandaloneJobClusterEntryPoint extends ApplicationClusterEntryPoint {

public static final JobID ZERO_JOB_ID = new JobID(0, 0);

private final PackagedProgram program;

private StandaloneJobClusterEntryPoint(
final Configuration configuration,
final PackagedProgram program) {
super(configuration);
this.program = requireNonNull(program);
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new DefaultDispatcherResourceManagerComponentFactory(
new DefaultDispatcherRunnerFactory(
ApplicationDispatcherLeaderProcessFactoryFactory
.create(configuration, SessionDispatcherFactory.INSTANCE, program)),
StandaloneResourceManagerFactory.getInstance(),
JobRestEndpointFactory.INSTANCE);
}

@Override
protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
Configuration configuration,
ScheduledExecutor scheduledExecutor) {
return new MemoryArchivedExecutionGraphStore();
super(configuration, program, StandaloneResourceManagerFactory.getInstance());
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
Expand All @@ -31,16 +31,8 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
Expand All @@ -59,53 +51,22 @@
import java.util.Optional;

import static org.apache.flink.runtime.util.ClusterEntrypointUtils.tryFindUserLibDirectory;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* {@link ClusterEntrypoint} whose lifecycle is bound to a specific application,
* and runs the main method of the application on the cluster -- Application Mode.
*
* <p>NOTE TO SELF: We have to create a base ApplicationClusterEntryPoint as soon as
* we are sure about the correctness of the abstractions. This class shares a lot
* of code with the StandaloneJobClusterEntryPoint.
* An {@link ApplicationClusterEntryPoint} for Yarn.
*/
@Internal
public class YarnApplicationClusterEntryPoint extends ClusterEntrypoint {
public final class YarnApplicationClusterEntryPoint extends ApplicationClusterEntryPoint {

public static final JobID ZERO_JOB_ID = new JobID(0, 0);

private final PackagedProgram program;

public YarnApplicationClusterEntryPoint(
private YarnApplicationClusterEntryPoint(
final Configuration configuration,
final PackagedProgram program) {
super(configuration);
this.program = checkNotNull(program);
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
return new DefaultDispatcherResourceManagerComponentFactory(
new DefaultDispatcherRunnerFactory(
ApplicationDispatcherLeaderProcessFactoryFactory
.create(configuration, SessionDispatcherFactory.INSTANCE, program)),
YarnResourceManagerFactory.getInstance(),
JobRestEndpointFactory.INSTANCE);
super(configuration, program, YarnResourceManagerFactory.getInstance());
}

@Override
protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
final Configuration configuration,
final ScheduledExecutor scheduledExecutor) {
return new MemoryArchivedExecutionGraphStore();
}

// ------------------------------------------------------------------------
// The executable entry point for the Yarn Application Master Process
// for a single Flink Application.
// ------------------------------------------------------------------------

public static void main(final String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);
Expand All @@ -127,11 +88,10 @@ public static void main(final String[] args) {
}

final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);
final ApplicationConfiguration applicationConfiguration = ApplicationConfiguration.fromConfiguration(configuration);

PackagedProgram program = null;
try {
program = getPackagedProgram(configuration, applicationConfiguration);
program = getPackagedProgram(configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
Expand All @@ -150,9 +110,11 @@ public static void main(final String[] args) {
ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint);
}

private static PackagedProgram getPackagedProgram(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration) throws IOException, FlinkException {
private static PackagedProgram getPackagedProgram(final Configuration configuration) throws IOException, FlinkException {

final ApplicationConfiguration applicationConfiguration =
ApplicationConfiguration.fromConfiguration(configuration);

final PackagedProgramRetriever programRetriever = getPackagedProgramRetriever(
configuration,
applicationConfiguration.getProgramArguments(),
Expand Down

0 comments on commit 7f49ec9

Please sign in to comment.