Skip to content

Commit

Permalink
[FLINK-14282] Simplify DispatcherResourceManagerComponent hierarchy
Browse files Browse the repository at this point in the history
Remove unnecessary subclasses of AbstractDispatcherResourceManagerComponent
and rename this class into DefaultDispatcherResourceManagerComponent. Moreover,
this commit removes the unnecessary generics from the DispatcherRunnerFactory
to further simplify the code base.

This closes apache#9809.
  • Loading branch information
tillrohrmann committed Oct 8, 2019
1 parent 01ad0f6 commit 870b564
Show file tree
Hide file tree
Showing 19 changed files with 75 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
Expand Down Expand Up @@ -75,7 +75,7 @@ private StandaloneJobClusterEntryPoint(

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
Expand Down Expand Up @@ -109,8 +108,8 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
schedulerConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
Expand Down Expand Up @@ -108,8 +107,8 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
mesosConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@

/**
* Factory interface for the {@link DispatcherRunner}.
*
* @param <T> type of the dispatcher runner being created
*/
public interface DispatcherRunnerFactory<T extends DispatcherRunner> {
public interface DispatcherRunnerFactory {

T createDispatcherRunner(RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception;
DispatcherRunner createDispatcherRunner(RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Factory for the {@link MiniDispatcherRunnerImpl}.
*/
public class MiniDispatcherRunnerFactory implements DispatcherRunnerFactory<MiniDispatcherRunnerImpl> {
public class MiniDispatcherRunnerFactory implements DispatcherRunnerFactory {

private final JobGraphRetriever jobGraphRetriever;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Runner which runs a {@link MiniDispatcher} implementation.
*/
public class MiniDispatcherRunnerImpl extends DispatcherRunnerImpl<MiniDispatcher> implements MiniDispatcherRunner {
class MiniDispatcherRunnerImpl extends DispatcherRunnerImpl<MiniDispatcher> {

MiniDispatcherRunnerImpl(
DispatcherFactory<MiniDispatcher> dispatcherFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Factory which creates a {@link DispatcherRunnerImpl} which runs a {@link StandaloneDispatcher}.
*/
public class StandaloneDispatcherRunnerFactory implements DispatcherRunnerFactory<DispatcherRunnerImpl<StandaloneDispatcher>> {
public class StandaloneDispatcherRunnerFactory implements DispatcherRunnerFactory {

private final DispatcherFactory<StandaloneDispatcher> dispatcherFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.flink.runtime.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand All @@ -37,8 +36,8 @@ public StandaloneSessionClusterEntrypoint(Configuration configuration) {
}

@Override
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.MiniDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.StandaloneDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand All @@ -44,7 +47,9 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
Expand Down Expand Up @@ -72,24 +77,23 @@
/**
* Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
*
* @param <T> type of the {@link DispatcherRunner}
* @param <U> type of the {@link RestfulGateway} given to the {@link WebMonitorEndpoint}
*/
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends DispatcherRunner, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory {
public class DefaultDispatcherResourceManagerComponentFactory<U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory {

private final Logger log = LoggerFactory.getLogger(getClass());

@Nonnull
private final DispatcherRunnerFactory<? extends T> dispatcherRunnerFactory;
private final DispatcherRunnerFactory dispatcherRunnerFactory;

@Nonnull
private final ResourceManagerFactory<?> resourceManagerFactory;

@Nonnull
private final RestEndpointFactory<U> restEndpointFactory;

public AbstractDispatcherResourceManagerComponentFactory(
@Nonnull DispatcherRunnerFactory<? extends T> dispatcherRunnerFactory,
DefaultDispatcherResourceManagerComponentFactory(
@Nonnull DispatcherRunnerFactory dispatcherRunnerFactory,
@Nonnull ResourceManagerFactory<?> resourceManagerFactory,
@Nonnull RestEndpointFactory<U> restEndpointFactory) {
this.dispatcherRunnerFactory = dispatcherRunnerFactory;
Expand All @@ -115,7 +119,7 @@ public DispatcherResourceManagerComponent create(
ResourceManager<?> resourceManager = null;
JobManagerMetricGroup jobManagerMetricGroup = null;
ResourceManagerMetricGroup resourceManagerMetricGroup = null;
T dispatcherRunner = null;
DispatcherRunner dispatcherRunner = null;

try {
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
Expand Down Expand Up @@ -206,7 +210,7 @@ public DispatcherResourceManagerComponent create(
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

return createDispatcherResourceManagerComponent(
return new DispatcherResourceManagerComponent(
dispatcherRunner,
resourceManager,
dispatcherLeaderRetrievalService,
Expand Down Expand Up @@ -265,10 +269,20 @@ public DispatcherResourceManagerComponent create(
}
}

protected abstract DispatcherResourceManagerComponent createDispatcherResourceManagerComponent(
T dispatcherRunner,
ResourceManager<?> resourceManager,
LeaderRetrievalService dispatcherLeaderRetrievalService,
LeaderRetrievalService resourceManagerRetrievalService,
WebMonitorEndpoint<?> webMonitorEndpoint);
public static DefaultDispatcherResourceManagerComponentFactory<DispatcherGateway> createSessionComponentFactory(
ResourceManagerFactory<?> resourceManagerFactory) {
return new DefaultDispatcherResourceManagerComponentFactory<>(
new StandaloneDispatcherRunnerFactory(SessionDispatcherFactory.INSTANCE),
resourceManagerFactory,
SessionRestEndpointFactory.INSTANCE);
}

public static DefaultDispatcherResourceManagerComponentFactory<RestfulGateway> createJobComponentFactory(
ResourceManagerFactory<?> resourceManagerFactory,
JobGraphRetriever jobGraphRetriever) {
return new DefaultDispatcherResourceManagerComponentFactory<>(
new MiniDispatcherRunnerFactory(jobGraphRetriever),
resourceManagerFactory,
JobRestEndpointFactory.INSTANCE);
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 870b564

Please sign in to comment.