Skip to content

Commit

Permalink
[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway
Browse files Browse the repository at this point in the history
This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing
the JobManagerGateway interface which can have multiple implementations. This
is a preliminary step for the integration of the existing WebRuntimeMonitor
with the Flip-6 JobMaster.

Add time unit for web.timeout

This closes apache#4492.
  • Loading branch information
tillrohrmann committed Aug 11, 2017
1 parent 00d5b62 commit 9f790d3
Show file tree
Hide file tree
Showing 82 changed files with 1,551 additions and 1,018 deletions.
2 changes: 2 additions & 0 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default values are sufficient wh

- `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.

- `web.timeout`: Timeout for asynchronous operation executed by the web frontend in milliseconds (DEFAULT: `10000`, 10 s)

### File Systems

The parameters define the behavior of tasks that create result files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ public class WebOptions {
.defaultValue(50)
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");

/**
* Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
*/
public static final ConfigOption<Long> TIMEOUT = ConfigOptions
.key("web.timeout")
.defaultValue(10L * 1000L);


private WebOptions() {
throw new IllegalAccessError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

package org.apache.flink.mesos.runtime.clusterframework;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
Expand Down Expand Up @@ -52,6 +54,8 @@
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -319,11 +323,16 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
// 2: the web monitor
LOG.debug("Starting Web Frontend");

Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));

webMonitor = BootstrapTools.startWebMonitorIfConfigured(
config,
highAvailabilityServices,
actorSystem,
jobManager,
new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
webMonitorTimeout,
futureExecutor,
AkkaUtils.getAkkaURL(actorSystem, jobManager),
LOG);
if (webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
package org.apache.flink.runtime.webmonitor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.WeakHashMap;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -48,58 +47,44 @@ public class ExecutionGraphHolder {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);

private final FiniteDuration timeout;
private final Time timeout;

private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();

public ExecutionGraphHolder() {
this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}

public ExecutionGraphHolder(FiniteDuration timeout) {
public ExecutionGraphHolder(Time timeout) {
this.timeout = checkNotNull(timeout);
}

/**
* Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
* Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
* {@link Optional#empty()} if it cannot be found.
*
* @param jid jobID of the execution graph to be retrieved
* @return the retrieved execution graph or null if it is not retrievable
* @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
* @throws Exception if the ExecutionGraph retrieval failed.
*/
public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
if (cached.getState() == JobStatus.SUSPENDED) {
cache.remove(jid);
} else {
return cached;
return Optional.of(cached);
}
}

try {
if (jobManager != null) {
Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
Object result = Await.result(future, timeout);

if (result instanceof JobManagerMessages.JobNotFound) {
return null;
}
else if (result instanceof JobManagerMessages.JobFound) {
AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
cache.put(jid, eg);
return eg;
}
else {
throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
}
}
else {
LOG.warn("No connection to the leading JobManager.");
return null;
}
}
catch (Exception e) {
throw new RuntimeException("Error requesting execution graph", e);
}
CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);

Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

return result.map((executionGraph) -> {
cache.put(jid, executionGraph);

return executionGraph;
});
}
}

This file was deleted.

Loading

0 comments on commit 9f790d3

Please sign in to comment.