diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index ae4298c8e3872..c74a41cb7a13f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.runtime.memory.OpaqueMemoryResource; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -44,7 +45,11 @@ import javax.annotation.Nullable; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -59,8 +64,11 @@ public final class RocksDBResourceContainer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RocksDBResourceContainer.class); + private static final String ROCKSDB_RELOCATE_LOG_SUFFIX = "_LOG"; + // the filename length limit is 255 on most operating systems - private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - "_LOG".length(); + private static final int INSTANCE_PATH_LENGTH_LIMIT = + 255 - ROCKSDB_RELOCATE_LOG_SUFFIX.length(); @Nullable private final File instanceRocksDBPath; @@ -85,6 +93,8 @@ public final class RocksDBResourceContainer implements AutoCloseable { /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; + @Nullable private Path relocatedDbLogBaseDir; + @VisibleForTesting public RocksDBResourceContainer() { this(new Configuration(), PredefinedOptions.DEFAULT, null, null, null, false); @@ -267,6 +277,7 @@ public void close() throws Exception { if (sharedResources != null) { sharedResources.close(); } + cleanRelocatedDbLogs(); } /** @@ -426,7 +437,9 @@ private void relocateDefaultDbLogDir(DBOptions dbOptions) { if (logFilePath != null) { File logFile = resolveFileLocation(logFilePath); if (logFile != null && resolveFileLocation(logFile.getParent()) != null) { - dbOptions.setDbLogDir(logFile.getParent()); + String relocatedDbLogDir = logFile.getParent(); + this.relocatedDbLogBaseDir = new File(relocatedDbLogDir).toPath(); + dbOptions.setDbLogDir(relocatedDbLogDir); } } } @@ -441,4 +454,44 @@ private File resolveFileLocation(String logFilePath) { File logFile = new File(logFilePath); return (logFile.exists() && logFile.canRead()) ? logFile : null; } + + /** Clean all relocated rocksdb logs. */ + private void cleanRelocatedDbLogs() { + if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) { + LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir); + + String relocatedDbLogPrefix = + resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath()); + try { + Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir)) + .filter( + path -> + !Files.isDirectory(path) + && path.toFile() + .getName() + .startsWith(relocatedDbLogPrefix)) + .forEach(IOUtils::deleteFileQuietly); + } catch (IOException e) { + LOG.warn( + "Could not list relocated RocksDB log directory: {}", + relocatedDbLogBaseDir); + } + } + } + + /** + * Resolve the prefix of rocksdb's log file name according to rocksdb's log file name rules. See + * https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30. + * + * @param instanceRocksDBAbsolutePath The path where the rocksdb directory is located. + * @return Resolved rocksdb log name prefix. + */ + private String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) { + if (!instanceRocksDBAbsolutePath.isEmpty() + && !instanceRocksDBAbsolutePath.matches("^[a-zA-Z0-9\\-._].*")) { + instanceRocksDBAbsolutePath = instanceRocksDBAbsolutePath.substring(1); + } + return instanceRocksDBAbsolutePath.replaceAll("[^a-zA-Z0-9\\-._]", "_") + + ROCKSDB_RELOCATE_LOG_SUFFIX; + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 34d27e1bb5915..85678474e7ba9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FileSystem; @@ -47,13 +48,16 @@ import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser; +import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; +import org.apache.commons.lang3.RandomUtils; import org.junit.Assert; import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.Timeout; import org.junit.rules.TemporaryFolder; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; @@ -61,10 +65,12 @@ import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; +import org.rocksdb.FlushOptions; import org.rocksdb.InfoLogLevel; import org.rocksdb.util.SizeUnit; import java.io.File; +import java.nio.file.Files; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -367,6 +373,57 @@ public void testDbPathRelativePaths() throws Exception { rocksDbBackend.setDbStoragePath("relative/path"); } + @Test + @Timeout(value = 60) + public void testCleanRelocatedDbLogs() throws Exception { + final File folder = tempFolder.newFolder(); + final File relocatedDBLogDir = tempFolder.newFolder("db_logs"); + final File logFile = new File(relocatedDBLogDir, "taskManager.log"); + Files.createFile(logFile.toPath()); + System.setProperty("log.file", logFile.getAbsolutePath()); + + Configuration conf = new Configuration(); + conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL); + conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4); + conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb")); + final EmbeddedRocksDBStateBackend rocksDbBackend = + new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader()); + final String dbStoragePath = new Path(folder.toURI().toString()).toString(); + rocksDbBackend.setDbStoragePath(dbStoragePath); + + final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); + RocksDBKeyedStateBackend keyedBackend = + createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + + File instanceBasePath = keyedBackend.getInstanceBasePath(); + File instanceRocksDBPath = + RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath); + + // avoid tests without relocate. + Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length()); + + java.nio.file.Path[] relocatedDbLogs; + try { + relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath()); + while (relocatedDbLogs.length <= 2) { + // If the default number of log files in rocksdb is not enough, add more logs. + try (FlushOptions flushOptions = new FlushOptions()) { + keyedBackend.db.put(RandomUtils.nextBytes(32), RandomUtils.nextBytes(512)); + keyedBackend.db.flush(flushOptions); + } + relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath()); + } + } finally { + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + env.close(); + } + + relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath()); + assertEquals(1, relocatedDbLogs.length); + assertEquals("taskManager.log", relocatedDbLogs[0].toFile().getName()); + } + // ------------------------------------------------------------------------ // RocksDB local file automatic from temp directories // ------------------------------------------------------------------------