diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 1326ff13254e3..0ef54b2d18037 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -74,7 +74,6 @@ import javax.annotation.Nullable; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; @@ -294,20 +293,15 @@ public boolean getPrintStatusDuringExecution() { } /** - * Gets the current JobManager address (may change in case of a HA setup). - * @return The address (host and port) of the leading JobManager + * Gets the current cluster connection info (may change in case of a HA setup). + * + * @return The the connection info to the leader component of the cluster + * @throws LeaderRetrievalException if the leader could not be retrieved */ - public InetSocketAddress getJobManagerAddress() { - try { - LeaderConnectionInfo leaderConnectionInfo = - LeaderRetrievalUtils.retrieveLeaderConnectionInfo( - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), - timeout); - - return AkkaUtils.getInetSocketAddressFromAkkaURL(leaderConnectionInfo.getAddress()); - } catch (Exception e) { - throw new RuntimeException("Failed to retrieve JobManager address", e); - } + public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException { + return LeaderRetrievalUtils.retrieveLeaderConnectionInfo( + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + timeout); } // ------------------------------------------------------------------------ diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index df08c3092762d..0b91ed47cc986 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -22,12 +22,14 @@ import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import java.net.InetSocketAddress; import java.net.URL; import java.util.Collections; import java.util.List; @@ -54,7 +56,15 @@ public void waitForClusterToBeReady() {} @Override public String getWebInterfaceURL() { - String host = getJobManagerAddress().getHostString(); + final InetSocketAddress inetSocketAddressFromAkkaURL; + + try { + inetSocketAddressFromAkkaURL = AkkaUtils.getInetSocketAddressFromAkkaURL(getClusterConnectionInfo().getAddress()); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve leader retrieval information.", e); + } + + String host = inetSocketAddressFromAkkaURL.getHostName(); int port = getFlinkConfiguration().getInteger(WebOptions.PORT); return "http://" + host + ":" + port; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 141af717d0347..564990fdf6349 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; @@ -70,6 +71,8 @@ import org.apache.flink.runtime.rest.messages.queue.QueueStatus; import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; @@ -365,6 +368,13 @@ public T getClusterId() { return clusterId; } + @Override + public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException { + return LeaderRetrievalUtils.retrieveLeaderConnectionInfo( + highAvailabilityServices.getDispatcherLeaderRetriever(), + timeout); + } + /** * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java index aaca798e5f657..d89e988aebb6d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java @@ -22,15 +22,16 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.util.TestLogger; import org.apache.commons.cli.CommandLine; -import org.junit.Assert; +import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.net.InetSocketAddress; +import static org.junit.Assert.assertThat; /** * Tests for the {@link DefaultCLI}. @@ -60,13 +61,14 @@ public void testConfigurationPassing() throws Exception { CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false); - final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port); - final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine); final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine)); - Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress()); + final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo(); + + assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(localhost)); + assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(port)); } /** @@ -93,9 +95,10 @@ public void testManualConfigurationOverride() throws Exception { final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine)); - final InetSocketAddress expectedAddress = new InetSocketAddress(manualHostname, manualPort); + final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo(); - Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress()); + assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(manualHostname)); + assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(manualPort)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java index 8d2a9b5cfd4f9..ff1c5fdb799fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalException.java @@ -18,11 +18,13 @@ package org.apache.flink.runtime.leaderretrieval; +import org.apache.flink.util.FlinkException; + /** * This exception is thrown by the {@link org.apache.flink.runtime.util.LeaderRetrievalUtils} when * the method retrieveLeaderGateway fails to retrieve the current leader's gateway. */ -public class LeaderRetrievalException extends Exception { +public class LeaderRetrievalException extends FlinkException { private static final long serialVersionUID = 42; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java index aee023a5aa3db..2c94c43826e28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderConnectionInfo.java @@ -18,6 +18,12 @@ package org.apache.flink.runtime.util; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.util.FlinkException; + +import akka.actor.Address; + +import java.net.MalformedURLException; import java.util.UUID; /** @@ -29,9 +35,34 @@ public class LeaderConnectionInfo { private final UUID leaderSessionID; - public LeaderConnectionInfo(String address, UUID leaderSessionID) { + private final String hostname; + + private final int port; + + public LeaderConnectionInfo(String address, UUID leaderSessionID) throws FlinkException { this.address = address; this.leaderSessionID = leaderSessionID; + + final Address akkaAddress; + // this only works as long as the address is Akka based + try { + akkaAddress = AkkaUtils.getAddressFromAkkaURL(address); + } catch (MalformedURLException e) { + throw new FlinkException("Could not extract the hostname from the given address \'" + + address + "\'.", e); + } + + if (akkaAddress.host().isDefined()) { + hostname = akkaAddress.host().get(); + } else { + hostname = "localhost"; + } + + if (akkaAddress.port().isDefined()) { + port = (int) akkaAddress.port().get(); + } else { + port = -1; + } } public String getAddress() { @@ -41,4 +72,20 @@ public String getAddress() { public UUID getLeaderSessionID() { return leaderSessionID; } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + @Override + public String toString() { + return "LeaderConnectionInfo{" + + "address='" + address + '\'' + + ", leaderSessionID=" + leaderSessionID + + '}'; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java index 6b861a377b345..aeaa2b95944ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.util; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -34,16 +30,23 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.ConnectionUtils; +import org.apache.flink.util.FlinkException; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; -import java.net.InetAddress; -import java.util.UUID; - /** * Utility class to work with {@link LeaderRetrievalService} class. */ @@ -241,8 +244,14 @@ public Future getLeaderConnectionInfoFuture() { @Override public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { - if(leaderAddress != null && !leaderAddress.equals("") && !connectionInfo.isCompleted()) { - connectionInfo.success(new LeaderConnectionInfo(leaderAddress, leaderSessionID)); + if (leaderAddress != null && !leaderAddress.equals("") && !connectionInfo.isCompleted()) { + try { + final LeaderConnectionInfo leaderConnectionInfo = new LeaderConnectionInfo(leaderAddress, leaderSessionID); + connectionInfo.success(leaderConnectionInfo); + } catch (FlinkException e) { + connectionInfo.failure(e); + } + } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index f4b80697604d5..0c6be5db6eace 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -697,18 +697,12 @@ object AkkaUtils { def getInetSocketAddressFromAkkaURL(akkaURL: String): InetSocketAddress = { // AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL try { - // we need to manually strip the protocol, because "akka.tcp" is not - // a valid protocol for Java's URL class - val protocolonPos = akkaURL.indexOf("://") - if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) { - throw new MalformedURLException() - } - - val url = new URL("http://" + akkaURL.substring(protocolonPos + 3)) - if (url.getHost == null || url.getPort == -1) { - throw new MalformedURLException() + val address = getAddressFromAkkaURL(akkaURL) + + (address.host, address.port) match { + case (Some(hostname), Some(portValue)) => new InetSocketAddress(hostname, portValue) + case _ => throw new MalformedURLException() } - new InetSocketAddress(url.getHost, url.getPort) } catch { case _ : MalformedURLException => @@ -716,6 +710,19 @@ object AkkaUtils { } } + /** + * Extracts the [[Address]] from the given akka URL. + * + * @param akkaURL to extract the [[Address]] from + * @throws java.net.MalformedURLException if the [[Address]] could not be parsed from + * the given akka URL + * @return Extracted [[Address]] from the given akka URL + */ + @throws(classOf[MalformedURLException]) + def getAddressFromAkkaURL(akkaURL: String): Address = { + AddressFromURIString(akkaURL) + } + def formatDurationParingErrorMessage: String = { "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " + "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" + diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 6fa6aa51ab6cc..54ed05ee7b0fe 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -20,10 +20,11 @@ package org.apache.flink.api.scala import java.io._ -import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser, RunOptions} -import org.apache.flink.client.deployment.{ClusterDescriptor, StandaloneClusterId} +import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser} +import org.apache.flink.client.deployment.ClusterDescriptor import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions} +import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.minicluster.StandaloneMiniCluster import scala.collection.mutable.ArrayBuffer @@ -270,8 +271,11 @@ object FlinkShell { val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification) - val address = cluster.getJobManagerAddress.getAddress.getHostAddress - val port = cluster.getJobManagerAddress.getPort + val inetSocketAddress = AkkaUtils.getInetSocketAddressFromAkkaURL( + cluster.getClusterConnectionInfo.getAddress) + + val address = inetSocketAddress.getAddress.getHostAddress + val port = inetSocketAddress.getPort (address, port, Some(Right(cluster))) } @@ -307,7 +311,8 @@ object FlinkShell { throw new RuntimeException("Yarn Cluster could not be retrieved.") } - val jobManager = cluster.getJobManagerAddress + val jobManager = AkkaUtils.getInetSocketAddressFromAkkaURL( + cluster.getClusterConnectionInfo.getAddress) (jobManager.getHostString, jobManager.getPort, None) } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index ec6c1052ae273..975dd28e1d5aa 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -272,7 +272,7 @@ public void testJavaAPI() throws Exception { } // use the cluster - Assert.assertNotNull(yarnCluster.getJobManagerAddress()); + Assert.assertNotNull(yarnCluster.getClusterConnectionInfo()); Assert.assertNotNull(yarnCluster.getWebInterfaceURL()); LOG.info("Shutting down cluster. All tests passed"); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index cc7f4c164696b..3ab8de7311613 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -596,17 +597,32 @@ public int run(String[] args) throws CliArgsException, FlinkException { //------------------ ClusterClient deployed, handle connection details yarnApplicationId = clusterClient.getClusterId(); - String jobManagerAddress = - clusterClient.getJobManagerAddress().getAddress().getHostName() + - ':' + clusterClient.getJobManagerAddress().getPort(); + try { + final LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo(); - System.out.println("Flink JobManager is now running on " + jobManagerAddress); - System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL()); + System.out.println("Flink JobManager is now running on " + connectionInfo.getHostname() + + ':' + connectionInfo.getPort() + " with leader id " + connectionInfo.getLeaderSessionID() + '.'); + System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL()); - writeYarnPropertiesFile( - yarnApplicationId, - clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(), - yarnClusterDescriptor.getDynamicPropertiesEncoded()); + writeYarnPropertiesFile( + yarnApplicationId, + clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(), + yarnClusterDescriptor.getDynamicPropertiesEncoded()); + } catch (Exception e) { + try { + clusterClient.shutdown(); + } catch (Exception ex) { + LOG.info("Could not properly shutdown cluster client.", ex); + } + + try { + yarnClusterDescriptor.terminateCluster(yarnApplicationId); + } catch (FlinkException fe) { + LOG.info("Could not properly terminate the Flink cluster.", fe); + } + + throw new FlinkException("Could not write the Yarn connection information.", e); + } } if (detachedMode) {