diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java index 9ce2aab08fe93..7cb82a8b583f3 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.IdentityHashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -76,6 +77,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +105,10 @@ public class DefaultJobBundleFactory implements JobBundleFactory { private final LinkedBlockingDeque> availableCaches; private final boolean loadBalanceBundles; + /** Clients which were evicted due to environment expiration but still had pending references. 
*/ + private final Set evictedActiveClients; + + private boolean closed; public static DefaultJobBundleFactory create(JobInfo jobInfo) { PipelineOptions pipelineOptions = @@ -140,6 +146,7 @@ public static DefaultJobBundleFactory create( getMaxEnvironmentClients(jobInfo)); this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true); this.availableCaches = new LinkedBlockingDeque<>(environmentCaches); + this.evictedActiveClients = Sets.newConcurrentHashSet(); } @VisibleForTesting @@ -158,23 +165,28 @@ public static DefaultJobBundleFactory create( createEnvironmentCaches(serverFactory -> serverInfo, getMaxEnvironmentClients(jobInfo)); this.availableCachesSemaphore = new Semaphore(environmentCaches.size(), true); this.availableCaches = new LinkedBlockingDeque<>(environmentCaches); + this.evictedActiveClients = Sets.newConcurrentHashSet(); } private ImmutableList> createEnvironmentCaches( ThrowingFunction serverInfoCreator, int count) { - CacheBuilder builder = + CacheBuilder builder = CacheBuilder.newBuilder() .removalListener( (RemovalNotification notification) -> { - int refCount = notification.getValue().unref(); - LOG.debug( - "Removed environment {} with {} remaining bundle references.", - notification.getKey(), - refCount); + WrappedSdkHarnessClient client = notification.getValue(); + int refCount = client.unref(); + if (refCount > 0) { + LOG.warn( + "Expiring environment {} with {} remaining bundle references. 
Taking note to clean it up during shutdown if the references are not removed by then.", + notification.getKey(), + refCount); + evictedActiveClients.add(client); + } }); if (environmentExpirationMillis > 0) { - builder = builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS); + builder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS); } ImmutableList.Builder> caches = @@ -252,14 +264,57 @@ public StageBundleFactory forStage(ExecutableStage executableStage) { } @Override - public void close() throws Exception { - // Clear the cache. This closes all active environments. - // note this may cause open calls to be cancelled by the peer + public synchronized void close() throws Exception { + if (closed) { + return; + } + // The following code is written defensively to guard against any exceptions occurring + // during shutdown. It is not visually appealing but unless it can be written more + // defensively, there is no reason to change it. + Exception exception = null; for (LoadingCache environmentCache : environmentCaches) { - environmentCache.invalidateAll(); - environmentCache.cleanUp(); + try { + // Clear the cache. This closes all active environments. + // note this may cause open calls to be cancelled by the peer + environmentCache.invalidateAll(); + environmentCache.cleanUp(); + } catch (Exception e) { + if (exception != null) { + exception.addSuppressed(e); + } else { + exception = e; + } + } + } + // Cleanup any left-over environments which were not properly dereferenced by the Runner, e.g. + // when the bundle was not closed properly. This ensures we do not leak resources. 
+ for (WrappedSdkHarnessClient client : evictedActiveClients) { + try { + // Drop pending references to force-close the environment; skip clients already at zero. + while (client.bundleRefCount.get() > 0) { + client.unref(); + } + } catch (Exception e) { + if (exception != null) { + exception.addSuppressed(e); + } else { + exception = e; + } + } + } + try { + executor.shutdown(); + } catch (Exception e) { + if (exception != null) { + exception.addSuppressed(e); + } else { + exception = e; + } + } + closed = true; + if (exception != null) { + throw exception; } - executor.shutdown(); } private static ImmutableMap.Builder> getOutputReceivers( @@ -375,6 +430,11 @@ public RemoteBundle getBundle( } } + if (environmentExpirationMillis > 0) { + // Cleanup list of clients which were active during eviction but now do not hold references + evictedActiveClients.removeIf(c -> c.bundleRefCount.get() == 0); + } + final RemoteBundle bundle = currentClient.processor.newBundle( getOutputReceivers(currentClient.processBundleDescriptor, outputReceiverFactory) @@ -436,6 +496,8 @@ protected static class WrappedSdkHarnessClient { private final ServerInfo serverInfo; private final AtomicInteger bundleRefCount = new AtomicInteger(); + private boolean closed; + static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) { SdkHarnessClient client = SdkHarnessClient.usingFnApiClient( @@ -459,7 +521,10 @@ ServerInfo getServerInfo() { return serverInfo; } - public void close() { + public synchronized void close() { + if (closed) { + return; + } // DO NOT ADD ANYTHING HERE WHICH MIGHT CAUSE THE BLOCK BELOW TO NOT BE EXECUTED. // If we exit prematurely (e.g. due to an exception), resources won't be cleaned up properly. // Please make an AutoCloseable and add it to the try statement below. @@ -476,6 +541,7 @@ public void close() { // This will close _all_ of these even in the presence of exceptions. 
// The first exception encountered will be the base exception, // the next one will be added via Throwable#addSuppressed. + closed = true; } catch (Exception e) { LOG.warn("Error cleaning up servers {}", environment.getEnvironment(), e); } @@ -487,13 +553,14 @@ private int ref() { } private int unref() { - int count = bundleRefCount.decrementAndGet(); - if (count == 0) { + int refCount = bundleRefCount.decrementAndGet(); + Preconditions.checkState(refCount >= 0, "Reference count must not be negative."); + if (refCount == 0) { // Close environment after it was removed from cache and all bundles finished. LOG.info("Closing environment {}", environment.getEnvironment()); close(); } - return count; + return refCount; } } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java index 36beae0180e94..1580b0ae5c3a9 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java @@ -29,7 +29,6 @@ public class ProcessEnvironment implements RemoteEnvironment { private final RunnerApi.Environment environment; private final String workerId; private final InstructionRequestHandler instructionHandler; - private final Object lock = new Object(); private boolean isClosed; @@ -64,13 +63,28 @@ public InstructionRequestHandler getInstructionRequestHandler() { } @Override - public void close() throws Exception { - synchronized (lock) { - if (!isClosed) { - instructionHandler.close(); - processManager.stopProcess(workerId); - isClosed = true; + public synchronized void close() throws Exception { + if (isClosed) { + return; + } + Exception exception = null; + try { + processManager.stopProcess(workerId); + } catch (Exception e) { + 
exception = e; + } + try { + instructionHandler.close(); + } catch (Exception e) { + if (exception != null) { + exception.addSuppressed(e); + } else { + exception = e; } } + isClosed = true; + if (exception != null) { + throw exception; + } } } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java index f1d88d03309af..0d31f3b82e356 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java @@ -186,31 +186,28 @@ private void stopProcess(String id, Process process) { LOG.debug("Attempting to stop process with id {}", id); // first try to kill gracefully process.destroy(); - long maxTimeToWait = 2000; - if (waitForProcessToDie(process, maxTimeToWait)) { - LOG.debug("Process for worker {} shut down gracefully.", id); - } else { - LOG.info("Process for worker {} still running. Killing.", id); - process.destroyForcibly(); + long maxTimeToWait = 500; + try { if (waitForProcessToDie(process, maxTimeToWait)) { - LOG.debug("Process for worker {} killed.", id); - } else { - LOG.warn("Process for worker {} could not be killed.", id); + LOG.debug("Process for worker {} shut down gracefully.", id); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + if (process.isAlive()) { + LOG.info("Process for worker {} still running. Killing.", id); + process.destroyForcibly(); } } } } /** Returns true if the process exists within maxWaitTimeMillis. 
*/ - private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) { + private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) + throws InterruptedException { final long startTime = System.currentTimeMillis(); while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting on process", e); - } + Thread.sleep(50); } return !process.isAlive(); } @@ -228,19 +225,18 @@ private ShutdownHook() {} public void run() { synchronized (ALL_PROCESS_MANAGERS) { ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses); - for (ProcessManager pm : ALL_PROCESS_MANAGERS) { - if (pm.processes.values().stream().anyMatch(Process::isAlive)) { - try { - // Graceful shutdown period - Thread.sleep(200); - break; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + // If any processes are still alive, wait for 200 ms. + try { + if (ALL_PROCESS_MANAGERS.stream() + .anyMatch(pm -> pm.processes.values().stream().anyMatch(Process::isAlive))) { + // Graceful shutdown period after asking processes to quit + Thread.sleep(200); } + } catch (InterruptedException ignored) { + // Ignore interruptions here to proceed with killing processes + } finally { + ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses); } - ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses); } } }