Skip to content

Commit

Permalink
[FLINK-24785][runtime] Relocate RocksDB's log under flink log directo…
Browse files Browse the repository at this point in the history
…ry by default

This closes apache#17833.
  • Loading branch information
SteNicholas authored and Myasuka committed Dec 28, 2021
1 parent 509530b commit cdef097
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<td><h5>state.backend.rocksdb.log.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The directory for RocksDB's information logging files. If empty (Flink default setting), log files will be in the same directory as data files. If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name.</td>
<td>The directory for RocksDB's information logging files. If empty (Flink default setting), log files will be in the same directory as the Flink log. If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.log.file-num</h5></td>
Expand Down
1 change: 1 addition & 0 deletions flink-end-to-end-tests/test-scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ function check_logs_for_errors {
| grep -v "error_prone_annotations" \
| grep -v "Error sending fetch request" \
| grep -v "WARN akka.remote.ReliableDeliverySupervisor" \
| grep -v "Options.*error_*" \
| grep -ic "error" || true)
if [[ ${error_count} -gt 0 ]]; then
echo "Found error in log files; printing first 500 lines; see full logs for details:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class RocksDBConfigurableOptions implements Serializable {
.noDefaultValue()
.withDescription(
"The directory for RocksDB's information logging files. "
+ "If empty (Flink default setting), log files will be in the same directory as data files. "
+ "If empty (Flink default setting), log files will be in the same directory as the Flink log. "
+ "If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name.");

public static final ConfigOption<InfoLogLevel> LOG_LEVEL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import javax.annotation.Nullable;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;

Expand Down Expand Up @@ -297,7 +298,9 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions)
currentOptions.setInfoLogLevel(internalGetOption(RocksDBConfigurableOptions.LOG_LEVEL));

String logDir = internalGetOption(RocksDBConfigurableOptions.LOG_DIR);
if (logDir != null && !logDir.isEmpty()) {
if (logDir == null || logDir.isEmpty()) {
relocateDefaultDbLogDir(currentOptions);
} else {
currentOptions.setDbLogDir(logDir);
}

Expand Down Expand Up @@ -371,4 +374,31 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(

return currentOptions.setTableFormatConfig(blockBasedTableConfig);
}

/**
* Relocates the default log directory of RocksDB with the Flink log directory. Finds the Flink
* log directory using log.file Java property that is set during startup.
*
* @param dbOptions The RocksDB {@link DBOptions}.
*/
private void relocateDefaultDbLogDir(DBOptions dbOptions) {
String logFilePath = System.getProperty("log.file");
if (logFilePath != null) {
File logFile = resolveFileLocation(logFilePath);
if (logFile != null && resolveFileLocation(logFile.getParent()) != null) {
dbOptions.setDbLogDir(logFile.getParent());
}
}
}

/**
* Verify log file location.
*
* @param logFilePath Path to log file
* @return File or null if not a valid log file
*/
private File resolveFileLocation(String logFilePath) {
File logFile = new File(logFilePath);
return (logFile.exists() && logFile.canRead()) ? logFile : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ public void testDefaultsInSync() throws Exception {
assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
}

@Test
public void testDefaultDbLogDir() throws Exception {
final EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
final File logFile = File.createTempFile(getClass().getSimpleName() + "-", ".log");
// set the environment variable 'log.file' with the Flink log file location
System.setProperty("log.file", logFile.getPath());
try (RocksDBResourceContainer container = backend.createOptionsAndResourceContainer()) {
assertEquals(logFile.getParent(), container.getDbOptions().dbLogDir());
} finally {
logFile.delete();
}
}

// ------------------------------------------------------------------------
// RocksDB local file directory
// ------------------------------------------------------------------------
Expand Down

0 comments on commit cdef097

Please sign in to comment.