diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 45df7283ade9e..1455c1bbebe91 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.LongFunctionWithException; @@ -80,6 +81,10 @@ public static RocksDB openDB( } catch (RocksDBException e) { IOUtils.closeQuietly(columnFamilyOptions); columnFamilyDescriptors.forEach((cfd) -> IOUtils.closeQuietly(cfd.getOptions())); + + // improve error reporting on Windows + throwExceptionIfPathLengthExceededOnWindows(path, e); + throw new IOException("Error while opening RocksDB instance.", e); } @@ -207,4 +212,16 @@ public static OpaqueMemoryResource allocateSharedCachesI throw new IOException("Failed to acquire shared cache resource for RocksDB", e); } } + + private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exception cause) throws IOException { + // max directory path length on Windows is 247. + // the maximum path length is 260, subtracting one file name length (12 chars) and one NULL terminator. + final int maxWinDirPathLen = 247; + + if (path.length() > maxWinDirPathLen && OperatingSystem.isWindows()) { + throw new IOException(String.format( + "The directory path length (%d) is longer than the directory path length limit for Windows (%d): %s", + path.length(), maxWinDirPathLen, path), cause); + } + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java new file mode 100644 index 0000000000000..ac0a68e88f865 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.OperatingSystem; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.RocksDB; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThat; +import static org.junit.Assume.assumeTrue; + +/** + * Tests for the {@link RocksDBOperationUtils}. + */ +public class RocksDBOperationsUtilsTest { + + @ClassRule + public static final TemporaryFolder TMP_DIR = new TemporaryFolder(); + + @BeforeClass + public static void loadRocksLibrary() throws Exception { + NativeLibraryLoader.getInstance().loadLibrary(TMP_DIR.newFolder().getAbsolutePath()); + } + + @Test + public void testPathExceptionOnWindows() throws Exception { + assumeTrue(OperatingSystem.isWindows()); + + final File folder = TMP_DIR.newFolder(); + final File rocksDir = new File(folder, getLongString(247 - folder.getAbsolutePath().length())); + + Files.createDirectories(rocksDir.toPath()); + + try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true); + ColumnFamilyOptions colOptions = new ColumnFamilyOptions()) { + + RocksDB rocks = RocksDBOperationUtils.openDB( + rocksDir.getAbsolutePath(), + Collections.emptyList(), + Collections.emptyList(), + colOptions, dbOptions); + rocks.close(); + + // do not provoke a test failure if this passes, because some setups may actually + // support long paths, in which case: great! + } + catch (IOException e) { + assertThat(e.getMessage(), containsString("longer than the directory path length limit for Windows")); + } + } + + private static String getLongString(int numChars) { + final StringBuilder builder = new StringBuilder(); + for (int i = numChars; i > 0; --i) { + builder.append('a'); + } + return builder.toString(); + } +}