Skip to content

Commit

Permalink
[hotfix] Remove DispatcherProcess#getJobManagerPort function
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Sep 28, 2018
1 parent 6ace721 commit b0d5e99
Showing 1 changed file with 1 addition and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@

import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -48,9 +43,6 @@ public class DispatcherProcess extends TestJvmProcess {

private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);

/** Pattern to parse the job manager port from the logs. */
private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp:https://flink@.*:(\\d+).*");

/** ID for this JobManager. */
private final int id;

Expand All @@ -60,9 +52,6 @@ public class DispatcherProcess extends TestJvmProcess {
/** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint}. */
private final String[] jvmArgs;

/** The port the JobManager listens on. */
private int jobManagerPort;

/**
* Creates a {@link JobManager} running in a separate JVM.
*
Expand Down Expand Up @@ -106,41 +95,9 @@ public Configuration getConfig() {
return config;
}

/**
* Parses the port from the job manager logs and returns it.
*
* <p>If a call to this method succeeds, successive calls will directly
* return the port and re-parse the logs.
*
* @param timeout Timeout for log parsing.
* @return The port of the job manager
* @throws InterruptedException If interrupted while waiting before
* retrying to parse the logs
* @throws NumberFormatException If the parsed port is not a number
*/
public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
if (jobManagerPort > 0) {
return jobManagerPort;
} else {
Deadline deadline = timeout.fromNow();
while (deadline.hasTimeLeft()) {
Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
if (matcher.find()) {
String port = matcher.group(1);
jobManagerPort = Integer.parseInt(port);
return jobManagerPort;
} else {
Thread.sleep(100);
}
}

throw new RuntimeException("Could not parse port from logs");
}
}

@Override
public String toString() {
return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort);
return String.format("JobManagerProcess(id=%d)", id);
}

/**
Expand Down

0 comments on commit b0d5e99

Please sign in to comment.