Skip to content

Commit

Permalink
[FLINK-2844] [web frontend] Remove old web interface
Browse files Browse the repository at this point in the history
- make new web one the default
- adapt tests
- make web directory a resource to be included in the fat jar
- serve static files of web interface dynamic through the class loader
- run on YARN
- remove Jetty dependencies from poms
  • Loading branch information
mxm authored and StephanEwen committed Oct 17, 2015
1 parent a8eeb3b commit df44862
Show file tree
Hide file tree
Showing 80 changed files with 276 additions and 31,642 deletions.
8 changes: 4 additions & 4 deletions flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<!-- version is derived from base module -->
<version>8.0.0.M1</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<!-- version is derived from base module -->
<version>8.0.0.M1</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<!-- version is derived from base module -->
<version>8.0.0.M1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,16 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";

/**
* The option that specifies whether to use the new web frontend
*/
public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";

/**
* The config parameter defining the number of archived jobs for the jobmanager
*/
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";

public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";

/** The directory where the web server's static contents is stored */
public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = "jobmanager.web.docroot";



// ------------------------------ Web Client ------------------------------

/**
* The config parameter defining port for the pact web-frontend server.
*/
Expand Down
7 changes: 0 additions & 7 deletions flink-dist/src/main/assemblies/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,6 @@ under the License.
<fileMode>0644</fileMode>
</fileSet>

<!-- copy the web documents -->
<fileSet>
<directory>../flink-runtime-web/web-dashboard/web</directory>
<outputDirectory>resources/web-runtime-monitor</outputDirectory>
<fileMode>0644</fileMode>
</fileSet>

<!-- copy the tools -->
<fileSet>
<directory>src/main/flink-bin/tools</directory>
Expand Down
12 changes: 12 additions & 0 deletions flink-runtime-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ under the License.

<packaging>jar</packaging>

<build>
<resources>
<resource>
<!-- Only include the web folder from the web-dashboard directory -->
<directory>web-dashboard</directory>
<includes>
<include>web/**</include>
</includes>
</resource>
</resources>
</build>

<dependencies>

<!-- ===================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,10 @@ public class WebMonitorConfig {
/** The port for the runtime monitor web-frontend server. */
public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;

/** The directory where the web server's static contents is stored */
public static final String JOB_MANAGER_WEB_DOC_ROOT_KEY = ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY;

/** The initial refresh interval for the web dashboard */
public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";


// ------------------------------------------------------------------------
// Default values
// ------------------------------------------------------------------------
Expand All @@ -47,32 +44,28 @@ public class WebMonitorConfig {

/** Default refresh interval for the web dashboard (= 3000 msecs) */
public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;


// ------------------------------------------------------------------------
// Config
// ------------------------------------------------------------------------

/** The configuration queried by this config object */
private final Configuration config;


public WebMonitorConfig(Configuration config) {
if (config == null) {
throw new NullPointerException();
}
this.config = config;
}


public int getWebFrontendPort() {
return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
}

public String getWebRoot() {
return config.getString(JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
}


public long getRefreshInterval() {
return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@
import io.netty.handler.codec.http.router.Router;

import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
Expand Down Expand Up @@ -66,7 +65,9 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -82,64 +83,47 @@ public class WebRuntimeMonitor implements WebMonitor {

/** By default, all requests to the JobManager have a timeout of 10 seconds */
public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);

/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);

/** Teh default path under which the static contents is stored */
private static final String STATIC_CONTENTS_PATH = "resources/web-runtime-monitor";


// ------------------------------------------------------------------------

/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
private final Object startupShutdownLock = new Object();

private final LeaderRetrievalService leaderRetrievalService;

/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
private final JobManagerArchiveRetriever retriever;

private final Router router;

private final int configuredPort;

private ServerBootstrap bootstrap;

private Channel serverChannel;


private final File webRootDir;

private AtomicBoolean isShutdown = new AtomicBoolean();


public WebRuntimeMonitor(
Configuration config,
LeaderRetrievalService leaderRetrievalService,
ActorSystem actorSystem) throws IOException
{
this.leaderRetrievalService = checkNotNull(leaderRetrievalService);

final WebMonitorConfig cfg = new WebMonitorConfig(config);

// figure out where our static contents is
final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
final String configuredWebRoot = cfg.getWebRoot();

final File webRootDir;
if (configuredWebRoot != null) {
webRootDir = new File(configuredWebRoot);
}
else if (flinkRoot != null) {
webRootDir = new File(flinkRoot, STATIC_CONTENTS_PATH);
}
else {
throw new IllegalConfigurationException("The given configuration provides neither the web-document root ("
+ WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
+ ConfigConstants.FLINK_BASE_DIR_PATH_KEY + ").");
}

// validate that the doc root is a valid directory
if (!(webRootDir.exists() && webRootDir.isDirectory() && webRootDir.canRead())) {
throw new IllegalConfigurationException("The path to the static contents (" +
webRootDir.getAbsolutePath() + ") is not a readable directory.");
}


// create an empty directory in temp for the web server
String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
LOG.info("Using directory {} for the web interface files", webRootDir);

// port configuration
this.configuredPort = cfg.getWebFrontendPort();
if (this.configuredPort < 0) {
Expand All @@ -150,7 +134,7 @@ else if (flinkRoot != null) {
FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);

retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);

ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);

router = new Router()
Expand Down Expand Up @@ -200,48 +184,64 @@ public void start() throws Exception {
if (this.bootstrap != null) {
throw new IllegalStateException("The server has already been started");
}


// add shutdown hook for deleting the directory
try {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
shutdown();
}
});
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
} catch(Throwable t) {
// these errors usually happen when the shutdown is already in progress
LOG.warn("Error while adding shutdown hook", t);
}

ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) {
Handler handler = new Handler(router);

ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new ChunkedWriteHandler())
.addLast(handler.name(), handler);
}
};

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);

Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
this.serverChannel = ch;

InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();

LOG.info("Web frontend listening at " + address + ':' + port);

leaderRetrievalService.start(retriever);
}
}

@Override
public void stop() throws Exception {
synchronized (startupShutdownLock) {
leaderRetrievalService.stop();

if (this.serverChannel != null) {
this.serverChannel.close().awaitUninterruptibly();
this.serverChannel = null;
Expand All @@ -252,9 +252,11 @@ public void stop() throws Exception {
}
this.bootstrap = null;
}

shutdown();
}
}

@Override
public int getServerPort() {
Channel server = this.serverChannel;
Expand All @@ -266,14 +268,26 @@ public int getServerPort() {
LOG.error("Cannot access local server port", e);
}
}

return -1;
}


private void shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
return;
}
try {
LOG.info("Removing web root dir {}", webRootDir);
FileUtils.deleteDirectory(webRootDir);
} catch (Throwable t) {
LOG.warn("Error while deleting web root dir {}", webRootDir, t);
}
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

private static RuntimeMonitorHandler handler(RequestHandler handler) {
return new RuntimeMonitorHandler(handler);
}
Expand Down
Loading

0 comments on commit df44862

Please sign in to comment.