Skip to content

Commit

Permalink
Merge pull request #11084: [BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment
Browse files: browse the repository at this point in the history
  • Loading branch information
tweise committed Mar 11, 2020
2 parents 0e88219 + cf219ba commit d62521f
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,6 +105,10 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
private final LinkedBlockingDeque<LoadingCache<Environment, WrappedSdkHarnessClient>>
availableCaches;
private final boolean loadBalanceBundles;
/** Clients which were evicted due to environment expiration but still had pending references. */
private final Set<WrappedSdkHarnessClient> evictedActiveClients;

private boolean closed;

public static DefaultJobBundleFactory create(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
Expand Down Expand Up @@ -140,6 +146,7 @@ public static DefaultJobBundleFactory create(
getMaxEnvironmentClients(jobInfo));
this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true);
this.availableCaches = new LinkedBlockingDeque<>(environmentCaches);
this.evictedActiveClients = Sets.newIdentityHashSet();
}

@VisibleForTesting
Expand All @@ -158,23 +165,28 @@ public static DefaultJobBundleFactory create(
createEnvironmentCaches(serverFactory -> serverInfo, getMaxEnvironmentClients(jobInfo));
this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true);
this.availableCaches = new LinkedBlockingDeque<>(environmentCaches);
this.evictedActiveClients = Sets.newIdentityHashSet();
}

private ImmutableList<LoadingCache<Environment, WrappedSdkHarnessClient>> createEnvironmentCaches(
ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator, int count) {
CacheBuilder builder =
CacheBuilder<Environment, WrappedSdkHarnessClient> builder =
CacheBuilder.newBuilder()
.removalListener(
(RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
int refCount = notification.getValue().unref();
LOG.debug(
"Removed environment {} with {} remaining bundle references.",
notification.getKey(),
refCount);
WrappedSdkHarnessClient client = notification.getValue();
int refCount = client.unref();
if (refCount > 0) {
LOG.warn(
"Expiring environment {} with {} remaining bundle references. Taking note to clean it up during shutdown if the references are not removed by then.",
notification.getKey(),
refCount);
evictedActiveClients.add(client);
}
});

if (environmentExpirationMillis > 0) {
builder = builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
}

ImmutableList.Builder<LoadingCache<Environment, WrappedSdkHarnessClient>> caches =
Expand Down Expand Up @@ -252,14 +264,57 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
}

@Override
public void close() throws Exception {
// Clear the cache. This closes all active environments.
// note this may cause open calls to be cancelled by the peer
public synchronized void close() throws Exception {
if (closed) {
return;
}
// The following code is written defensively to guard against any exceptions occurring
// during shutdown. It is not visually appealing but unless it can be written more
// defensively, there is no reason to change it.
Exception exception = null;
for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
environmentCache.invalidateAll();
environmentCache.cleanUp();
try {
// Clear the cache. This closes all active environments.
// note this may cause open calls to be cancelled by the peer
environmentCache.invalidateAll();
environmentCache.cleanUp();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
}
// Cleanup any left-over environments which were not properly dereferenced by the Runner, e.g.
// when the bundle was not closed properly. This ensures we do not leak resources.
for (WrappedSdkHarnessClient client : evictedActiveClients) {
try {
//noinspection StatementWithEmptyBody
while (client.unref() > 0) {
// Remove any pending references from the client to force closing the environment
}
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
}
try {
executor.shutdown();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
closed = true;
if (exception != null) {
throw exception;
}
executor.shutdown();
}

private static ImmutableMap.Builder<String, RemoteOutputReceiver<?>> getOutputReceivers(
Expand Down Expand Up @@ -375,6 +430,11 @@ public RemoteBundle getBundle(
}
}

if (environmentExpirationMillis > 0) {
// Cleanup list of clients which were active during eviction but now do not hold references
evictedActiveClients.removeIf(c -> c.bundleRefCount.get() == 0);
}

final RemoteBundle bundle =
currentClient.processor.newBundle(
getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
Expand Down Expand Up @@ -436,6 +496,8 @@ protected static class WrappedSdkHarnessClient {
private final ServerInfo serverInfo;
private final AtomicInteger bundleRefCount = new AtomicInteger();

private boolean closed;

static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) {
SdkHarnessClient client =
SdkHarnessClient.usingFnApiClient(
Expand All @@ -459,7 +521,10 @@ ServerInfo getServerInfo() {
return serverInfo;
}

public void close() {
public synchronized void close() {
if (closed) {
return;
}
// DO NOT ADD ANYTHING HERE WHICH MIGHT CAUSE THE BLOCK BELOW TO NOT BE EXECUTED.
// If we exit prematurely (e.g. due to an exception), resources won't be cleaned up properly.
// Please make an AutoCloseable and add it to the try statement below.
Expand All @@ -476,6 +541,7 @@ public void close() {
// This will close _all_ of these even in the presence of exceptions.
// The first exception encountered will be the base exception,
// the next one will be added via Throwable#addSuppressed.
closed = true;
} catch (Exception e) {
LOG.warn("Error cleaning up servers {}", environment.getEnvironment(), e);
}
Expand All @@ -487,13 +553,14 @@ private int ref() {
}

private int unref() {
int count = bundleRefCount.decrementAndGet();
if (count == 0) {
int refCount = bundleRefCount.decrementAndGet();
Preconditions.checkState(refCount >= 0, "Reference count must not be negative.");
if (refCount == 0) {
// Close environment after it was removed from cache and all bundles finished.
LOG.info("Closing environment {}", environment.getEnvironment());
close();
}
return count;
return refCount;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class ProcessEnvironment implements RemoteEnvironment {
private final RunnerApi.Environment environment;
private final String workerId;
private final InstructionRequestHandler instructionHandler;
private final Object lock = new Object();

private boolean isClosed;

Expand Down Expand Up @@ -64,13 +63,28 @@ public InstructionRequestHandler getInstructionRequestHandler() {
}

@Override
public void close() throws Exception {
synchronized (lock) {
if (!isClosed) {
instructionHandler.close();
processManager.stopProcess(workerId);
isClosed = true;
public synchronized void close() throws Exception {
if (isClosed) {
return;
}
Exception exception = null;
try {
processManager.stopProcess(workerId);
} catch (Exception e) {
exception = e;
}
try {
instructionHandler.close();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
isClosed = true;
if (exception != null) {
throw exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
LOG.debug("Attempting to stop process with id {}", id);
// first try to kill gracefully
process.destroy();
long maxTimeToWait = 2000;
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} shut down gracefully.", id);
} else {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
long maxTimeToWait = 500;
try {
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} killed.", id);
} else {
LOG.warn("Process for worker {} could not be killed.", id);
LOG.debug("Process for worker {} shut down gracefully.", id);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (process.isAlive()) {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
}
}
}
}

/** Returns true if the process exits within maxWaitTimeMillis. */
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis)
throws InterruptedException {
final long startTime = System.currentTimeMillis();
while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting on process", e);
}
Thread.sleep(50);
}
return !process.isAlive();
}
Expand All @@ -228,19 +225,18 @@ private ShutdownHook() {}
public void run() {
synchronized (ALL_PROCESS_MANAGERS) {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);
for (ProcessManager pm : ALL_PROCESS_MANAGERS) {
if (pm.processes.values().stream().anyMatch(Process::isAlive)) {
try {
// Graceful shutdown period
Thread.sleep(200);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// If any processes are still alive, wait for 200 ms.
try {
if (ALL_PROCESS_MANAGERS.stream()
.anyMatch(pm -> pm.processes.values().stream().anyMatch(Process::isAlive))) {
// Graceful shutdown period after asking processes to quit
Thread.sleep(200);
}
} catch (InterruptedException ignored) {
// Ignore interruptions here to proceed with killing processes
} finally {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
}
}
Expand Down

0 comments on commit d62521f

Please sign in to comment.