Skip to content

Commit

Permalink
[FLINK-8776][flip6] Use correct port for job submission from Web UI.
Browse files Browse the repository at this point in the history
Use address of local WebMonitorEndpoint for the job submission from the Web UI.
Rename TestingLeaderRetrievalService to SettableLeaderRetrievalService and move
class out of test directory.

This closes apache#5577.
  • Loading branch information
GJL authored and tillrohrmann committed Feb 26, 2018
1 parent f9a583b commit bbb6353
Show file tree
Hide file tree
Showing 25 changed files with 211 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,29 @@ public RestClusterClient(Configuration config, T clusterId) throws Exception {
config,
null,
clusterId,
new ExponentialWaitStrategy(10L, 2000L));
new ExponentialWaitStrategy(10L, 2000L),
null);
}

public RestClusterClient(
Configuration config,
T clusterId,
LeaderRetrievalService webMonitorRetrievalService) throws Exception {
this(
config,
null,
clusterId,
new ExponentialWaitStrategy(10L, 2000L),
webMonitorRetrievalService);
}

@VisibleForTesting
RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy) throws Exception {
RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
@Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
super(configuration);
this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);

Expand All @@ -159,7 +177,11 @@ public RestClusterClient(Configuration config, T clusterId) throws Exception {
this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
this.clusterId = Preconditions.checkNotNull(clusterId);

this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
if (webMonitorRetrievalService == null) {
this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
} else {
this.webMonitorRetrievalService = webMonitorRetrievalService;
}
this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
startLeaderRetrievers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -136,9 +136,9 @@ public void testJobManagerRetrievalWithHAServices() throws Exception {

final String expectedAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);

final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(expectedAddress, leaderId);
final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(expectedAddress, leaderId);

highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, settableLeaderRetrievalService);

StandaloneClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ public void setUp() throws Exception {
}
}
};
restClusterClient = new RestClusterClient<>(config, restClient, StandaloneClusterId.getInstance(), (attempt) -> 0);
restClusterClient = new RestClusterClient<>(
config,
restClient,
StandaloneClusterId.getInstance(),
(attempt) -> 0,
null);

jobGraph = new JobGraph("testjob");
jobId = jobGraph.getJobID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -213,7 +213,7 @@ public Context() {

highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
new TestingLeaderRetrievalService(
new SettableLeaderRetrievalService(
jobManager.path(),
HighAvailabilityServices.DEFAULT_LEADER_ID));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.registration.RegistrationResponse;
Expand Down Expand Up @@ -412,15 +412,15 @@ class MockJobMaster {
public final String address;
public final JobMasterGateway gateway;
public final JobMasterId jobMasterId;
public final TestingLeaderRetrievalService leaderRetrievalService;
public final SettableLeaderRetrievalService leaderRetrievalService;

MockJobMaster(JobID jobID) {
this.jobID = jobID;
this.resourceID = new ResourceID(jobID.toString());
this.address = "/" + jobID;
this.gateway = mock(JobMasterGateway.class);
this.jobMasterId = JobMasterId.generate();
this.leaderRetrievalService = new TestingLeaderRetrievalService(this.address, this.jobMasterId.toUUID());
this.leaderRetrievalService = new SettableLeaderRetrievalService(this.address, this.jobMasterId.toUUID());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
Expand Down Expand Up @@ -62,7 +64,15 @@ public WebSubmissionExtension(
Executor executor,
Time timeout) throws Exception {

restClusterClient = new RestClusterClient<>(configuration, "WebSubmissionHandlers");
final SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService();
restAddressFuture.thenAccept(restAddress -> settableLeaderRetrievalService.notifyListener(
restAddress,
HighAvailabilityServices.DEFAULT_LEADER_ID));

restClusterClient = new RestClusterClient<>(
configuration,
"WebSubmissionHandlers",
settableLeaderRetrievalService);

webSubmissionHandlers = new ArrayList<>(3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,38 @@
* limitations under the License.
*/

package org.apache.flink.runtime.leaderelection;
package org.apache.flink.runtime.leaderretrieval;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.UUID;

/**
* Test {@link LeaderRetrievalService} implementation which directly forwards calls of
* {@link LeaderRetrievalService} implementation which directly forwards calls of
* notifyListener to the listener.
*/
public class TestingLeaderRetrievalService implements LeaderRetrievalService {
public class SettableLeaderRetrievalService implements LeaderRetrievalService {

private volatile String leaderAddress;
private volatile UUID leaderSessionID;
private String leaderAddress;
private UUID leaderSessionID;

private volatile LeaderRetrievalListener listener;
private LeaderRetrievalListener listener;

public TestingLeaderRetrievalService() {
public SettableLeaderRetrievalService() {
this(null, null);
}

public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) {
public SettableLeaderRetrievalService(
@Nullable String leaderAddress,
@Nullable UUID leaderSessionID) {
this.leaderAddress = leaderAddress;
this.leaderSessionID = leaderSessionID;
}

@Override
public void start(LeaderRetrievalListener listener) throws Exception {
public synchronized void start(LeaderRetrievalListener listener) throws Exception {
this.listener = Preconditions.checkNotNull(listener);

if (leaderSessionID != null && leaderAddress != null) {
Expand All @@ -58,7 +60,9 @@ public void stop() throws Exception {

}

public void notifyListener(String address, UUID leaderSessionID) {
public synchronized void notifyListener(
@Nullable String address,
@Nullable UUID leaderSessionID) {
this.leaderAddress = address;
this.leaderSessionID = leaderSessionID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
Expand Down Expand Up @@ -85,13 +85,13 @@ public void testSubmissionTimeout() throws Exception {
PlainActor.class,
leaderSessionID));

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
jobManager.path().toString(),
leaderSessionID
);

Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
testingLeaderRetrievalService,
settableLeaderRetrievalService,
jobClientActorTimeout,
false,
clientConfig);
Expand Down Expand Up @@ -124,13 +124,13 @@ public void testRegistrationTimeout() throws Exception {
PlainActor.class,
leaderSessionID));

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
jobManager.path().toString(),
leaderSessionID
);

Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
testingLeaderRetrievalService,
settableLeaderRetrievalService,
jobClientActorTimeout,
false);

Expand All @@ -154,12 +154,12 @@ public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Excepti
FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
FiniteDuration timeout = jobClientActorTimeout.$times(2);

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);

Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
testingLeaderRetrievalService,
settableLeaderRetrievalService,
jobClientActorTimeout,
false,
clientConfig);
Expand All @@ -183,12 +183,12 @@ public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Excep
FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
FiniteDuration timeout = jobClientActorTimeout.$times(2);

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);

Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
testingLeaderRetrievalService,
settableLeaderRetrievalService,
jobClientActorTimeout,
false);

Expand Down Expand Up @@ -219,13 +219,13 @@ public void testConnectionTimeoutAfterJobSubmission() throws Exception {
JobAcceptingActor.class,
leaderSessionID));

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
jobManager.path().toString(),
leaderSessionID
);

Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
testingLeaderRetrievalService,
settableLeaderRetrievalService,
jobClientActorTimeout,
false,
clientConfig);
Expand Down Expand Up @@ -261,13 +261,13 @@ public void testConnectionTimeoutAfterJobRegistration() throws Exception {
JobAcceptingActor.class,
leaderSessionID));

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
jobManager.path().toString(),
leaderSessionID
);

Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
testingLeaderRetrievalService,
settableLeaderRetrievalService,
jobClientActorTimeout,
false);

Expand Down Expand Up @@ -302,13 +302,13 @@ public void testGuaranteedAnswerIfJobClientDies() throws Exception {
JobAcceptingActor.class,
leaderSessionID));

TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(
jobManager.path().toString(),
leaderSessionID
);

TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, settableLeaderRetrievalService);

JobListeningContext jobListeningContext =
JobClient.submitJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
Expand Down Expand Up @@ -107,7 +107,7 @@ public class ResourceManagerTest extends TestLogger {
private final Time timeout = Time.seconds(10L);

private TestingHighAvailabilityServices highAvailabilityServices;
private TestingLeaderRetrievalService jobManagerLeaderRetrievalService;
private SettableLeaderRetrievalService jobManagerLeaderRetrievalService;

@BeforeClass
public static void setup() {
Expand All @@ -121,7 +121,7 @@ public static void teardown() {

@Before
public void setupTest() {
jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
jobManagerLeaderRetrievalService = new SettableLeaderRetrievalService();

highAvailabilityServices = new TestingHighAvailabilityServices();

Expand Down Expand Up @@ -602,7 +602,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
Time.seconds(5L));

final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
final SettableLeaderRetrievalService jmLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.metrics.MetricRegistry;
Expand Down Expand Up @@ -161,7 +161,7 @@ public void setUp() throws Exception {
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService());
haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
runningJobsRegistry = haServices.getRunningJobsRegistry();

final Configuration blobServerConfig = new Configuration();
Expand Down
Loading

0 comments on commit bbb6353

Please sign in to comment.