Skip to content

Commit

Permalink
[FLINK-3655] [core] Add support for multiple file paths for FileInput…
Browse files Browse the repository at this point in the history
…Format.

- Reverted API-breaking changes.
- Enable multi-path support for the following InputFormats:
  - AvroInputFormat,
  - [Pojo,Row,Tuple]CsvInputFormat,
  - OrcInputFormat,
  - TextInputFormat,
  - TextValueInputFormat
  • Loading branch information
fhueske committed Feb 16, 2018
1 parent 632996f commit 892eb85
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 534 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}
}

@Override
public boolean supportsMultiPaths() {
return true;
}

// --------------------------------------------------------------------------------------------
// Getter methods for tests
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ public long getBlockSize() {

@Override
public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
List<FileStatus> files = this.getFiles();

final FileSystem fs = getFilePath().getFileSystem();
final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ? fs.getDefaultBlockSize() : this.blockSize;
final List<FileStatus> files = this.getFiles();

final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
for (FileStatus file : files) {
final FileSystem fs = file.getPath().getFileSystem();
final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ? fs.getDefaultBlockSize() : this.blockSize;

for (long pos = 0, length = file.getLen(); pos < length; pos += blockSize) {
long remainingLength = Math.min(pos + blockSize, length) - pos;

Expand All @@ -132,10 +132,10 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {

if (inputSplits.size() < minNumSplits) {
LOG.warn(String.format(
"With the given block size %d, the file %s cannot be split into %d blocks. Filling up with empty splits...",
blockSize, getFilePath(), minNumSplits));
"With the given block size %d, the files %s cannot be split into %d blocks. Filling up with empty splits...",
blockSize, Arrays.toString(getFilePaths()), minNumSplits));
FileStatus last = files.get(files.size() - 1);
final BlockLocation[] blocks = fs.getFileBlockLocations(last, 0, last.getLen());
final BlockLocation[] blocks = last.getPath().getFileSystem().getFileBlockLocations(last, 0, last.getLen());
for (int index = files.size(); index < minNumSplits; index++) {
inputSplits.add(new FileInputSplit(index, last.getPath(), last.getLen(), 0, blocks[0].getHosts()));
}
Expand All @@ -146,9 +146,9 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {

protected List<FileStatus> getFiles() throws IOException {
// get all the files that are involved in the splits
List<FileStatus> files = new ArrayList<FileStatus>();
List<FileStatus> files = new ArrayList<>();

for (Path filePath: this.filePathList) {
for (Path filePath: getFilePaths()) {
final FileSystem fs = filePath.getFileSystem();
final FileStatus pathFile = fs.getFileStatus(filePath);

Expand All @@ -172,10 +172,10 @@ public SequentialStatistics getStatistics(BaseStatistics cachedStats) {

final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
(FileBaseStatistics) cachedStats : null;

try {
final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1);
final FileBaseStatistics stats = getFileStats(cachedFileStats, this.filePathList, allFiles);
final FileBaseStatistics stats = getFileStats(cachedFileStats, getFilePaths(), allFiles);
if (stats == null) {
return null;
}
Expand All @@ -187,19 +187,18 @@ public SequentialStatistics getStatistics(BaseStatistics cachedStats) {
} catch (IOException ioex) {
if (LOG.isWarnEnabled()) {
LOG.warn(
String.format("Could not determine complete statistics for files in '%s' due to an I/O error",
this.filePathList),
String.format("Could not determine complete statistics for files '%s' due to an I/O error",
Arrays.toString(getFilePaths())),
ioex);
}
} catch (Throwable t) {
if (LOG.isErrorEnabled()) {
LOG.error(
String.format("Unexpected problem while getting the file statistics for file in'%s'",
this.filePathList),
String.format("Unexpected problem while getting the file statistics for files '%s'",
Arrays.toString(getFilePaths())),
t);
}
}

// no stats available
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,33 +350,28 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
final int oldLineLengthLimit = this.lineLengthLimit;
try {

final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>(1);
final ArrayList<FileStatus> allFiles = new ArrayList<>(1);

// let the file input format deal with the up-to-date check and the
// basic size
final FileBaseStatistics stats = getFileStats(cachedFileStats, this.filePathList, allFiles);
// let the file input format deal with the up-to-date check and the basic size
final FileBaseStatistics stats = getFileStats(cachedFileStats, getFilePaths(), allFiles);
if (stats == null) {
return null;
}

// check whether the width per record is already known or the total
// size is unknown as well

// check whether the width per record is already known or the total size is unknown as well
// in both cases, we return the stats as they are
if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN
|| stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) {
if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN ||
stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) {
return stats;
}

// disabling sampling for unsplittable files since the logic below
// assumes splitability.
// TODO: Add sampling for unsplittable files. Right now, only
// compressed text files are affected by this limitation.
// disabling sampling for unsplittable files since the logic below assumes splitability.
// TODO: Add sampling for unsplittable files. Right now, only compressed text files are affected by this limitation.
if (unsplittable) {
return stats;
}

// compute how many samples to take, depending on the defined upper
// and lower bound

// compute how many samples to take, depending on the defined upper and lower bound
final int numSamples;
if (this.numLineSamples != NUM_SAMPLES_UNDEFINED) {
numSamples = this.numLineSamples;
Expand All @@ -385,24 +380,23 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
final int calcSamples = (int) (stats.getTotalInputSize() / 1024);
numSamples = Math.min(DEFAULT_MAX_NUM_SAMPLES, Math.max(DEFAULT_MIN_NUM_SAMPLES, calcSamples));
}

// check if sampling is disabled.
if (numSamples == 0) {
return stats;
}
if (numSamples < 0) {
throw new RuntimeException("Error: Invalid number of samples: " + numSamples);
}

// make sure that the sampling times out after a while if the file
// system does not answer in time

// make sure that the sampling times out after a while if the file system does not answer in time
this.openTimeout = 10000;
// set a small read buffer size
this.bufferSize = 4 * 1024;
// prevent overly large records, for example if we have an
// incorrectly configured delimiter
// prevent overly large records, for example if we have an incorrectly configured delimiter
this.lineLengthLimit = MAX_SAMPLE_LEN;

long offset = 0;
long totalNumBytes = 0;
long stepSize = stats.getTotalInputSize() / numSamples;
Expand Down Expand Up @@ -436,28 +430,28 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
fileNum++;
}
}

// we have the width, store it
return new FileBaseStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(),
totalNumBytes / (float) samplesTaken);

return new FileBaseStatistics(stats.getLastModificationTime(),
stats.getTotalInputSize(), totalNumBytes / (float) samplesTaken);
} catch (IOException ioex) {
if (LOG.isWarnEnabled()) {
LOG.warn("Could not determine statistics for file(s) in '" + this.filePathList
+ "' due to an io error: " + ioex.getMessage());
LOG.warn("Could not determine statistics for files '" + Arrays.toString(getFilePaths()) + "' " +
"due to an io error: " + ioex.getMessage());
}
} catch (Throwable t) {
}
catch (Throwable t) {
if (LOG.isErrorEnabled()) {
LOG.error("Unexpected problen while getting the file statistics for file(s) in'" + this.filePathList
+ "': " + t.getMessage(), t);
LOG.error("Unexpected problem while getting the file statistics for files '" + Arrays.toString(getFilePaths()) + "': "
+ t.getMessage(), t);
}
} finally {
// restore properties (even on return)
this.openTimeout = oldTimeout;
this.bufferSize = oldBufferSize;
this.lineLengthLimit = oldLineLengthLimit;
}


// no statistics possible
return null;
Expand Down
Loading

0 comments on commit 892eb85

Please sign in to comment.