Skip to content

Commit

Permalink
[FLINK-14378][state-backends] Cleanup rocksDB lib folder if loading t…
Browse files Browse the repository at this point in the history
…he library fails

This closes apache#10423
  • Loading branch information
Myasuka authored and StephanEwen committed Dec 9, 2019
1 parent 47f2939 commit 0d9058c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
Expand Down Expand Up @@ -922,7 +923,8 @@ public String toString() {
// static library loading utilities
// ------------------------------------------------------------------------

private static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
@VisibleForTesting
static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
synchronized (RocksDBStateBackend.class) {
if (!rocksDbInitialized) {

Expand All @@ -931,6 +933,7 @@ private static void ensureRocksDBIsLoaded(String tempDirectory) throws IOExcepti

Throwable lastException = null;
for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) {
File rocksLibFolder = null;
try {
// when multiple instances of this class and RocksDB exist in different
// class loaders, then we can see the following exception:
Expand All @@ -942,7 +945,7 @@ private static void ensureRocksDBIsLoaded(String tempDirectory) throws IOExcepti
// instances of the same JNI library being loaded in different class loaders, but
// apparently not when coming from the same file path, so there we go)

final File rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());

// make sure the temp path exists
LOG.debug("Attempting to create RocksDB native library folder {}", rocksLibFolder);
Expand Down Expand Up @@ -970,6 +973,8 @@ private static void ensureRocksDBIsLoaded(String tempDirectory) throws IOExcepti
} catch (Throwable tt) {
LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt);
}

FileUtils.deleteDirectoryQuietly(rocksLibFolder);
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.runtime.operators.testutils.ExpectedTestException;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.rocksdb.RocksDB;

import java.io.File;
import java.io.IOException;

import static org.junit.Assert.fail;

/**
* Tests for {@link RocksDBStateBackend} on initialization.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({RocksDB.class})
public class RocksDBInitTest {

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
* This test checks that the RocksDB native code loader still responds to resetting the init flag.
*/
@Test
public void testResetInitFlag() throws Exception {
RocksDBStateBackend.resetRocksDBLoadedFlag();
}

@Test
public void testTempLibFolderDeletedOnFail() throws Exception {
PowerMockito.spy(RocksDB.class);
PowerMockito.when(RocksDB.class, "loadLibrary").thenThrow(new ExpectedTestException());

File tempFolder = temporaryFolder.newFolder();
try {
RocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath());
fail("Not throwing expected exception.");
} catch (IOException ignored) {
// ignored
}
File[] files = tempFolder.listFiles();
Assert.assertNotNull(files);
Assert.assertEquals(0, files.length);
}
}

0 comments on commit 0d9058c

Please sign in to comment.