diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java index 5248e061a12ec..3514bbcb4e36a 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java @@ -21,7 +21,7 @@ import org.apache.flink.core.fs.EntropyInjectingFileSystem; import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator; import org.apache.flink.fs.s3.common.writer.S3AccessHelper; import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter; @@ -57,7 +57,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject private final String localTmpDir; - private final FunctionWithException tmpFileCreator; + private final FunctionWithException tmpFileCreator; @Nullable private final S3AccessHelper s3AccessHelper; diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java index 29f2590803cc8..5f149df6c87c1 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java @@ -29,7 +29,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link RefCountedFile} that also uses an in-memory buffer for buffering small writes. + * A {@link RefCountedFileWithStream} that also uses an in-memory buffer for buffering small writes. * This is done to avoid frequent 'flushes' of the file stream to disk. */ @Internal @@ -37,7 +37,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { public static final int BUFFER_SIZE = 4096; - private final RefCountedFile currentTmpFile; + private final RefCountedFileWithStream currentTmpFile; /** The write buffer. */ private final byte[] buffer; @@ -49,7 +49,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream { @VisibleForTesting public RefCountedBufferingFileStream( - final RefCountedFile file, + final RefCountedFileWithStream file, final int bufferSize) { checkArgument(bufferSize > 0L); @@ -165,7 +165,7 @@ public int getReferenceCounter() { // ------------------------- Factory Methods ------------------------- public static RefCountedBufferingFileStream openNew( - final FunctionWithException tmpFileProvider) throws IOException { + final FunctionWithException tmpFileProvider) throws IOException { return new RefCountedBufferingFileStream( tmpFileProvider.apply(null), @@ -173,7 +173,7 @@ public static RefCountedBufferingFileStream openNew( } public static RefCountedBufferingFileStream restore( - final FunctionWithException tmpFileProvider, + final FunctionWithException tmpFileProvider, final File initialTmpFile) throws IOException { return new RefCountedBufferingFileStream( diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java index 178763631e1bd..9675f09a1fa91 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFile.java @@ -21,11 +21,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.IOUtils; +import org.apache.flink.util.RefCounted; import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.nio.file.Files; import java.util.concurrent.atomic.AtomicInteger; @@ -40,21 +39,13 @@ public class RefCountedFile implements RefCounted { private final File file; - private final OffsetAwareOutputStream stream; - private final AtomicInteger references; - private boolean closed; + protected boolean closed; - private RefCountedFile( - final File file, - final OutputStream currentOut, - final long bytesInCurrentPart) { + protected RefCountedFile(final File file) { this.file = checkNotNull(file); this.references = new AtomicInteger(1); - this.stream = new OffsetAwareOutputStream( - currentOut, - bytesInCurrentPart); this.closed = false; } @@ -62,33 +53,6 @@ public File getFile() { return file; } - public OffsetAwareOutputStream getStream() { - return stream; - } - - public long getLength() { - return stream.getLength(); - } - - public void write(byte[] b, int off, int len) throws IOException { - requireOpened(); - if (len > 0) { - stream.write(b, off, len); - } - } - - public void flush() throws IOException { - requireOpened(); - stream.flush(); - } - - public void closeStream() { - if (!closed) { - IOUtils.closeQuietly(stream); - closed = true; - } - } - @Override public void retain() { references.incrementAndGet(); @@ -119,22 +83,7 @@ private void requireOpened() throws IOException { } @VisibleForTesting - int getReferenceCounter() { + public int getReferenceCounter() { return references.get(); } - - // ------------------------------ Factory methods for initializing a temporary file ------------------------------ - - public static RefCountedFile newFile( - final File file, - final OutputStream currentOut) throws IOException { - return new RefCountedFile(file, currentOut, 0L); - } - - public static RefCountedFile restoredFile( - final File file, - final OutputStream currentOut, - final long bytesInCurrentPart) { - return new RefCountedFile(file, currentOut, bytesInCurrentPart); - } } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java new file mode 100644 index 0000000000000..94b8527adcf65 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.IOUtils; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; + +/** + * A reference counted file which is deleted as soon as no caller + * holds a reference to the wrapped {@link File}. + */ +@Internal +public class RefCountedFileWithStream extends RefCountedFile { + + private final OffsetAwareOutputStream stream; + + private RefCountedFileWithStream( + final File file, + final OutputStream currentOut, + final long bytesInCurrentPart) { + super(file); + this.stream = new OffsetAwareOutputStream(currentOut, bytesInCurrentPart); + } + + public OffsetAwareOutputStream getStream() { + return stream; + } + + public long getLength() { + return stream.getLength(); + } + + public void write(byte[] b, int off, int len) throws IOException { + requireOpened(); + if (len > 0) { + stream.write(b, off, len); + } + } + + void flush() throws IOException { + requireOpened(); + stream.flush(); + } + + void closeStream() { + if (!closed) { + IOUtils.closeQuietly(stream); + closed = true; + } + } + + private void requireOpened() throws IOException { + if (closed) { + throw new IOException("Stream closed."); + } + } + + // ------------------------------ Factory methods for initializing a temporary file ------------------------------ + + public static RefCountedFileWithStream newFile( + final File file, + final OutputStream currentOut) throws IOException { + return new RefCountedFileWithStream(file, currentOut, 0L); + } + + public static RefCountedFileWithStream restoredFile( + final File file, + final OutputStream currentOut, + final long bytesInCurrentPart) { + return new RefCountedFileWithStream(file, currentOut, bytesInCurrentPart); + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java index 7a928d0c99aa5..51b417c42ccbd 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java @@ -34,10 +34,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; /** - * A utility class that creates local {@link RefCountedFile reference counted files} that serve as temporary files. + * A utility class that creates local {@link RefCountedFileWithStream reference counted files} that serve as temporary files. */ @Internal -public class RefCountedTmpFileCreator implements FunctionWithException { +public class RefCountedTmpFileCreator implements FunctionWithException { private final File[] tempDirectories; @@ -70,7 +70,7 @@ private RefCountedTmpFileCreator(File... tempDirectories) { * @throws IOException Thrown, if the stream to the temp file could not be opened. */ @Override - public RefCountedFile apply(File file) throws IOException { + public RefCountedFileWithStream apply(File file) throws IOException { final File directory = tempDirectories[nextIndex()]; while (true) { @@ -78,10 +78,10 @@ public RefCountedFile apply(File file) throws IOException { if (file == null) { final File newFile = new File(directory, ".tmp_" + UUID.randomUUID()); final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - return RefCountedFile.newFile(newFile, out); + return RefCountedFileWithStream.newFile(newFile, out); } else { final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND); - return RefCountedFile.restoredFile(file, out, file.length()); + return RefCountedFileWithStream.restoredFile(file, out, file.length()); } } catch (FileAlreadyExistsException ignored) { // fall through the loop and retry diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java index 220ddd58eb989..5447026be94ba 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java @@ -23,7 +23,7 @@ import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.function.FunctionWithException; import org.apache.commons.io.IOUtils; @@ -60,7 +60,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp private final RecoverableMultiPartUpload upload; - private final FunctionWithException tmpFileProvider; + private final FunctionWithException tmpFileProvider; /** * The number of bytes at which we start a new part of the multipart upload. @@ -80,7 +80,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp */ S3RecoverableFsDataOutputStream( RecoverableMultiPartUpload upload, - FunctionWithException tempFileCreator, + FunctionWithException tempFileCreator, RefCountedFSOutputStream initialTmpFile, long userDefinedMinPartSize, long bytesBeforeCurrentPart) { @@ -228,7 +228,7 @@ private void unlock() { public static S3RecoverableFsDataOutputStream newStream( final RecoverableMultiPartUpload upload, - final FunctionWithException tmpFileCreator, + final FunctionWithException tmpFileCreator, final long userDefinedMinPartSize) throws IOException { checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE); @@ -245,7 +245,7 @@ public static S3RecoverableFsDataOutputStream newStream( public static S3RecoverableFsDataOutputStream recoverStream( final RecoverableMultiPartUpload upload, - final FunctionWithException tmpFileCreator, + final FunctionWithException tmpFileCreator, final long userDefinedMinPartSize, final long bytesBeforeCurrentPart) throws IOException { @@ -264,7 +264,7 @@ public static S3RecoverableFsDataOutputStream recoverStream( } private static RefCountedBufferingFileStream boundedBufferingFileStream( - final FunctionWithException tmpFileCreator, + final FunctionWithException tmpFileCreator, final Optional incompletePart) throws IOException { if (!incompletePart.isPresent()) { diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java index 3727e25790437..b7fb8fb9bdbd6 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java @@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionWithException; @@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory { private final S3AccessHelper s3AccessHelper; - private final FunctionWithException tmpFileSupplier; + private final FunctionWithException tmpFileSupplier; private final int maxConcurrentUploadsPerStream; @@ -54,7 +54,7 @@ final class S3RecoverableMultipartUploadFactory { final S3AccessHelper s3AccessHelper, final int maxConcurrentUploadsPerStream, final Executor executor, - final FunctionWithException tmpFileSupplier) { + final FunctionWithException tmpFileSupplier) { this.fs = Preconditions.checkNotNull(fs); this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; @@ -92,7 +92,7 @@ private Optional recoverInProgressPart(S3Recoverable recoverable) throws I } // download the file (simple way) - final RefCountedFile refCountedFile = tmpFileSupplier.apply(null); + final RefCountedFileWithStream refCountedFile = tmpFileSupplier.apply(null); final File file = refCountedFile.getFile(); final long numBytes = s3AccessHelper.getObject(objectKey, file); diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java index ddb4443c58564..a6b62cca304c8 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java @@ -25,7 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.function.FunctionWithException; import org.apache.hadoop.fs.FileSystem; @@ -50,7 +50,7 @@ @PublicEvolving public class S3RecoverableWriter implements RecoverableWriter { - private final FunctionWithException tempFileCreator; + private final FunctionWithException tempFileCreator; private final long userDefinedMinPartSize; @@ -62,7 +62,7 @@ public class S3RecoverableWriter implements RecoverableWriter { S3RecoverableWriter( final S3AccessHelper s3AccessHelper, final S3RecoverableMultipartUploadFactory uploadFactory, - final FunctionWithException tempFileCreator, + final FunctionWithException tempFileCreator, final long userDefinedMinPartSize) { this.s3AccessHelper = checkNotNull(s3AccessHelper); @@ -144,7 +144,7 @@ private static S3Recoverable castToS3Recoverable(CommitRecoverable recoverable) public static S3RecoverableWriter writer( final FileSystem fs, - final FunctionWithException tempFileCreator, + final FunctionWithException tempFileCreator, final S3AccessHelper s3AccessHelper, final Executor uploadThreadPool, final long userDefinedMinPartSize, diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java index 50ea9bd64a6e1..368c9cfe0ff47 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java @@ -134,11 +134,11 @@ private RefCountedBufferingFileStream getStreamToTest() throws IOException { return new RefCountedBufferingFileStream(getRefCountedFileWithContent(), BUFFER_SIZE); } - private RefCountedFile getRefCountedFileWithContent() throws IOException { + private RefCountedFileWithStream getRefCountedFileWithContent() throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - return RefCountedFile.newFile(newFile, out); + return RefCountedFileWithStream.newFile(newFile, out); } private static byte[] bytesOf(String str) { diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java index 2e03197142a20..217f4e163ecd0 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java @@ -25,14 +25,14 @@ import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.util.UUID; import java.util.stream.Stream; +import static org.apache.flink.util.Preconditions.checkState; + /** * Tests for the {@link RefCountedFile}. */ @@ -44,9 +44,9 @@ public class RefCountedFileTest { @Test public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); + checkState(newFile.createNewFile()); - RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); + RefCountedFile fileUnderTest = new RefCountedFile(newFile); verifyTheFileIsStillThere(); fileUnderTest.release(); @@ -59,10 +59,10 @@ public void releaseToZeroRefCounterShouldDeleteTheFile() throws IOException { @Test public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOException { final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); + checkState(newFile.createNewFile()); // the reference counter always starts with 1 (not 0). This is why we need +1 releases - RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); + RefCountedFile fileUnderTest = new RefCountedFile(newFile); verifyTheFileIsStillThere(); fileUnderTest.retain(); @@ -85,59 +85,12 @@ public void retainsShouldRequirePlusOneReleasesToDeleteTheFile() throws IOExcept } } - @Test - public void writeShouldSucceed() throws IOException { - byte[] content = bytesOf("hello world"); - - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent(content); - long fileLength = fileUnderTest.getLength(); - - Assert.assertEquals(content.length, fileLength); - } - - @Test - public void closeShouldNotReleaseReference() throws IOException { - getClosedRefCountedFileWithContent("hello world"); - verifyTheFileIsStillThere(); - } - - @Test(expected = IOException.class) - public void writeAfterCloseShouldThrowException() throws IOException { - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world"); - byte[] content = bytesOf("Hello Again"); - fileUnderTest.write(content, 0, content.length); - } - - @Test(expected = IOException.class) - public void flushAfterCloseShouldThrowException() throws IOException { - final RefCountedFile fileUnderTest = getClosedRefCountedFileWithContent("hello world"); - fileUnderTest.flush(); - } - - // ------------------------------------- Utilities ------------------------------------- - private void verifyTheFileIsStillThere() throws IOException { try (Stream files = Files.list(temporaryFolder.getRoot().toPath())) { Assert.assertEquals(1L, files.count()); } } - private RefCountedFile getClosedRefCountedFileWithContent(String content) throws IOException { - return getClosedRefCountedFileWithContent(bytesOf(content)); - } - - private RefCountedFile getClosedRefCountedFileWithContent(byte[] content) throws IOException { - final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); - final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); - - final RefCountedFile fileUnderTest = RefCountedFile.newFile(newFile, out); - - fileUnderTest.write(content, 0, content.length); - - fileUnderTest.closeStream(); - return fileUnderTest; - } - private static byte[] bytesOf(String str) { return str.getBytes(StandardCharsets.UTF_8); } diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java new file mode 100644 index 0000000000000..7aa7240ff6b2f --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common.utils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.UUID; +import java.util.stream.Stream; + +/** + * Tests for the {@link RefCountedFileWithStream}. + */ +public class RefCountedFileWithStreamTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void writeShouldSucceed() throws IOException { + byte[] content = bytesOf("hello world"); + + final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent(content); + long fileLength = fileUnderTest.getLength(); + + Assert.assertEquals(content.length, fileLength); + } + + @Test + public void closeShouldNotReleaseReference() throws IOException { + getClosedRefCountedFileWithContent("hello world"); + verifyTheFileIsStillThere(); + } + + @Test(expected = IOException.class) + public void writeAfterCloseShouldThrowException() throws IOException { + final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent("hello world"); + byte[] content = bytesOf("Hello Again"); + fileUnderTest.write(content, 0, content.length); + } + + @Test(expected = IOException.class) + public void flushAfterCloseShouldThrowException() throws IOException { + final RefCountedFileWithStream fileUnderTest = getClosedRefCountedFileWithContent("hello world"); + fileUnderTest.flush(); + } + + // ------------------------------------- Utilities ------------------------------------- + + private void verifyTheFileIsStillThere() throws IOException { + try (Stream files = Files.list(temporaryFolder.getRoot().toPath())) { + Assert.assertEquals(1L, files.count()); + } + } + + private RefCountedFileWithStream getClosedRefCountedFileWithContent(String content) throws IOException { + return getClosedRefCountedFileWithContent(bytesOf(content)); + } + + private RefCountedFileWithStream getClosedRefCountedFileWithContent(byte[] content) throws IOException { + final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID()); + final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); + + final RefCountedFileWithStream fileUnderTest = RefCountedFileWithStream.newFile(newFile, out); + + fileUnderTest.write(content, 0, content.length); + + fileUnderTest.closeStream(); + return fileUnderTest; + } + + private static byte[] bytesOf(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } +} diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java index f01da886c24ef..e8c6e9e8e74a2 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java @@ -19,7 +19,7 @@ package org.apache.flink.fs.s3.common.writer; import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.IOUtils; import org.apache.flink.util.MathUtils; @@ -320,7 +320,7 @@ private RefCountedBufferingFileStream writeContent(byte[] content) throws IOExce final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW); final RefCountedBufferingFileStream testStream = - new RefCountedBufferingFileStream(RefCountedFile.newFile(newFile, out), BUFFER_SIZE); + new RefCountedBufferingFileStream(RefCountedFileWithStream.newFile(newFile, out), BUFFER_SIZE); testStream.write(content, 0, content.length); return testStream; diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java index 14ed2e294f7d2..b7c94c4b30b68 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java @@ -22,7 +22,7 @@ import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream; import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream; -import org.apache.flink.fs.s3.common.utils.RefCountedFile; +import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionWithException; @@ -483,7 +483,7 @@ public String toString() { } } - private static class TestFileProvider implements FunctionWithException { + private static class TestFileProvider implements FunctionWithException { private final TemporaryFolder folder; @@ -492,16 +492,16 @@ private static class TestFileProvider implements FunctionWithException