Skip to content

Commit

Permalink
[BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
Browse files Browse the repository at this point in the history
The cleanup code in DefaultJobBundleFactory and its RemoteEnvironments may leak
resources. This is especially a concern when the execution engines reuses the
same JVM or underlying machines for multiple runs of a pipeline.

Exceptions encountered during cleanup should not lead to aborting the cleanup
procedure. Not all code handles this correctly. We should also ensure that the
cleanup succeeds even if the runner does not properly close the bundle,
e.g. when a exception occurs during closing the bundle.
  • Loading branch information
mxm committed Mar 11, 2020
1 parent dc65ecf commit cf219ba
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,6 +105,10 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
private final LinkedBlockingDeque<LoadingCache<Environment, WrappedSdkHarnessClient>>
availableCaches;
private final boolean loadBalanceBundles;
/** Clients which were evicted due to environment expiration but still had pending references. */
private final Set<WrappedSdkHarnessClient> evictedActiveClients;

private boolean closed;

public static DefaultJobBundleFactory create(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
Expand Down Expand Up @@ -140,6 +146,7 @@ public static DefaultJobBundleFactory create(
getMaxEnvironmentClients(jobInfo));
this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true);
this.availableCaches = new LinkedBlockingDeque<>(environmentCaches);
this.evictedActiveClients = Sets.newIdentityHashSet();
}

@VisibleForTesting
Expand All @@ -158,23 +165,28 @@ public static DefaultJobBundleFactory create(
createEnvironmentCaches(serverFactory -> serverInfo, getMaxEnvironmentClients(jobInfo));
this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true);
this.availableCaches = new LinkedBlockingDeque<>(environmentCaches);
this.evictedActiveClients = Sets.newIdentityHashSet();
}

private ImmutableList<LoadingCache<Environment, WrappedSdkHarnessClient>> createEnvironmentCaches(
ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator, int count) {
CacheBuilder builder =
CacheBuilder<Environment, WrappedSdkHarnessClient> builder =
CacheBuilder.newBuilder()
.removalListener(
(RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
int refCount = notification.getValue().unref();
LOG.debug(
"Removed environment {} with {} remaining bundle references.",
notification.getKey(),
refCount);
WrappedSdkHarnessClient client = notification.getValue();
int refCount = client.unref();
if (refCount > 0) {
LOG.warn(
"Expiring environment {} with {} remaining bundle references. Taking note to clean it up during shutdown if the references are not removed by then.",
notification.getKey(),
refCount);
evictedActiveClients.add(client);
}
});

if (environmentExpirationMillis > 0) {
builder = builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
}

ImmutableList.Builder<LoadingCache<Environment, WrappedSdkHarnessClient>> caches =
Expand Down Expand Up @@ -252,14 +264,57 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
}

@Override
public void close() throws Exception {
// Clear the cache. This closes all active environments.
// note this may cause open calls to be cancelled by the peer
public synchronized void close() throws Exception {
if (closed) {
return;
}
// The following code is written defensively to guard against any exceptions occurring
// during shutdown. It is not visually appealing but unless it can be written more
// defensively, there is no reason to change it.
Exception exception = null;
for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
environmentCache.invalidateAll();
environmentCache.cleanUp();
try {
// Clear the cache. This closes all active environments.
// note this may cause open calls to be cancelled by the peer
environmentCache.invalidateAll();
environmentCache.cleanUp();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
}
// Cleanup any left-over environments which were not properly dereferenced by the Runner, e.g.
// when the bundle was not closed properly. This ensures we do not leak resources.
for (WrappedSdkHarnessClient client : evictedActiveClients) {
try {
//noinspection StatementWithEmptyBody
while (client.unref() > 0) {
// Remove any pending references from the client to force closing the environment
}
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
}
try {
executor.shutdown();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
closed = true;
if (exception != null) {
throw exception;
}
executor.shutdown();
}

private static ImmutableMap.Builder<String, RemoteOutputReceiver<?>> getOutputReceivers(
Expand Down Expand Up @@ -375,6 +430,11 @@ public RemoteBundle getBundle(
}
}

if (environmentExpirationMillis > 0) {
// Cleanup list of clients which were active during eviction but now do not hold references
evictedActiveClients.removeIf(c -> c.bundleRefCount.get() == 0);
}

final RemoteBundle bundle =
currentClient.processor.newBundle(
getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
Expand Down Expand Up @@ -436,6 +496,8 @@ protected static class WrappedSdkHarnessClient {
private final ServerInfo serverInfo;
private final AtomicInteger bundleRefCount = new AtomicInteger();

private boolean closed;

static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) {
SdkHarnessClient client =
SdkHarnessClient.usingFnApiClient(
Expand All @@ -459,7 +521,10 @@ ServerInfo getServerInfo() {
return serverInfo;
}

public void close() {
public synchronized void close() {
if (closed) {
return;
}
// DO NOT ADD ANYTHING HERE WHICH MIGHT CAUSE THE BLOCK BELOW TO NOT BE EXECUTED.
// If we exit prematurely (e.g. due to an exception), resources won't be cleaned up properly.
// Please make an AutoCloseable and add it to the try statement below.
Expand All @@ -476,6 +541,7 @@ public void close() {
// This will close _all_ of these even in the presence of exceptions.
// The first exception encountered will be the base exception,
// the next one will be added via Throwable#addSuppressed.
closed = true;
} catch (Exception e) {
LOG.warn("Error cleaning up servers {}", environment.getEnvironment(), e);
}
Expand All @@ -487,13 +553,14 @@ private int ref() {
}

private int unref() {
int count = bundleRefCount.decrementAndGet();
if (count == 0) {
int refCount = bundleRefCount.decrementAndGet();
Preconditions.checkState(refCount >= 0, "Reference count must not be negative.");
if (refCount == 0) {
// Close environment after it was removed from cache and all bundles finished.
LOG.info("Closing environment {}", environment.getEnvironment());
close();
}
return count;
return refCount;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class ProcessEnvironment implements RemoteEnvironment {
private final RunnerApi.Environment environment;
private final String workerId;
private final InstructionRequestHandler instructionHandler;
private final Object lock = new Object();

private boolean isClosed;

Expand Down Expand Up @@ -64,13 +63,28 @@ public InstructionRequestHandler getInstructionRequestHandler() {
}

@Override
public void close() throws Exception {
synchronized (lock) {
if (!isClosed) {
instructionHandler.close();
processManager.stopProcess(workerId);
isClosed = true;
public synchronized void close() throws Exception {
if (isClosed) {
return;
}
Exception exception = null;
try {
processManager.stopProcess(workerId);
} catch (Exception e) {
exception = e;
}
try {
instructionHandler.close();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
isClosed = true;
if (exception != null) {
throw exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
LOG.debug("Attempting to stop process with id {}", id);
// first try to kill gracefully
process.destroy();
long maxTimeToWait = 2000;
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} shut down gracefully.", id);
} else {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
long maxTimeToWait = 500;
try {
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} killed.", id);
} else {
LOG.warn("Process for worker {} could not be killed.", id);
LOG.debug("Process for worker {} shut down gracefully.", id);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (process.isAlive()) {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
}
}
}
}

/** Returns true if the process exists within maxWaitTimeMillis. */
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis)
throws InterruptedException {
final long startTime = System.currentTimeMillis();
while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting on process", e);
}
Thread.sleep(50);
}
return !process.isAlive();
}
Expand All @@ -228,19 +225,18 @@ private ShutdownHook() {}
public void run() {
synchronized (ALL_PROCESS_MANAGERS) {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);
for (ProcessManager pm : ALL_PROCESS_MANAGERS) {
if (pm.processes.values().stream().anyMatch(Process::isAlive)) {
try {
// Graceful shutdown period
Thread.sleep(200);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// If any processes are still alive, wait for 200 ms.
try {
if (ALL_PROCESS_MANAGERS.stream()
.anyMatch(pm -> pm.processes.values().stream().anyMatch(Process::isAlive))) {
// Graceful shutdown period after asking processes to quit
Thread.sleep(200);
}
} catch (InterruptedException ignored) {
// Ignore interruptions here to proceed with killing processes
} finally {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
}
}
Expand Down

0 comments on commit cf219ba

Please sign in to comment.