Skip to content

Commit

Permalink
[FLINK-10411] Introduce DispatcherResourceManagerComponentFactory
Browse files Browse the repository at this point in the history
This commit introduces the DispatcherResourceManagerComponentFactory which is used
to create a DispatcherResourceManagerComponent. That way, it is possible to eagerly
initialize all fields of the DispatcherResourceManagerComponent making it possible
to make all fields final and remove the lock.

This closes apache#6743.
  • Loading branch information
tillrohrmann committed Sep 28, 2018
1 parent 54c1b19 commit 9f5fd07
Show file tree
Hide file tree
Showing 19 changed files with 522 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.FlinkException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.flink.container.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.ClusterComponent;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.JobClusterComponent;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
Expand Down Expand Up @@ -61,8 +61,8 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
}

@Override
protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
return new JobClusterComponent(
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterComponent;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
import org.apache.flink.runtime.entrypoint.JobClusterComponent;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
Expand Down Expand Up @@ -109,8 +109,8 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
return new JobClusterComponent(
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
schedulerConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterComponent;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
Expand Down Expand Up @@ -108,12 +108,13 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
return new SessionClusterComponent(new MesosResourceManagerFactory(
mesosServices,
mesosConfig,
taskManagerParameters,
taskManagerContainerSpec));
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
mesosConfig,
taskManagerParameters,
taskManagerContainerSpec));
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand Down Expand Up @@ -109,7 +111,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
private final AtomicBoolean isShutDown = new AtomicBoolean(false);

@GuardedBy("lock")
private ClusterComponent<?> clusterComponent;
private DispatcherResourceManagerComponent<?> clusterComponent;

@GuardedBy("lock")
private MetricRegistryImpl metricRegistry;
Expand Down Expand Up @@ -204,9 +206,9 @@ private void runCluster(Configuration configuration) throws Exception {
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

clusterComponent = createClusterComponent(configuration);
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

clusterComponent.startComponent(
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
commonRpcService,
haServices,
Expand Down Expand Up @@ -460,7 +462,7 @@ private void cleanupDirectories() throws IOException {
// Abstract methods
// --------------------------------------------------

protected abstract ClusterComponent<?> createClusterComponent(Configuration configuration);
protected abstract DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration);

protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
Configuration configuration,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand All @@ -35,8 +37,8 @@ public StandaloneSessionClusterEntrypoint(Configuration configuration) {
}

@Override
protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
}

public static void main(String[] args) {
Expand Down
Loading

0 comments on commit 9f5fd07

Please sign in to comment.