Skip to content

Commit

Permalink
[hotfix] Remove unnecessary generics from DispatcherResourceManagerCo…
Browse files Browse the repository at this point in the history
…mponent
  • Loading branch information
tillrohrmann committed Oct 8, 2019
1 parent 76763da commit dd365de
Show file tree
Hide file tree
Showing 18 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private StandaloneJobClusterEntryPoint(
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(
new MesosResourceManagerFactory(
mesosServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
private final AtomicBoolean isShutDown = new AtomicBoolean(false);

@GuardedBy("lock")
private DispatcherResourceManagerComponent<?> clusterComponent;
private DispatcherResourceManagerComponent clusterComponent;

@GuardedBy("lock")
private MetricRegistryImpl metricRegistry;
Expand Down Expand Up @@ -210,7 +210,7 @@ private void runCluster(Configuration configuration) throws Exception {
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
Expand Down Expand Up @@ -475,7 +475,7 @@ private void cleanupDirectories() throws IOException {
// Abstract methods
// --------------------------------------------------

protected abstract DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration);
protected abstract DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration);

protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
Configuration configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public StandaloneSessionClusterEntrypoint(Configuration configuration) {
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
* @param <T> type of the {@link Dispatcher}
* @param <U> type of the {@link RestfulGateway} given to the {@link WebMonitorEndpoint}
*/
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory {

private final Logger log = LoggerFactory.getLogger(getClass());

Expand All @@ -98,7 +98,7 @@ public AbstractDispatcherResourceManagerComponentFactory(
}

@Override
public DispatcherResourceManagerComponent<T> create(
public DispatcherResourceManagerComponent create(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
Expand Down Expand Up @@ -266,7 +266,7 @@ public DispatcherResourceManagerComponent<T> create(
}
}

protected abstract DispatcherResourceManagerComponent<T> createDispatcherResourceManagerComponent(
protected abstract DispatcherResourceManagerComponent createDispatcherResourceManagerComponent(
T dispatcher,
ResourceManager<?> resourceManager,
LeaderRetrievalService dispatcherLeaderRetrievalService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
* Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
* in the same process.
*/
public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync {
public class DispatcherResourceManagerComponent implements AutoCloseableAsync {

@Nonnull
private final T dispatcher;
private final Dispatcher dispatcher;

@Nonnull
private final ResourceManager<?> resourceManager;
Expand All @@ -64,7 +64,7 @@ public class DispatcherResourceManagerComponent<T extends Dispatcher> implements
private final AtomicBoolean isRunning = new AtomicBoolean(true);

DispatcherResourceManagerComponent(
@Nonnull T dispatcher,
@Nonnull Dispatcher dispatcher,
@Nonnull ResourceManager<?> resourceManager,
@Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
@Nonnull LeaderRetrievalService resourceManagerRetrievalService,
Expand Down Expand Up @@ -107,7 +107,7 @@ public final CompletableFuture<ApplicationStatus> getShutDownFuture() {
}

@Nonnull
public T getDispatcher() {
public Dispatcher getDispatcher() {
return dispatcher;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
Expand All @@ -32,9 +31,9 @@
/**
* Factory for the {@link DispatcherResourceManagerComponent}.
*/
public interface DispatcherResourceManagerComponentFactory<T extends Dispatcher> {
public interface DispatcherResourceManagerComponentFactory {

DispatcherResourceManagerComponent<T> create(
DispatcherResourceManagerComponent create(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* {@link DispatcherResourceManagerComponent} for a job cluster. The dispatcher component starts
* a {@link MiniDispatcher}.
*/
class JobDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<MiniDispatcher> {
class JobDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent {

JobDispatcherResourceManagerComponent(
MiniDispatcher dispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public JobDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFact
}

@Override
protected DispatcherResourceManagerComponent<MiniDispatcher> createDispatcherResourceManagerComponent(
protected DispatcherResourceManagerComponent createDispatcherResourceManagerComponent(
MiniDispatcher dispatcher,
ResourceManager<?> resourceManager,
LeaderRetrievalService dispatcherLeaderRetrievalService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* {@link DispatcherResourceManagerComponent} used by session clusters.
*/
class SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<Dispatcher> {
class SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent {
SessionDispatcherResourceManagerComponent(
Dispatcher dispatcher,
ResourceManager<?> resourceManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public SessionDispatcherResourceManagerComponentFactory(
}

@Override
protected DispatcherResourceManagerComponent<Dispatcher> createDispatcherResourceManagerComponent(
protected DispatcherResourceManagerComponent createDispatcherResourceManagerComponent(
Dispatcher dispatcher,
ResourceManager<?> resourceManager,
LeaderRetrievalService dispatcherLeaderRetrievalService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
private LeaderRetrievalService clusterRestEndpointLeaderRetrievalService;

@GuardedBy("lock")
private Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents;
private Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents;

@GuardedBy("lock")
private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;
Expand Down Expand Up @@ -228,7 +228,7 @@ public HighAvailabilityServices getHighAvailabilityServices() {

@VisibleForTesting
@Nonnull
protected Collection<DispatcherResourceManagerComponent<?>> getDispatcherResourceManagerComponents() {
protected Collection<DispatcherResourceManagerComponent> getDispatcherResourceManagerComponents() {
synchronized (lock) {
return Collections.unmodifiableCollection(dispatcherResourceManagerComponents);
}
Expand Down Expand Up @@ -374,7 +374,7 @@ public void start() throws Exception {
}

@VisibleForTesting
protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
protected Collection<? extends DispatcherResourceManagerComponent> createDispatcherResourceManagerComponents(
Configuration configuration,
RpcServiceFactory rpcServiceFactory,
HighAvailabilityServices haServices,
Expand Down Expand Up @@ -755,7 +755,7 @@ private CompletableFuture<Void> shutDownResourceManagerComponents() {

final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(dispatcherResourceManagerComponents.size());

for (DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent : dispatcherResourceManagerComponents) {
for (DispatcherResourceManagerComponent dispatcherResourceManagerComponent : dispatcherResourceManagerComponents) {
terminationFutures.add(dispatcherResourceManagerComponent.closeAsync());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public TestingMiniCluster(TestingMiniClusterConfiguration miniClusterConfigurati

@Nonnull
@Override
public Collection<DispatcherResourceManagerComponent<?>> getDispatcherResourceManagerComponents() {
public Collection<DispatcherResourceManagerComponent> getDispatcherResourceManagerComponents() {
return super.getDispatcherResourceManagerComponents();
}

Expand Down Expand Up @@ -97,7 +97,7 @@ protected HighAvailabilityServices createHighAvailabilityServices(Configuration
}

@Override
protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
protected Collection<? extends DispatcherResourceManagerComponent> createDispatcherResourceManagerComponents(
Configuration configuration,
RpcServiceFactory rpcServiceFactory,
HighAvailabilityServices haServices,
Expand All @@ -108,7 +108,7 @@ protected Collection<? extends DispatcherResourceManagerComponent<?>> createDisp
FatalErrorHandler fatalErrorHandler) throws Exception {
SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createTestingDispatcherResourceManagerComponentFactory();

final List<DispatcherResourceManagerComponent<?>> result = new ArrayList<>(numberDispatcherResourceManagerComponents);
final List<DispatcherResourceManagerComponent> result = new ArrayList<>(numberDispatcherResourceManagerComponents);

for (int i = 0; i < numberDispatcherResourceManagerComponents; i++) {
result.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testCancelingOnProcessFailure() throws Exception {

final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE);
DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;
DispatcherResourceManagerComponent dispatcherResourceManagerComponent = null;

final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ public void testJobExecutionOnClusterWithLeaderChange() throws Exception {

miniCluster.submitJob(jobGraph).get();

Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents = miniCluster.getDispatcherResourceManagerComponents();
Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents = miniCluster.getDispatcherResourceManagerComponents();

final NewLeaderRetriever newLeaderRetriever = new NewLeaderRetriever();
final HighAvailabilityServices highAvailabilityServices = miniCluster.getHighAvailabilityServices();
dispatcherLeaderRetriever = highAvailabilityServices.getDispatcherLeaderRetriever();
dispatcherLeaderRetriever.start(newLeaderRetriever);

for (int i = 0; i < numDispatchers - 1; i++) {
final DispatcherResourceManagerComponent<?> leadingDispatcherResourceManagerComponent = getLeadingDispatcherResourceManagerComponent(
final DispatcherResourceManagerComponent leadingDispatcherResourceManagerComponent = getLeadingDispatcherResourceManagerComponent(
dispatcherResourceManagerComponents,
newLeaderRetriever);

Expand All @@ -142,7 +142,7 @@ public void testJobExecutionOnClusterWithLeaderChange() throws Exception {
leadingDispatcherResourceManagerComponent.closeAsync();
}

final DispatcherResourceManagerComponent<?> leadingDispatcherResourceManagerComponent = getLeadingDispatcherResourceManagerComponent(
final DispatcherResourceManagerComponent leadingDispatcherResourceManagerComponent = getLeadingDispatcherResourceManagerComponent(
dispatcherResourceManagerComponents,
newLeaderRetriever);

Expand All @@ -158,8 +158,8 @@ public void testJobExecutionOnClusterWithLeaderChange() throws Exception {
}

@Nonnull
protected DispatcherResourceManagerComponent<?> getLeadingDispatcherResourceManagerComponent(
Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents,
protected DispatcherResourceManagerComponent getLeadingDispatcherResourceManagerComponent(
Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents,
NewLeaderRetriever newLeaderRetriever) throws Exception {
final Tuple2<String, UUID> leaderInformation = newLeaderRetriever.waitUntilNewLeader().get();

Expand All @@ -171,8 +171,8 @@ protected DispatcherResourceManagerComponent<?> getLeadingDispatcherResourceMana
}

@Nonnull
private static Optional<DispatcherResourceManagerComponent<?>> findLeadingDispatcherResourceManagerComponent(Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents, String address) {
for (DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent : dispatcherResourceManagerComponents) {
private static Optional<DispatcherResourceManagerComponent> findLeadingDispatcherResourceManagerComponent(Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents, String address) {
for (DispatcherResourceManagerComponent dispatcherResourceManagerComponent : dispatcherResourceManagerComponents) {
if (dispatcherResourceManagerComponent.getDispatcher().getAddress().equals(address)) {
return Optional.of(dispatcherResourceManagerComponent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected String getRPCPortRange(Configuration configuration) {
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
YarnResourceManagerFactory.getInstance(),
FileJobGraphRetriever.createFrom(configuration));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected String getRPCPortRange(Configuration configuration) {
}

@Override
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.getInstance());
}

Expand Down

0 comments on commit dd365de

Please sign in to comment.