Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment #11084

Merged
merged 1 commit into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,6 +105,10 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
private final LinkedBlockingDeque<LoadingCache<Environment, WrappedSdkHarnessClient>>
availableCaches;
private final boolean loadBalanceBundles;
/** Clients which were evicted due to environment expiration but still had pending references. */
private final Set<WrappedSdkHarnessClient> evictedActiveClients;

private boolean closed;

public static DefaultJobBundleFactory create(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
Expand Down Expand Up @@ -140,6 +146,7 @@ public static DefaultJobBundleFactory create(
getMaxEnvironmentClients(jobInfo));
this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true);
this.availableCaches = new LinkedBlockingDeque<>(environmentCaches);
this.evictedActiveClients = Sets.newIdentityHashSet();
}

@VisibleForTesting
Expand All @@ -158,23 +165,28 @@ public static DefaultJobBundleFactory create(
createEnvironmentCaches(serverFactory -> serverInfo, getMaxEnvironmentClients(jobInfo));
this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true);
this.availableCaches = new LinkedBlockingDeque<>(environmentCaches);
this.evictedActiveClients = Sets.newIdentityHashSet();
}

private ImmutableList<LoadingCache<Environment, WrappedSdkHarnessClient>> createEnvironmentCaches(
ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator, int count) {
CacheBuilder builder =
CacheBuilder<Environment, WrappedSdkHarnessClient> builder =
CacheBuilder.newBuilder()
.removalListener(
(RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
int refCount = notification.getValue().unref();
LOG.debug(
"Removed environment {} with {} remaining bundle references.",
notification.getKey(),
refCount);
WrappedSdkHarnessClient client = notification.getValue();
mxm marked this conversation as resolved.
Show resolved Hide resolved
int refCount = client.unref();
if (refCount > 0) {
LOG.warn(
"Expiring environment {} with {} remaining bundle references. Taking note to clean it up during shutdown if the references are not removed by then.",
notification.getKey(),
refCount);
evictedActiveClients.add(client);
}
});

if (environmentExpirationMillis > 0) {
builder = builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
}

ImmutableList.Builder<LoadingCache<Environment, WrappedSdkHarnessClient>> caches =
Expand Down Expand Up @@ -252,14 +264,57 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
}

@Override
public void close() throws Exception {
// Clear the cache. This closes all active environments.
// note this may cause open calls to be cancelled by the peer
public synchronized void close() throws Exception {
if (closed) {
return;
}
// The following code is written defensively to guard against any exceptions occurring
// during shutdown. It is not visually appealing but unless it can be written more
// defensively, there is no reason to change it.
Exception exception = null;
mxm marked this conversation as resolved.
Show resolved Hide resolved
for (LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache : environmentCaches) {
environmentCache.invalidateAll();
environmentCache.cleanUp();
try {
// Clear the cache. This closes all active environments.
// note this may cause open calls to be cancelled by the peer
environmentCache.invalidateAll();
environmentCache.cleanUp();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
}
// Cleanup any left-over environments which were not properly dereferenced by the Runner, e.g.
// when the bundle was not closed properly. This ensures we do not leak resources.
for (WrappedSdkHarnessClient client : evictedActiveClients) {
try {
//noinspection StatementWithEmptyBody
while (client.unref() > 0) {
// Remove any pending references from the client to force closing the environment
}
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
}
try {
executor.shutdown();
} catch (Exception e) {
mxm marked this conversation as resolved.
Show resolved Hide resolved
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
closed = true;
if (exception != null) {
throw exception;
}
executor.shutdown();
}

private static ImmutableMap.Builder<String, RemoteOutputReceiver<?>> getOutputReceivers(
Expand Down Expand Up @@ -375,6 +430,11 @@ public RemoteBundle getBundle(
}
}

if (environmentExpirationMillis > 0) {
// Cleanup list of clients which were active during eviction but now do not hold references
evictedActiveClients.removeIf(c -> c.bundleRefCount.get() == 0);
}

final RemoteBundle bundle =
currentClient.processor.newBundle(
getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory)
Expand Down Expand Up @@ -436,6 +496,8 @@ protected static class WrappedSdkHarnessClient {
private final ServerInfo serverInfo;
private final AtomicInteger bundleRefCount = new AtomicInteger();

private boolean closed;

static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) {
SdkHarnessClient client =
SdkHarnessClient.usingFnApiClient(
Expand All @@ -459,7 +521,10 @@ ServerInfo getServerInfo() {
return serverInfo;
}

public void close() {
public synchronized void close() {
if (closed) {
return;
}
// DO NOT ADD ANYTHING HERE WHICH MIGHT CAUSE THE BLOCK BELOW TO NOT BE EXECUTED.
// If we exit prematurely (e.g. due to an exception), resources won't be cleaned up properly.
// Please make an AutoCloseable and add it to the try statement below.
Expand All @@ -476,6 +541,7 @@ public void close() {
// This will close _all_ of these even in the presence of exceptions.
// The first exception encountered will be the base exception,
// the next one will be added via Throwable#addSuppressed.
closed = true;
} catch (Exception e) {
LOG.warn("Error cleaning up servers {}", environment.getEnvironment(), e);
}
Expand All @@ -487,13 +553,14 @@ private int ref() {
}

private int unref() {
int count = bundleRefCount.decrementAndGet();
if (count == 0) {
int refCount = bundleRefCount.decrementAndGet();
Preconditions.checkState(refCount >= 0, "Reference count must not be negative.");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, I've added this check instead, to verify that the reference count never goes negative.

if (refCount == 0) {
// Close environment after it was removed from cache and all bundles finished.
LOG.info("Closing environment {}", environment.getEnvironment());
close();
}
return count;
return refCount;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class ProcessEnvironment implements RemoteEnvironment {
private final RunnerApi.Environment environment;
private final String workerId;
private final InstructionRequestHandler instructionHandler;
private final Object lock = new Object();

private boolean isClosed;

Expand Down Expand Up @@ -64,13 +63,28 @@ public InstructionRequestHandler getInstructionRequestHandler() {
}

@Override
public void close() throws Exception {
synchronized (lock) {
if (!isClosed) {
instructionHandler.close();
processManager.stopProcess(workerId);
isClosed = true;
public synchronized void close() throws Exception {
if (isClosed) {
return;
}
Exception exception = null;
try {
processManager.stopProcess(workerId);
} catch (Exception e) {
exception = e;
}
try {
instructionHandler.close();
} catch (Exception e) {
if (exception != null) {
exception.addSuppressed(e);
} else {
exception = e;
}
}
isClosed = true;
if (exception != null) {
throw exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) {
LOG.debug("Attempting to stop process with id {}", id);
// first try to kill gracefully
process.destroy();
long maxTimeToWait = 2000;
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} shut down gracefully.", id);
} else {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
long maxTimeToWait = 500;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the timeout change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old timeout was too long.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did that manifest? 500ms timeout for process termination seems too aggressive. I would prefer a longer timeout to allow for connections to be closed gracefully.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shutdown code is synchronous, so I'd prefer a shorter timeout here. It would be a good improvement to make it async.

I've tested this on a cluster with many runs and I've not seen a single instance lingering. Also I did not notice any difference in the logs. The environment will be torn down last, after all connections have been closed. So failures would not be visible anymore.

Copy link
Contributor Author

@mxm mxm Mar 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, I can restore the old timeout, but I would then also change the code to make the stopping async, or at least stop all the processes at once and then wait (instead of tearing down the processes one by one and waiting for each process to quit).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have verified that the graceful shutdown works (in the happy path), then we are good. Maybe add a comment to the code, since all of this isn't very obvious.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not always shutting down gracefully, but that's what the change is about: removing processes and ensuring a quick recovery time. It's a trade-off. Ideally we would want to allow more time, but if we wait 2 seconds per process with an SDK parallelism of 16, that's already more than half a minute of waiting time. We really want to do the process removal in parallel. I'll look into this.

I'm not sure the ProcessManager is a good place to document the shutdown behavior. If you have any suggestions though, I'll add them here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't delay the PR for it!

try {
if (waitForProcessToDie(process, maxTimeToWait)) {
LOG.debug("Process for worker {} killed.", id);
} else {
LOG.warn("Process for worker {} could not be killed.", id);
LOG.debug("Process for worker {} shut down gracefully.", id);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (process.isAlive()) {
LOG.info("Process for worker {} still running. Killing.", id);
process.destroyForcibly();
}
}
}
}

/** Returns true if the process exits within maxWaitTimeMillis. */
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {
private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis)
throws InterruptedException {
final long startTime = System.currentTimeMillis();
while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting on process", e);
}
Thread.sleep(50);
}
return !process.isAlive();
}
Expand All @@ -228,19 +225,18 @@ private ShutdownHook() {}
public void run() {
synchronized (ALL_PROCESS_MANAGERS) {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);
for (ProcessManager pm : ALL_PROCESS_MANAGERS) {
if (pm.processes.values().stream().anyMatch(Process::isAlive)) {
try {
// Graceful shutdown period
Thread.sleep(200);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// If any processes are still alive, wait for 200 ms.
try {
if (ALL_PROCESS_MANAGERS.stream()
.anyMatch(pm -> pm.processes.values().stream().anyMatch(Process::isAlive))) {
// Graceful shutdown period after asking processes to quit
Thread.sleep(200);
}
} catch (InterruptedException ignored) {
// Ignore interruptions here to proceed with killing processes
} finally {
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
}
}
}
Expand Down