From 870b56418cb795edba9d4e8e282fa91d675efd7a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sun, 29 Sep 2019 18:06:00 +0200 Subject: [PATCH] [FLINK-14282] Simplify DispatcherResourceManagerComponent hierarchy Remove unnecessary subclasses of AbstractDispatcherResourceManagerComponent and rename this class into DefaultDispatcherResourceManagerComponent. Moreover, this commit removes the unnecessary generics from the DispatcherRunnerFactory to further simplify the code base. This closes #9809. --- .../StandaloneJobClusterEntryPoint.java | 4 +- .../entrypoint/MesosJobClusterEntrypoint.java | 7 +- .../MesosSessionClusterEntrypoint.java | 7 +- .../runner/DispatcherRunnerFactory.java | 6 +- .../runner/MiniDispatcherRunner.java | 42 ------------ .../runner/MiniDispatcherRunnerFactory.java | 2 +- .../runner/MiniDispatcherRunnerImpl.java | 2 +- .../StandaloneDispatcherRunnerFactory.java | 2 +- .../StandaloneSessionClusterEntrypoint.java | 7 +- ...tcherResourceManagerComponentFactory.java} | 40 ++++++++---- ...JobDispatcherResourceManagerComponent.java | 54 --------------- ...atcherResourceManagerComponentFactory.java | 55 ---------------- ...atcherResourceManagerComponentFactory.java | 65 ------------------- .../runtime/minicluster/MiniCluster.java | 9 +-- ...tcherResourceManagerComponentFactory.java} | 28 ++++---- .../minicluster/TestingMiniCluster.java | 9 +-- .../ProcessFailureCancelingITCase.java | 5 +- .../entrypoint/YarnJobClusterEntrypoint.java | 7 +- .../YarnSessionClusterEntrypoint.java | 4 +- 19 files changed, 75 insertions(+), 280 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunner.java rename flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/{AbstractDispatcherResourceManagerComponentFactory.java => DefaultDispatcherResourceManagerComponentFactory.java} (86%) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java rename flink-runtime/src/{main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java => test/java/org/apache/flink/runtime/entrypoint/component/TestingDefaultDispatcherResourceManagerComponentFactory.java} (51%) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index fbaba809f5daa..8935f358d5237 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; -import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -75,7 +75,7 @@ private StandaloneJobClusterEntryPoint( @Override protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { - return new JobDispatcherResourceManagerComponentFactory( + return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( StandaloneResourceManagerFactory.INSTANCE, new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName)); } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java index 8bbd6c6f22647..6c1f486ba9b4c 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -30,9 +30,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; -import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; @@ -109,8 +108,8 @@ protected CompletableFuture stopClusterServices(boolean cleanupHaData) { } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { - return new JobDispatcherResourceManagerComponentFactory( + protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( new MesosResourceManagerFactory( mesosServices, schedulerConfiguration, diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java index afd53c816815e..4033d7d83d5eb 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -30,8 +30,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; -import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; @@ -108,8 +107,8 @@ protected CompletableFuture stopClusterServices(boolean cleanupHaData) { } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { - return new SessionDispatcherResourceManagerComponentFactory( + protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory( new MesosResourceManagerFactory( mesosServices, mesosConfig, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java index 67787f04cfe76..9212ea871fedd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java @@ -23,10 +23,8 @@ /** * Factory interface for the {@link DispatcherRunner}. - * - * @param type of the dispatcher runner being created */ -public interface DispatcherRunnerFactory { +public interface DispatcherRunnerFactory { - T createDispatcherRunner(RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception; + DispatcherRunner createDispatcherRunner(RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunner.java deleted file mode 100644 index 6aa8531f744ab..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunner.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher.runner; - -import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.dispatcher.MiniDispatcher; - -import java.util.concurrent.CompletableFuture; - -/** - * Interface for a {@link DispatcherRunner} which runs a {@link MiniDispatcher}. - */ -public interface MiniDispatcherRunner extends DispatcherRunner { - - @Override - MiniDispatcher getDispatcher(); - - /** - * Return shut down future of this runner. The shut down future is being - * completed with the final {@link ApplicationStatus} once the runner wants - * to shut down. - * - * @return future with the final application status - */ - CompletableFuture getShutDownFuture(); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerFactory.java index 58a1c76fbfd79..48ef6744615e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerFactory.java @@ -26,7 +26,7 @@ /** * Factory for the {@link MiniDispatcherRunnerImpl}. */ -public class MiniDispatcherRunnerFactory implements DispatcherRunnerFactory { +public class MiniDispatcherRunnerFactory implements DispatcherRunnerFactory { private final JobGraphRetriever jobGraphRetriever; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerImpl.java index ff28b3f769b48..0fa25ae280378 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/MiniDispatcherRunnerImpl.java @@ -29,7 +29,7 @@ /** * Runner which runs a {@link MiniDispatcher} implementation. */ -public class MiniDispatcherRunnerImpl extends DispatcherRunnerImpl implements MiniDispatcherRunner { +class MiniDispatcherRunnerImpl extends DispatcherRunnerImpl { MiniDispatcherRunnerImpl( DispatcherFactory dispatcherFactory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StandaloneDispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StandaloneDispatcherRunnerFactory.java index d7099f58314d7..4560c2335c598 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StandaloneDispatcherRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/StandaloneDispatcherRunnerFactory.java @@ -26,7 +26,7 @@ /** * Factory which creates a {@link DispatcherRunnerImpl} which runs a {@link StandaloneDispatcher}. */ -public class StandaloneDispatcherRunnerFactory implements DispatcherRunnerFactory> { +public class StandaloneDispatcherRunnerFactory implements DispatcherRunnerFactory { private final DispatcherFactory dispatcherFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java index 1918a516d41ff..e549ca9c5c181 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java @@ -19,8 +19,7 @@ package org.apache.flink.runtime.entrypoint; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; -import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -37,8 +36,8 @@ public StandaloneSessionClusterEntrypoint(Configuration configuration) { } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { - return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE); + protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.INSTANCE); } public static void main(String[] args) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java similarity index 86% rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java index af47c0cc80c81..6c7aa789372e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java @@ -30,8 +30,11 @@ import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; +import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner; import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.MiniDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.StandaloneDispatcherRunnerFactory; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -44,7 +47,9 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; import org.apache.flink.runtime.rest.RestEndpointFactory; +import org.apache.flink.runtime.rest.SessionRestEndpointFactory; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher; @@ -72,15 +77,14 @@ /** * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components. * - * @param type of the {@link DispatcherRunner} * @param type of the {@link RestfulGateway} given to the {@link WebMonitorEndpoint} */ -public abstract class AbstractDispatcherResourceManagerComponentFactory implements DispatcherResourceManagerComponentFactory { +public class DefaultDispatcherResourceManagerComponentFactory implements DispatcherResourceManagerComponentFactory { private final Logger log = LoggerFactory.getLogger(getClass()); @Nonnull - private final DispatcherRunnerFactory dispatcherRunnerFactory; + private final DispatcherRunnerFactory dispatcherRunnerFactory; @Nonnull private final ResourceManagerFactory resourceManagerFactory; @@ -88,8 +92,8 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory restEndpointFactory; - public AbstractDispatcherResourceManagerComponentFactory( - @Nonnull DispatcherRunnerFactory dispatcherRunnerFactory, + DefaultDispatcherResourceManagerComponentFactory( + @Nonnull DispatcherRunnerFactory dispatcherRunnerFactory, @Nonnull ResourceManagerFactory resourceManagerFactory, @Nonnull RestEndpointFactory restEndpointFactory) { this.dispatcherRunnerFactory = dispatcherRunnerFactory; @@ -115,7 +119,7 @@ public DispatcherResourceManagerComponent create( ResourceManager resourceManager = null; JobManagerMetricGroup jobManagerMetricGroup = null; ResourceManagerMetricGroup resourceManagerMetricGroup = null; - T dispatcherRunner = null; + DispatcherRunner dispatcherRunner = null; try { dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); @@ -206,7 +210,7 @@ public DispatcherResourceManagerComponent create( resourceManagerRetrievalService.start(resourceManagerGatewayRetriever); dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever); - return createDispatcherResourceManagerComponent( + return new DispatcherResourceManagerComponent( dispatcherRunner, resourceManager, dispatcherLeaderRetrievalService, @@ -265,10 +269,20 @@ public DispatcherResourceManagerComponent create( } } - protected abstract DispatcherResourceManagerComponent createDispatcherResourceManagerComponent( - T dispatcherRunner, - ResourceManager resourceManager, - LeaderRetrievalService dispatcherLeaderRetrievalService, - LeaderRetrievalService resourceManagerRetrievalService, - WebMonitorEndpoint webMonitorEndpoint); + public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory( + ResourceManagerFactory resourceManagerFactory) { + return new DefaultDispatcherResourceManagerComponentFactory<>( + new StandaloneDispatcherRunnerFactory(SessionDispatcherFactory.INSTANCE), + resourceManagerFactory, + SessionRestEndpointFactory.INSTANCE); + } + + public static DefaultDispatcherResourceManagerComponentFactory createJobComponentFactory( + ResourceManagerFactory resourceManagerFactory, + JobGraphRetriever jobGraphRetriever) { + return new DefaultDispatcherResourceManagerComponentFactory<>( + new MiniDispatcherRunnerFactory(jobGraphRetriever), + resourceManagerFactory, + JobRestEndpointFactory.INSTANCE); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java deleted file mode 100644 index 27e22ea883a02..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.entrypoint.component; - -import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.dispatcher.MiniDispatcher; -import org.apache.flink.runtime.dispatcher.runner.MiniDispatcherRunner; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; - -import java.util.concurrent.CompletableFuture; - -/** - * {@link DispatcherResourceManagerComponent} for a job cluster. The dispatcher component starts - * a {@link MiniDispatcher}. - */ -class JobDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent { - - JobDispatcherResourceManagerComponent( - MiniDispatcherRunner dispatcherRunner, - ResourceManager resourceManager, - LeaderRetrievalService dispatcherLeaderRetrievalService, - LeaderRetrievalService resourceManagerRetrievalService, - WebMonitorEndpoint webMonitorEndpoint) { - super(dispatcherRunner, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint); - - final CompletableFuture shutDownFuture = getShutDownFuture(); - - dispatcherRunner.getShutDownFuture().whenComplete((applicationStatus, throwable) -> { - if (throwable != null) { - shutDownFuture.completeExceptionally(throwable); - } else { - shutDownFuture.complete(applicationStatus); - } - }); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java deleted file mode 100644 index d7baa29326413..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.entrypoint.component; - -import org.apache.flink.runtime.dispatcher.runner.MiniDispatcherRunner; -import org.apache.flink.runtime.dispatcher.runner.MiniDispatcherRunnerFactory; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; -import org.apache.flink.runtime.rest.JobRestEndpointFactory; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; - -import javax.annotation.Nonnull; - -/** - * {@link DispatcherResourceManagerComponentFactory} for a {@link JobDispatcherResourceManagerComponent}. - */ -public class JobDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory { - - public JobDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory resourceManagerFactory, @Nonnull JobGraphRetriever jobGraphRetriever) { - super(new MiniDispatcherRunnerFactory(jobGraphRetriever), resourceManagerFactory, JobRestEndpointFactory.INSTANCE); - } - - @Override - protected DispatcherResourceManagerComponent createDispatcherResourceManagerComponent( - MiniDispatcherRunner dispatcherRunner, - ResourceManager resourceManager, - LeaderRetrievalService dispatcherLeaderRetrievalService, - LeaderRetrievalService resourceManagerRetrievalService, - WebMonitorEndpoint webMonitorEndpoint) { - return new JobDispatcherResourceManagerComponent( - dispatcherRunner, - resourceManager, - dispatcherLeaderRetrievalService, - resourceManagerRetrievalService, - webMonitorEndpoint); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java deleted file mode 100644 index 40c8e7d0a3d67..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.entrypoint.component; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.dispatcher.DispatcherGateway; -import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; -import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner; -import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory; -import org.apache.flink.runtime.dispatcher.runner.StandaloneDispatcherRunnerFactory; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; -import org.apache.flink.runtime.rest.SessionRestEndpointFactory; -import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; - -import javax.annotation.Nonnull; - -/** - * {@link DispatcherResourceManagerComponentFactory} for a {@link SessionDispatcherResourceManagerComponent}. - */ -public class SessionDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory { - - public SessionDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory resourceManagerFactory) { - this(new StandaloneDispatcherRunnerFactory(SessionDispatcherFactory.INSTANCE), resourceManagerFactory); - } - - @VisibleForTesting - public SessionDispatcherResourceManagerComponentFactory( - @Nonnull DispatcherRunnerFactory dispatcherFactory, - @Nonnull ResourceManagerFactory resourceManagerFactory) { - super(dispatcherFactory, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE); - } - - @Override - protected DispatcherResourceManagerComponent createDispatcherResourceManagerComponent( - DispatcherRunner dispatcherRunner, - ResourceManager resourceManager, - LeaderRetrievalService dispatcherLeaderRetrievalService, - LeaderRetrievalService resourceManagerRetrievalService, - WebMonitorEndpoint webMonitorEndpoint) { - return new SessionDispatcherResourceManagerComponent( - dispatcherRunner, - resourceManager, - dispatcherLeaderRetrievalService, - resourceManagerRetrievalService, - webMonitorEndpoint); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index ca7264aad0534..466269c109993 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -40,8 +40,9 @@ import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; -import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -383,7 +384,7 @@ protected Collection createDispatc MetricRegistry metricRegistry, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception { - SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(); + DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(); return Collections.singleton( dispatcherResourceManagerComponentFactory.create( configuration, @@ -398,8 +399,8 @@ protected Collection createDispatc } @Nonnull - private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() { - return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE); + private DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() { + return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.INSTANCE); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/TestingDefaultDispatcherResourceManagerComponentFactory.java similarity index 51% rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/TestingDefaultDispatcherResourceManagerComponentFactory.java index 442f05621a005..cc53537865ed0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/TestingDefaultDispatcherResourceManagerComponentFactory.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,21 +18,21 @@ package org.apache.flink.runtime.entrypoint.component; -import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.rest.SessionRestEndpointFactory; /** - * {@link DispatcherResourceManagerComponent} used by session clusters. + * Testing extension of the {@link DefaultDispatcherResourceManagerComponentFactory}. */ -class SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent { - SessionDispatcherResourceManagerComponent( - DispatcherRunner dispatcherRunner, - ResourceManager resourceManager, - LeaderRetrievalService dispatcherLeaderRetrievalService, - LeaderRetrievalService resourceManagerRetrievalService, - WebMonitorEndpoint webMonitorEndpoint) { - super(dispatcherRunner, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint); +public class TestingDefaultDispatcherResourceManagerComponentFactory { + public static DispatcherResourceManagerComponentFactory createSessionComponentFactory( + DispatcherRunnerFactory dispatcherRunnerFactory, + ResourceManagerFactory resourceManagerFactory) { + return new DefaultDispatcherResourceManagerComponentFactory<>( + dispatcherRunnerFactory, + resourceManagerFactory, + SessionRestEndpointFactory.INSTANCE); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java index 691a2a7063fcb..a73ef9f88eeac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java @@ -24,7 +24,8 @@ import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.runner.StandaloneDispatcherRunnerFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; -import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.TestingDefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -107,7 +108,7 @@ protected Collection createDispatc MetricRegistry metricRegistry, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception { - SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createTestingDispatcherResourceManagerComponentFactory(); + DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createTestingDispatcherResourceManagerComponentFactory(); final List result = new ArrayList<>(numberDispatcherResourceManagerComponents); @@ -133,8 +134,8 @@ public CompletableFuture getDispatcherGatewayFuture() { return super.getDispatcherGatewayFuture(); } - private SessionDispatcherResourceManagerComponentFactory createTestingDispatcherResourceManagerComponentFactory() { - return new SessionDispatcherResourceManagerComponentFactory( + private DispatcherResourceManagerComponentFactory createTestingDispatcherResourceManagerComponentFactory() { + return TestingDefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory( new StandaloneDispatcherRunnerFactory(SessionDispatcherWithUUIDFactory.INSTANCE), StandaloneResourceManagerWithUUIDFactory.INSTANCE); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index f7d9797026abe..11d0c82c74286 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -42,8 +42,9 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; -import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -122,7 +123,7 @@ public void testCancelingOnProcessFailure() throws Exception { final int jobManagerPort = rpcService.getPort(); config.setInteger(JobManagerOptions.PORT, jobManagerPort); - final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory( + final DispatcherResourceManagerComponentFactory resourceManagerComponentFactory = DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory( StandaloneResourceManagerFactory.INSTANCE); DispatcherResourceManagerComponent dispatcherResourceManagerComponent = null; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java index 039722615423f..3058ddae46ca9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java @@ -21,9 +21,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; -import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; -import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -62,8 +61,8 @@ protected String getRPCPortRange(Configuration configuration) { } @Override - protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { - return new JobDispatcherResourceManagerComponentFactory( + protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory( YarnResourceManagerFactory.getInstance(), FileJobGraphRetriever.createFrom(configuration)); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java index b008665bdfb11..bc63e3126e49b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java @@ -21,8 +21,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; -import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -61,7 +61,7 @@ protected String getRPCPortRange(Configuration configuration) { @Override protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { - return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.getInstance()); + return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(YarnResourceManagerFactory.getInstance()); } public static void main(String[] args) {