Skip to content

Commit

Permalink
[FLINK-11187] [s3] Use file over stream for writes
Browse files Browse the repository at this point in the history
This changes the S3AccessHelper API to take a file instead of an input
stream.

This allows s3 client to properly reset a file instead of a file over
stream for writes.

This fixes an issue where the underlying s3 implementation has an
intermittent failure, tries to reset the stream, fails to do so, and
results in hung requests with delayed errors.
  • Loading branch information
Addison Higham authored and aljoscha committed Jan 16, 2019
1 parent a1c64b9 commit 9c07822
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -64,8 +61,8 @@ public RefCountedBufferingFileStream(
}

@Override
public InputStream getInputStream() throws IOException {
return Files.newInputStream(currentTmpFile.getFile().toPath(), StandardOpenOption.READ);
public File getInputFile() {
return currentTmpFile.getFile();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

/**
* A {@link FSDataOutputStream} with the {@link RefCounted} functionality.
Expand All @@ -31,11 +31,11 @@
public abstract class RefCountedFSOutputStream extends FSDataOutputStream implements RefCounted {

/**
* Gets an {@link InputStream} that allows to read the contents of the file.
* Gets the underlying {@link File} that allows to read the contents of the file.
*
* @return An input stream to the contents of the file.
* @return A handle to the File object.
*/
public abstract InputStream getInputStream() throws IOException;
public abstract File getInputFile();

/**
* Checks if the file is closed for writes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
Expand Down Expand Up @@ -173,8 +172,8 @@ private String safelyUploadSmallPart(@Nullable RefCountedFSOutputStream file) th
// first, upload the trailing data file. during that time, other in-progress uploads may complete.
final String incompletePartObjectName = createIncompletePartObjectName();
file.retain();
try (InputStream inputStream = file.getInputStream()) {
s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
try {
s3AccessHelper.putObject(incompletePartObjectName, file.getInputFile());
}
finally {
file.release();
Expand Down Expand Up @@ -315,8 +314,8 @@ private static class UploadTask implements Runnable {

@Override
public void run() {
try (final InputStream inputStream = file.getInputStream()) {
final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
try {
final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, file.getInputFile(), file.getPos());
future.complete(new PartETag(result.getPartNumber(), result.getETag()));
file.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -59,25 +58,24 @@ public interface S3AccessHelper {
* @param key the key this MPU is associated with.
* @param uploadId the id of the MPU.
* @param partNumber the number of the part being uploaded (has to be in [1 ... 10000]).
* @param file the (local) file holding the part to be uploaded.
* @param inputFile the (local) file holding the part to be uploaded.
* @param length the length of the part.
* @return The {@link UploadPartResult result} of the attempt to upload the part.
* @throws IOException
*/
UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException;

/**
* Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, InputStream, long)} method,
* Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, File, long)} method,
* this object is not going to be associated to any MPU and, as such, it is not subject to the garbage collection
* policies specified for your S3 bucket.
*
* @param key the key used to identify this part.
* @param file the (local) file holding the data to be uploaded.
* @param length the size of the data to be uploaded.
* @param inputFile the (local) file holding the data to be uploaded.
* @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
* @throws IOException
*/
PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
PutObjectResult putObject(String key, File inputFile) throws IOException;

/**
* Finalizes a Multi-Part Upload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testFlush() throws IOException {
Assert.assertEquals(contentToWrite.length, stream.getPos());

final byte[] contentRead = new byte[contentToWrite.length];
stream.getInputStream().read(contentRead, 0, contentRead.length);
new FileInputStream(stream.getInputFile()).read(contentRead, 0, contentRead.length);
Assert.assertTrue(Arrays.equals(contentToWrite, contentRead));

stream.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand Down Expand Up @@ -361,14 +361,14 @@ public String startMultiPartUpload(String key) throws IOException {
}

@Override
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException {
final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException {
final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(length));
return storeAndGetUploadPartResult(key, partNumber, content);
}

@Override
public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
public PutObjectResult putObject(String key, File inputFile) throws IOException {
final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(inputFile.length()));
return storeAndGetPutObjectResult(key, content);
}

Expand Down Expand Up @@ -397,9 +397,9 @@ public ObjectMetadata getObjectMetadata(String key) throws IOException {
return null;
}

private byte[] getFileContentBytes(InputStream file, int length) throws IOException {
private byte[] getFileContentBytes(File file, int length) throws IOException {
final byte[] content = new byte[length];
file.read(content, 0, length);
new FileInputStream(file).read(content, 0, length);
return content;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
Expand Down Expand Up @@ -314,10 +314,9 @@ private static byte[] bytesOf(String str) {

private static byte[] readFileContents(RefCountedFSOutputStream file) throws IOException {
final byte[] content = new byte[MathUtils.checkedDownCast(file.getPos())];
try (InputStream inputStream = file.getInputStream()) {
int bytesRead = inputStream.read(content, 0, content.length); // TODO: 10/2/18 see if closed in download
Assert.assertEquals(file.getPos(), bytesRead);
}
File inputFile = file.getInputFile();
long bytesRead = new FileInputStream(inputFile).read(content, 0, MathUtils.checkedDownCast(inputFile.length()));
Assert.assertEquals(file.getPos(), bytesRead);
return content;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -68,15 +67,15 @@ public String startMultiPartUpload(String key) throws IOException {
}

@Override
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException {
final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
key, uploadId, partNumber, MathUtils.checkedDownCast(length), null, inputFile, 0L);
return s3accessHelper.uploadPart(uploadRequest);
}

@Override
public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
public PutObjectResult putObject(String key, File inputFile) throws IOException {
final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputFile);
return s3accessHelper.putObject(putRequest);
}

Expand Down

0 comments on commit 9c07822

Please sign in to comment.