Skip to content

Commit

Permalink
[FLINK-17645][runtime] Reset SafetyNetCloseableRegistry#REAPER_THREAD…
Browse files Browse the repository at this point in the history
… if it fails to start
  • Loading branch information
lijiewang.wlj authored and zhuzhurk committed May 21, 2020
1 parent f52899a commit 9f3a711
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.lang.ref.ReferenceQueue;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
* This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When
Expand Down Expand Up @@ -68,13 +69,23 @@ public class SafetyNetCloseableRegistry extends
//CHECKSTYLE.ON: StaticVariableName

SafetyNetCloseableRegistry() {
this(() -> new CloseableReaperThread());
}

@VisibleForTesting
SafetyNetCloseableRegistry(Supplier<CloseableReaperThread> reaperThreadSupplier) {
super(new IdentityHashMap<>());

synchronized (REAPER_THREAD_LOCK) {
if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
Preconditions.checkState(null == REAPER_THREAD);
REAPER_THREAD = new CloseableReaperThread();
REAPER_THREAD.start();
try {
REAPER_THREAD = reaperThreadSupplier.get();
REAPER_THREAD.start();
} catch (Throwable throwable) {
REAPER_THREAD = null;
throw throwable;
}
}
++GLOBAL_SAFETY_NET_REGISTRY_COUNT;
}
Expand Down Expand Up @@ -172,13 +183,13 @@ public void close() throws IOException {
/**
* Reaper runnable collects and closes leaking resources.
*/
static final class CloseableReaperThread extends Thread {
static class CloseableReaperThread extends Thread {

private final ReferenceQueue<WrappingProxyCloseable<? extends Closeable>> referenceQueue;

private volatile boolean running;

private CloseableReaperThread() {
protected CloseableReaperThread() {
super("CloseableReaperThread");
this.setDaemon(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,32 @@ public void testReaperThreadSpawnAndStop() throws Exception {
}
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
}

/**
* Test whether failure to start thread in {@link SafetyNetCloseableRegistry}
* constructor can lead to failure of subsequent state check.
*/
@Test
public void testReaperThreadStartFailed() throws Exception {

try {
new SafetyNetCloseableRegistry(() -> new OutOfMemoryReaperThread());
} catch (java.lang.OutOfMemoryError error) {
}
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());

// the OOM error will not lead to failure of subsequent constructor call.
SafetyNetCloseableRegistry closeableRegistry = new SafetyNetCloseableRegistry();
Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());

closeableRegistry.close();
}

private static class OutOfMemoryReaperThread extends SafetyNetCloseableRegistry.CloseableReaperThread {

@Override
public synchronized void start() {
throw new java.lang.OutOfMemoryError();
}
}
}

0 comments on commit 9f3a711

Please sign in to comment.