Skip to content

Commit

Permalink
[FLINK-6769] Replace usage of deprecated FileSystem#create(Path, bool…
Browse files Browse the repository at this point in the history
…ean)

This closes apache#4116.
  • Loading branch information
zentol committed Jun 23, 2017
1 parent 3424924 commit d7bf7ee
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private static class FileCreator extends CheckedThread {
@Override
public void go() throws Exception {
fs.initOutPathLocalFS(path.getParent(), WriteMode.OVERWRITE, true);
try (FSDataOutputStream out = fs.create(path, true)) {
try (FSDataOutputStream out = fs.create(path, WriteMode.OVERWRITE)) {
out.write(11);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void go() throws Exception {

Path tmp = new Path(tmpFolder.newFolder().toURI().toString(), "test_file");

try (FSDataOutputStream stream = fs1.create(tmp, false)) {
try (FSDataOutputStream stream = fs1.create(tmp, FileSystem.WriteMode.NO_OVERWRITE)) {
CheckedThread t2 = new CheckedThread() {
@Override
public void go() {
Expand Down Expand Up @@ -285,4 +285,4 @@ public void close() throws IOException {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
FSDataOutputStream outputStream = null;
FSDataInputStream inputStream = null;
try {
outputStream = targetFs.create(outPath, true);
outputStream = targetFs.create(outPath, FileSystem.WriteMode.OVERWRITE);
inputStream = task.getPath().getFileSystem().open(task.getPath());
int bytes = IOUtils.copy(inputStream, outputStream);
bytesCounter.add(bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void testDeletePathIfEmpty() throws IOException {
byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(ConfigConstants.DEFAULT_CHARSET);

for (Path file: Arrays.asList(singleFile, directoryFile)) {
org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, true);
org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, FileSystem.WriteMode.OVERWRITE);
outputStream.write(data);
outputStream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void put(File localFile, JobID jobId, String key) throws IOException {
}

private void put(File fromFile, String toBlobPath) throws IOException {
try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) {
try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
Files.copy(fromFile, os);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public static void copy(Path sourcePath, Path targetPath, boolean executable) th
copy(content.getPath(), new Path(localPath), executable);
}
} else {
try (FSDataOutputStream lfsOutput = tFS.create(targetPath, false); FSDataInputStream fsInput = sFS.open(sourcePath)) {
try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
IOUtils.copyBytes(fsInput, lfsOutput);
//noinspection ResultOfMethodCallIgnored
new File(targetPath.toString()).setExecutable(executable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private void createStream() throws IOException {
for (int attempt = 0; attempt < 10; attempt++) {
try {
statePath = createStatePath();
outStream = fs.create(statePath, false);
outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
break;
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public RetrievableStateHandle<T> store(T state) throws Exception {
for (int attempt = 0; attempt < 10; attempt++) {
Path filePath = getNewFilePath();

try (FSDataOutputStream outStream = fs.create(filePath, false)) {
try (FSDataOutputStream outStream = fs.create(filePath, FileSystem.WriteMode.NO_OVERWRITE)) {
InstantiationUtil.serializeObject(outStream, state);
return new RetrievableStreamStateHandle<T>(filePath, outStream.getPos());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testSavepointMigrationV0ToV1() throws Exception {
}

try {
fdos = fs.create(path, false);
fdos = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
break;
} catch (Exception e) {
latestException = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testStoreLoadDispose() throws Exception {
public void testUnexpectedSavepoint() throws Exception {
// Random file
Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, FileSystem.WriteMode.NO_OVERWRITE);
DataOutputStream dos = new DataOutputStream(fdos);
for (int i = 0; i < 10; i++) {
dos.writeLong(ThreadLocalRandom.current().nextLong());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testCleanupWhenClosingStream() throws IOException {

final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);

when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);

CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
TEMP_DIR_PATH,
Expand All @@ -141,7 +141,7 @@ public void testCleanupWhenClosingStream() throws IOException {
// this should create the underlying file stream
stream.write(new byte[]{1,2,3,4,5});

verify(fs).create(any(Path.class), anyBoolean());
verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));

stream.close();

Expand All @@ -158,7 +158,7 @@ public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {

final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);

when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
doThrow(new IOException("Test IOException.")).when(outputStream).close();

CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Expand All @@ -170,7 +170,7 @@ public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
// this should create the underlying file stream
stream.write(new byte[]{1,2,3,4,5});

verify(fs).create(any(Path.class), anyBoolean());
verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));

try {
stream.closeAndGetHandle();
Expand Down

0 comments on commit d7bf7ee

Please sign in to comment.