Skip to content

Commit

Permalink
[FLINK-17308] Add regular cleanup task for ExecutionGraphCache
Browse files Browse the repository at this point in the history
The WebMonitorEndpoint now schedules are regular cleanup task which
runs every 2 * WebOptions.REFRESH_INTERVAL and tries to clean up
expired ExecutionGraphCache entries. This ensures that we will remove
unused entries.

This closes apache#11879.
  • Loading branch information
tillrohrmann committed Apr 24, 2020
1 parent 08025ec commit 8514200
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -146,6 +148,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -175,6 +178,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp

private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);

@Nullable
private ScheduledFuture<?> executionGraphCleanupTask;

public WebMonitorEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever<? extends T> leaderRetriever,
Expand Down Expand Up @@ -725,13 +731,28 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
@Override
public void startInternal() throws Exception {
leaderElectionService.start(this);
startExecutionGraphCacheCleanupTask();

if (hasWebUI) {
log.info("Web frontend listening at {}.", getRestBaseUrl());
}
}

private void startExecutionGraphCacheCleanupTask() {
final long cleanupInterval = 2 * restConfiguration.getRefreshInterval();
executionGraphCleanupTask = executor.scheduleWithFixedDelay(
executionGraphCache::cleanup,
cleanupInterval,
cleanupInterval,
TimeUnit.MILLISECONDS);
}

@Override
protected CompletableFuture<Void> shutDownInternal() {
if (executionGraphCleanupTask != null) {
executionGraphCleanupTask.cancel(false);
}

executionGraphCache.close();

final CompletableFuture<Void> shutdownFuture = FutureUtils.runAfterwards(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.IntSupplier;

/**
* Testing implementation of {@link ExecutionGraphCache}.
*/
public class TestingExecutionGraphCache implements ExecutionGraphCache {
private final IntSupplier sizeSupplier;

private final BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction;

private final Runnable cleanupRunnable;

private final Runnable closeRunnable;

private TestingExecutionGraphCache(
IntSupplier sizeSupplier,
BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction,
Runnable cleanupRunnable,
Runnable closeRunnable) {
this.sizeSupplier = sizeSupplier;
this.getExecutionGraphFunction = getExecutionGraphFunction;
this.cleanupRunnable = cleanupRunnable;
this.closeRunnable = closeRunnable;
}

@Override
public int size() {
return sizeSupplier.getAsInt();
}

@Override
public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) {
return getExecutionGraphFunction.apply(jobId, restfulGateway);
}

@Override
public void cleanup() {
cleanupRunnable.run();
}

@Override
public void close() {
closeRunnable.run();
}

public static Builder newBuilder() {
return new Builder();
}

/**
* Builder for the {@link TestingExecutionGraphCache}.
*/
public static final class Builder {

private IntSupplier sizeSupplier = () -> 0;
private BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction = (ignoredA, ignoredB) -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
private Runnable cleanupRunnable = () -> {};
private Runnable closeRunnable = () -> {};

private Builder() {}

public Builder setSizeSupplier(IntSupplier sizeSupplier) {
this.sizeSupplier = sizeSupplier;
return this;
}

public Builder setGetExecutionGraphFunction(BiFunction<JobID, RestfulGateway, CompletableFuture<AccessExecutionGraph>> getExecutionGraphFunction) {
this.getExecutionGraphFunction = getExecutionGraphFunction;
return this;
}

public Builder setCleanupRunnable(Runnable cleanupRunnable) {
this.cleanupRunnable = cleanupRunnable;
return this;
}

public Builder setCloseRunnable(Runnable closeRunnable) {
this.closeRunnable = closeRunnable;
return this;
}

public TestingExecutionGraphCache build() {
return new TestingExecutionGraphCache(sizeSupplier, getExecutionGraphFunction, cleanupRunnable, closeRunnable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTransientBlobService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Tests for the {@link WebMonitorEndpoint}.
*/
public class WebMonitorEndpointTest extends TestLogger {

@Test
public void cleansUpExpiredExecutionGraphs() throws Exception {
final Configuration configuration = new Configuration();
configuration.setString(RestOptions.ADDRESS, "localhost");
configuration.setLong(WebOptions.REFRESH_INTERVAL, 5L);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final long timeout = 10000L;

final OneShotLatch cleanupLatch = new OneShotLatch();
final TestingExecutionGraphCache executionGraphCache = TestingExecutionGraphCache.newBuilder()
.setCleanupRunnable(cleanupLatch::trigger)
.build();
try (final WebMonitorEndpoint<RestfulGateway> webMonitorEndpoint = new WebMonitorEndpoint<>(
RestServerEndpointConfiguration.fromConfiguration(configuration),
CompletableFuture::new,
configuration,
RestHandlerConfiguration.fromConfiguration(configuration),
CompletableFuture::new,
NoOpTransientBlobService.INSTANCE,
executor,
VoidMetricFetcher.INSTANCE,
new TestingLeaderElectionService(),
executionGraphCache,
new TestingFatalErrorHandler())) {

webMonitorEndpoint.start();

// check that the cleanup will be triggered
cleanupLatch.await(timeout, TimeUnit.MILLISECONDS);
} finally {
ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, executor);
}
}
}

0 comments on commit 8514200

Please sign in to comment.