Skip to content

Commit

Permalink
[FLINK-17547][task][hotfix] Extract RefCountedFileWithStream from Ref…
Browse files Browse the repository at this point in the history
…CountedFile

Motivation: use RefCountedFile for reading as well.
  • Loading branch information
rkhachatryan authored and pnowojski committed May 19, 2020
1 parent 37f441a commit 179de29
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
Expand Down Expand Up @@ -57,7 +57,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject

private final String localTmpDir;

private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator;

@Nullable
private final S3AccessHelper s3AccessHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link RefCountedFile} that also uses an in-memory buffer for buffering small writes.
* A {@link RefCountedFileWithStream} that also uses an in-memory buffer for buffering small writes.
* This is done to avoid frequent 'flushes' of the file stream to disk.
*/
@Internal
public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {

public static final int BUFFER_SIZE = 4096;

private final RefCountedFile currentTmpFile;
private final RefCountedFileWithStream currentTmpFile;

/** The write buffer. */
private final byte[] buffer;
Expand All @@ -49,7 +49,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {

@VisibleForTesting
public RefCountedBufferingFileStream(
final RefCountedFile file,
final RefCountedFileWithStream file,
final int bufferSize) {

checkArgument(bufferSize > 0L);
Expand Down Expand Up @@ -165,15 +165,15 @@ public int getReferenceCounter() {
// ------------------------- Factory Methods -------------------------

public static RefCountedBufferingFileStream openNew(
final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider) throws IOException {
final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider) throws IOException {

return new RefCountedBufferingFileStream(
tmpFileProvider.apply(null),
BUFFER_SIZE);
}

public static RefCountedBufferingFileStream restore(
final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider,
final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider,
final File initialTmpFile) throws IOException {

return new RefCountedBufferingFileStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.RefCounted;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -40,55 +39,20 @@ public class RefCountedFile implements RefCounted {

private final File file;

private final OffsetAwareOutputStream stream;

private final AtomicInteger references;

private boolean closed;
protected boolean closed;

private RefCountedFile(
final File file,
final OutputStream currentOut,
final long bytesInCurrentPart) {
protected RefCountedFile(final File file) {
this.file = checkNotNull(file);
this.references = new AtomicInteger(1);
this.stream = new OffsetAwareOutputStream(
currentOut,
bytesInCurrentPart);
this.closed = false;
}

public File getFile() {
return file;
}

public OffsetAwareOutputStream getStream() {
return stream;
}

public long getLength() {
return stream.getLength();
}

public void write(byte[] b, int off, int len) throws IOException {
requireOpened();
if (len > 0) {
stream.write(b, off, len);
}
}

public void flush() throws IOException {
requireOpened();
stream.flush();
}

public void closeStream() {
if (!closed) {
IOUtils.closeQuietly(stream);
closed = true;
}
}

@Override
public void retain() {
references.incrementAndGet();
Expand Down Expand Up @@ -119,22 +83,7 @@ private void requireOpened() throws IOException {
}

@VisibleForTesting
int getReferenceCounter() {
public int getReferenceCounter() {
return references.get();
}

// ------------------------------ Factory methods for initializing a temporary file ------------------------------

public static RefCountedFile newFile(
final File file,
final OutputStream currentOut) throws IOException {
return new RefCountedFile(file, currentOut, 0L);
}

public static RefCountedFile restoredFile(
final File file,
final OutputStream currentOut,
final long bytesInCurrentPart) {
return new RefCountedFile(file, currentOut, bytesInCurrentPart);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3.common.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.IOUtils;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;

/**
 * A reference counted file which is deleted as soon as no caller
 * holds a reference to the wrapped {@link File}. On top of the plain
 * {@link RefCountedFile}, this variant also owns an
 * {@link OffsetAwareOutputStream} used to append data to the file.
 */
@Internal
public class RefCountedFileWithStream extends RefCountedFile {

	/** Stream appending to the wrapped file; tracks the current write offset. */
	private final OffsetAwareOutputStream stream;

	private RefCountedFileWithStream(
			final File file,
			final OutputStream currentOut,
			final long bytesInCurrentPart) {
		super(file);
		this.stream = new OffsetAwareOutputStream(currentOut, bytesInCurrentPart);
	}

	/** Returns the offset-aware stream writing to the wrapped file. */
	public OffsetAwareOutputStream getStream() {
		return stream;
	}

	/** Returns the number of bytes written so far (the stream's offset). */
	public long getLength() {
		return stream.getLength();
	}

	/**
	 * Writes {@code len} bytes from {@code b}, starting at {@code off},
	 * to the underlying stream. Zero-length writes are silently ignored.
	 *
	 * @throws IOException if the stream was already closed or the write fails
	 */
	public void write(byte[] b, int off, int len) throws IOException {
		requireOpened();
		if (len <= 0) {
			return;
		}
		stream.write(b, off, len);
	}

	/** Flushes buffered bytes to the file; fails if the stream is closed. */
	void flush() throws IOException {
		requireOpened();
		stream.flush();
	}

	/** Closes the stream quietly, exactly once; subsequent writes/flushes fail. */
	void closeStream() {
		if (closed) {
			return;
		}
		IOUtils.closeQuietly(stream);
		closed = true;
	}

	// Guard shared by the mutating operations above.
	private void requireOpened() throws IOException {
		if (closed) {
			throw new IOException("Stream closed.");
		}
	}

	// ------------------------------ Factory methods for initializing a temporary file ------------------------------

	/** Wraps a brand-new file, with the write offset starting at zero. */
	public static RefCountedFileWithStream newFile(
			final File file,
			final OutputStream currentOut) throws IOException {
		return new RefCountedFileWithStream(file, currentOut, 0L);
	}

	/** Re-wraps a restored file whose first {@code bytesInCurrentPart} bytes were already written. */
	public static RefCountedFileWithStream restoredFile(
			final File file,
			final OutputStream currentOut,
			final long bytesInCurrentPart) {
		return new RefCountedFileWithStream(file, currentOut, bytesInCurrentPart);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import static org.apache.flink.util.Preconditions.checkArgument;

/**
* A utility class that creates local {@link RefCountedFile reference counted files} that serve as temporary files.
* A utility class that creates local {@link RefCountedFileWithStream reference counted files} that serve as temporary files.
*/
@Internal
public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFile, IOException> {
public class RefCountedTmpFileCreator implements FunctionWithException<File, RefCountedFileWithStream, IOException> {

private final File[] tempDirectories;

Expand Down Expand Up @@ -70,18 +70,18 @@ private RefCountedTmpFileCreator(File... tempDirectories) {
* @throws IOException Thrown, if the stream to the temp file could not be opened.
*/
@Override
public RefCountedFile apply(File file) throws IOException {
public RefCountedFileWithStream apply(File file) throws IOException {
final File directory = tempDirectories[nextIndex()];

while (true) {
try {
if (file == null) {
final File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
final OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
return RefCountedFile.newFile(newFile, out);
return RefCountedFileWithStream.newFile(newFile, out);
} else {
final OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
return RefCountedFile.restoredFile(file, out, file.length());
return RefCountedFileWithStream.restoredFile(file, out, file.length());
}
} catch (FileAlreadyExistsException ignored) {
// fall through the loop and retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
import org.apache.flink.util.function.FunctionWithException;

import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -60,7 +60,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp

private final RecoverableMultiPartUpload upload;

private final FunctionWithException<File, RefCountedFile, IOException> tmpFileProvider;
private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileProvider;

/**
* The number of bytes at which we start a new part of the multipart upload.
Expand All @@ -80,7 +80,7 @@ public final class S3RecoverableFsDataOutputStream extends RecoverableFsDataOutp
*/
S3RecoverableFsDataOutputStream(
RecoverableMultiPartUpload upload,
FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
FunctionWithException<File, RefCountedFileWithStream, IOException> tempFileCreator,
RefCountedFSOutputStream initialTmpFile,
long userDefinedMinPartSize,
long bytesBeforeCurrentPart) {
Expand Down Expand Up @@ -228,7 +228,7 @@ private void unlock() {

public static S3RecoverableFsDataOutputStream newStream(
final RecoverableMultiPartUpload upload,
final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator,
final long userDefinedMinPartSize) throws IOException {

checkArgument(userDefinedMinPartSize >= S3_MULTIPART_MIN_PART_SIZE);
Expand All @@ -245,7 +245,7 @@ public static S3RecoverableFsDataOutputStream newStream(

public static S3RecoverableFsDataOutputStream recoverStream(
final RecoverableMultiPartUpload upload,
final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator,
final long userDefinedMinPartSize,
final long bytesBeforeCurrentPart) throws IOException {

Expand All @@ -264,7 +264,7 @@ public static S3RecoverableFsDataOutputStream recoverStream(
}

private static RefCountedBufferingFileStream boundedBufferingFileStream(
final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator,
final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator,
final Optional<File> incompletePart) throws IOException {

if (!incompletePart.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
Expand All @@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {

private final S3AccessHelper s3AccessHelper;

private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileSupplier;

private final int maxConcurrentUploadsPerStream;

Expand All @@ -54,7 +54,7 @@ final class S3RecoverableMultipartUploadFactory {
final S3AccessHelper s3AccessHelper,
final int maxConcurrentUploadsPerStream,
final Executor executor,
final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileSupplier) {

this.fs = Preconditions.checkNotNull(fs);
this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
Expand Down Expand Up @@ -92,7 +92,7 @@ private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws I
}

// download the file (simple way)
final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
final RefCountedFileWithStream refCountedFile = tmpFileSupplier.apply(null);
final File file = refCountedFile.getFile();
final long numBytes = s3AccessHelper.getObject(objectKey, file);

Expand Down
Loading

0 comments on commit 179de29

Please sign in to comment.