From 892eb8557e426e3ac1f824baba68c952eb9fe485 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Mon, 5 Feb 2018 14:40:50 +0100 Subject: [PATCH] [FLINK-3655] [core] Add support for multiple file paths for FileInputFormat. - Reverted API-breaking changes. - Enable multi-path support for the following InputFormats: - AvroInputFormat, - [Pojo,Row,Tuple]CsvInputFormat, - OrcInputFormat, - TextInputFormat, - TextValueInputFormat --- .../apache/flink/orc/OrcRowInputFormat.java | 5 + .../api/common/io/BinaryInputFormat.java | 31 +- .../api/common/io/DelimitedInputFormat.java | 60 ++- .../flink/api/common/io/FileInputFormat.java | 280 +++++++----- .../api/common/io/GenericCsvInputFormat.java | 13 +- .../api/common/io/BinaryInputFormatTest.java | 159 +++---- .../common/io/DelimitedInputFormatTest.java | 99 ++--- .../api/common/io/FileInputFormatTest.java | 414 +++++++++--------- .../flink/formats/avro/AvroInputFormat.java | 5 + .../flink/api/java/io/CsvInputFormat.java | 3 +- .../flink/api/java/io/TextInputFormat.java | 8 +- .../api/java/io/TextValueInputFormat.java | 7 +- .../ContinuousFileMonitoringFunction.java | 6 +- ...ContinuousFileProcessingRescalingTest.java | 2 +- 14 files changed, 558 insertions(+), 534 deletions(-) diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java index a037962d0fe22..08c1164f3d814 100644 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java @@ -393,6 +393,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } } + @Override + public boolean supportsMultiPaths() { + return true; + } + // -------------------------------------------------------------------------------------------- // Getter methods for tests // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 6f90d83cf6043..c2cb47f6b4200 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -111,13 +111,13 @@ public long getBlockSize() { @Override public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { - List files = this.getFiles(); - - final FileSystem fs = getFilePath().getFileSystem(); - final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ? fs.getDefaultBlockSize() : this.blockSize; + final List files = this.getFiles(); final List inputSplits = new ArrayList(minNumSplits); for (FileStatus file : files) { + final FileSystem fs = file.getPath().getFileSystem(); + final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ? fs.getDefaultBlockSize() : this.blockSize; + for (long pos = 0, length = file.getLen(); pos < length; pos += blockSize) { long remainingLength = Math.min(pos + blockSize, length) - pos; @@ -132,10 +132,10 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { if (inputSplits.size() < minNumSplits) { LOG.warn(String.format( - "With the given block size %d, the file %s cannot be split into %d blocks. Filling up with empty splits...", - blockSize, getFilePath(), minNumSplits)); + "With the given block size %d, the files %s cannot be split into %d blocks. Filling up with empty splits...", + blockSize, Arrays.toString(getFilePaths()), minNumSplits)); FileStatus last = files.get(files.size() - 1); - final BlockLocation[] blocks = fs.getFileBlockLocations(last, 0, last.getLen()); + final BlockLocation[] blocks = last.getPath().getFileSystem().getFileBlockLocations(last, 0, last.getLen()); for (int index = files.size(); index < minNumSplits; index++) { inputSplits.add(new FileInputSplit(index, last.getPath(), last.getLen(), 0, blocks[0].getHosts())); } @@ -146,9 +146,9 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { protected List getFiles() throws IOException { // get all the files that are involved in the splits - List files = new ArrayList(); + List files = new ArrayList<>(); - for (Path filePath: this.filePathList) { + for (Path filePath: getFilePaths()) { final FileSystem fs = filePath.getFileSystem(); final FileStatus pathFile = fs.getFileStatus(filePath); @@ -172,10 +172,10 @@ public SequentialStatistics getStatistics(BaseStatistics cachedStats) { final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ? (FileBaseStatistics) cachedStats : null; - + try { final ArrayList allFiles = new ArrayList(1); - final FileBaseStatistics stats = getFileStats(cachedFileStats, this.filePathList, allFiles); + final FileBaseStatistics stats = getFileStats(cachedFileStats, getFilePaths(), allFiles); if (stats == null) { return null; } @@ -187,19 +187,18 @@ public SequentialStatistics getStatistics(BaseStatistics cachedStats) { } catch (IOException ioex) { if (LOG.isWarnEnabled()) { LOG.warn( - String.format("Could not determine complete statistics for files in '%s' due to an I/O error", - this.filePathList), + String.format("Could not determine complete statistics for files '%s' due to an I/O error", + Arrays.toString(getFilePaths())), ioex); } } catch (Throwable t) { if (LOG.isErrorEnabled()) { LOG.error( - String.format("Unexpected problem while getting the file statistics for file in'%s'", - this.filePathList), + String.format("Unexpected problem while getting the file statistics for files '%s'", + Arrays.toString(getFilePaths())), t); } } - // no stats available return null; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index d7d51f6a719ae..c1ef344175b7f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -350,33 +350,28 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc final int oldLineLengthLimit = this.lineLengthLimit; try { - final ArrayList allFiles = new ArrayList(1); + final ArrayList allFiles = new ArrayList<>(1); - // let the file input format deal with the up-to-date check and the - // basic size - final FileBaseStatistics stats = getFileStats(cachedFileStats, this.filePathList, allFiles); + // let the file input format deal with the up-to-date check and the basic size + final FileBaseStatistics stats = getFileStats(cachedFileStats, getFilePaths(), allFiles); if (stats == null) { return null; } - - // check whether the width per record is already known or the total - // size is unknown as well + + // check whether the width per record is already known or the total size is unknown as well // in both cases, we return the stats as they are - if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN - || stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) { + if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN || + stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) { return stats; } - // disabling sampling for unsplittable files since the logic below - // assumes splitability. - // TODO: Add sampling for unsplittable files. Right now, only - // compressed text files are affected by this limitation. + // disabling sampling for unsplittable files since the logic below assumes splitability. + // TODO: Add sampling for unsplittable files. Right now, only compressed text files are affected by this limitation. if (unsplittable) { return stats; } - - // compute how many samples to take, depending on the defined upper - // and lower bound + + // compute how many samples to take, depending on the defined upper and lower bound final int numSamples; if (this.numLineSamples != NUM_SAMPLES_UNDEFINED) { numSamples = this.numLineSamples; @@ -385,7 +380,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc final int calcSamples = (int) (stats.getTotalInputSize() / 1024); numSamples = Math.min(DEFAULT_MAX_NUM_SAMPLES, Math.max(DEFAULT_MIN_NUM_SAMPLES, calcSamples)); } - + // check if sampling is disabled. if (numSamples == 0) { return stats; @@ -393,16 +388,15 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc if (numSamples < 0) { throw new RuntimeException("Error: Invalid number of samples: " + numSamples); } - - // make sure that the sampling times out after a while if the file - // system does not answer in time + + + // make sure that the sampling times out after a while if the file system does not answer in time this.openTimeout = 10000; // set a small read buffer size this.bufferSize = 4 * 1024; - // prevent overly large records, for example if we have an - // incorrectly configured delimiter + // prevent overly large records, for example if we have an incorrectly configured delimiter this.lineLengthLimit = MAX_SAMPLE_LEN; - + long offset = 0; long totalNumBytes = 0; long stepSize = stats.getTotalInputSize() / numSamples; @@ -436,20 +430,21 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc fileNum++; } } - + // we have the width, store it - return new FileBaseStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), - totalNumBytes / (float) samplesTaken); - + return new FileBaseStatistics(stats.getLastModificationTime(), + stats.getTotalInputSize(), totalNumBytes / (float) samplesTaken); + } catch (IOException ioex) { if (LOG.isWarnEnabled()) { - LOG.warn("Could not determine statistics for file(s) in '" + this.filePathList - + "' due to an io error: " + ioex.getMessage()); + LOG.warn("Could not determine statistics for files '" + Arrays.toString(getFilePaths()) + "' " + + "due to an io error: " + ioex.getMessage()); } - } catch (Throwable t) { + } + catch (Throwable t) { if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problen while getting the file statistics for file(s) in'" + this.filePathList - + "': " + t.getMessage(), t); + LOG.error("Unexpected problem while getting the file statistics for files '" + Arrays.toString(getFilePaths()) + "': " + + t.getMessage(), t); } } finally { // restore properties (even on return) @@ -457,7 +452,6 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc this.bufferSize = oldBufferSize; this.lineLengthLimit = oldLineLengthLimit; } - // no statistics possible return null; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 0c60881fc51d7..14cf647cd24b9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -194,8 +194,17 @@ protected static String extractFileExtension(String fileName) { /** * The path to the file that contains the input. + * + * @deprecated Please override {@link FileInputFormat#supportsMultiPaths()} and + * use {@link FileInputFormat#getFilePaths()} and {@link FileInputFormat#setFilePaths(Path...)}. */ - protected List filePathList; + @Deprecated + protected Path filePath; + + /** + * The list of paths to files and directories that contain the input. + */ + private Path[] filePaths; /** * The minimal split size, set by the configure() method. @@ -245,22 +254,47 @@ protected FileInputFormat(Path filePath) { // Getters/setters for the configurable parameters // -------------------------------------------------------------------------------------------- + /** + * + * @return The path of the file to read. + * + * @deprecated Please use getFilePaths() instead. + */ + @Deprecated public Path getFilePath() { - if (filePathList == null || filePathList.size() < 1) { - return null; + + if (supportsMultiPaths()) { + if (this.filePaths == null || this.filePaths.length == 0) { + return null; + } else if (this.filePaths.length == 1) { + return this.filePaths[0]; + } else { + throw new UnsupportedOperationException( + "FileInputFormat is configured with multiple paths. Use getFilePaths() instead."); + } + } else { + return filePath; } - return filePathList.get(0); } /** + * Returns the paths of all files to be read by the FileInputFormat. * - * @return the list of all file paths + * @return The list of all paths to read. */ public Path[] getFilePaths() { - if (this.filePathList == null) { - return new Path[0]; + + if (supportsMultiPaths()) { + if (this.filePaths == null) { + return new Path[0]; + } + return this.filePaths; + } else { + if (this.filePath == null) { + return new Path[0]; + } + return new Path[] {filePath}; } - return this.filePathList.toArray(new Path[this.filePathList.size()]); } public void setFilePath(String filePath) { @@ -279,39 +313,60 @@ public void setFilePath(String filePath) { } try { - setFilePath(new Path(filePath)); + this.setFilePath(new Path(filePath)); } catch (RuntimeException rex) { throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage()); } } + /** + * Sets a single path of a file to be read. + * + * @param filePath The path of the file to read. + */ public void setFilePath(Path filePath) { if (filePath == null) { throw new IllegalArgumentException("File path must not be null."); } - this.filePathList = new ArrayList(); - this.filePathList.add(filePath); + setFilePaths(filePath); } /** + * Sets multiple paths of files to be read. * - * @param filePaths the paths to set + * @param filePaths The paths of the files to read. */ public void setFilePaths(String... filePaths) { + Path[] paths = new Path[filePaths.length]; + for (int i = 0; i < paths.length; i++) { + paths[i] = new Path(filePaths[i]); + } + setFilePaths(paths); + } + + /** + * Sets multiple paths of files to be read. + * + * @param filePaths The paths of the files to read. + */ + public void setFilePaths(Path... filePaths) { + if (!supportsMultiPaths() && filePaths.length > 1) { + throw new UnsupportedOperationException( + "Multiple paths are not supported by this FileInputFormat."); + } if (filePaths.length < 1) { - throw new IllegalArgumentException("At least one file path must be given."); + throw new IllegalArgumentException("At least one file path must be specified."); } - this.filePathList = new ArrayList(); - for (String filePath : filePaths) { - if (filePath == null) { - throw new IllegalArgumentException("The file path must not be null."); - } - // can only add non-empty paths - if (!filePath.isEmpty()) { - this.filePathList.add(new Path(filePath)); - } + if (filePaths.length == 1) { + // set for backwards compatibility + this.filePath = filePaths[0]; + } else { + // clear file path in case it had been set before + this.filePath = null; } + + this.filePaths = filePaths; } public long getMinSplitSize() { @@ -395,12 +450,14 @@ public void setFilesFilter(FilePathFilter filesFilter) { @Override public void configure(Configuration parameters) { - // the if() clauses are to prevent the configure() method from - // overwriting the values set by the setters - - if (filePathList == null || filePathList.isEmpty()) { + if (getFilePaths().length == 0) { + // file path was not specified yet. Try to set it from the parameters. String filePath = parameters.getString(FILE_PARAMETER_KEY, null); - setFilePath(filePath); + if (filePath == null) { + throw new IllegalArgumentException("File path was not specified in input format or configuration."); + } else { + setFilePath(filePath); + } } if (!this.enumerateNestedFiles) { @@ -415,21 +472,56 @@ public void configure(Configuration parameters) { */ @Override public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + + final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ? + (FileBaseStatistics) cachedStats : null; + + try { + return getFileStats(cachedFileStats, getFilePaths(), new ArrayList<>(getFilePaths().length)); + } catch (IOException ioex) { + if (LOG.isWarnEnabled()) { + LOG.warn("Could not determine statistics for paths '" + Arrays.toString(getFilePaths()) + "' due to an io error: " + + ioex.getMessage()); + } + } + catch (Throwable t) { + if (LOG.isErrorEnabled()) { + LOG.error("Unexpected problem while getting the file statistics for paths '" + Arrays.toString(getFilePaths()) + "': " + + t.getMessage(), t); + } + } + + // no statistics available + return null; + } + + protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path[] filePaths, ArrayList files) throws IOException { + + long totalLength = 0; + long latestModTime = 0; - final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) - ? (FileBaseStatistics) cachedStats : null; - final FileBaseStatistics statistics = getFileStats(cachedFileStats, this.filePathList, new ArrayList(1)); + for (Path path : filePaths) { + final FileSystem fs = FileSystem.get(path.toUri()); + final FileBaseStatistics stats = getFileStats(cachedStats, path, fs, files); - // final sanity check - for backward compatibility - return (statistics.fileSize == BaseStatistics.SIZE_UNKNOWN) ? null : statistics; + if (stats.getTotalInputSize() == BaseStatistics.SIZE_UNKNOWN) { + totalLength = BaseStatistics.SIZE_UNKNOWN; + } else if (totalLength != BaseStatistics.SIZE_UNKNOWN) { + totalLength += stats.getTotalInputSize(); + } + latestModTime = Math.max(latestModTime, stats.getLastModificationTime()); + } + + // check whether the cached statistics are still valid, if we have any + if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { + return cachedStats; + } + + return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); } - /** - * @deprecated Should no longer be used because of filePathList and multiple paths - */ - @Deprecated - protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path filePath, FileSystem fs, - ArrayList files) throws IOException { + protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path filePath, FileSystem fs, ArrayList files) throws IOException { + // get the file info and check whether the cached statistics are still valid. final FileStatus file = fs.getFileStatus(filePath); long totalLength = 0; @@ -460,56 +552,6 @@ protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path f } return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); } - - protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, final List filePaths, - ArrayList files) { - - long totalLength = 0; - long latestModTime = 0; - for (Path filePath : filePaths) { - try { - final FileSystem fs = FileSystem.get(filePath.toUri()); - // get the file info and check whether the cached statistics are - // still valid. - final FileStatus file = fs.getFileStatus(filePath); - - // enumerate all files - if (file.isDir()) { - totalLength += addFilesInDir(file.getPath(), files, false); - } else { - files.add(file); - testForUnsplittable(file); - totalLength += file.getLen(); - } - - // check the modification time stamp - for (FileStatus f : files) { - latestModTime = Math.max(f.getModificationTime(), latestModTime); - } - - // check whether the cached statistics are still valid, if we - // have any - if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { - return cachedStats; - } - } catch (IOException ioex) { - if (LOG.isWarnEnabled()) { - LOG.warn("Could not determine statistics for file '" + filePath + "' due to an io error: " - + ioex.getMessage()); - } - } catch (Throwable t) { - if (LOG.isErrorEnabled()) { - LOG.error("Unexpected problem while getting the file statistics for file '" + filePath + "': " - + t.getMessage(), t); - } - } - } - // sanity check - if (totalLength <= 0) { - totalLength = BaseStatistics.SIZE_UNKNOWN; - } - return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); - } @Override public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits) { @@ -528,22 +570,6 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits */ @Override public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { - final List inputSplits = new ArrayList(minNumSplits); - for (Path file : filePathList) { - inputSplits.addAll(createInputSplits(file, minNumSplits)); - } - return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); - } - - /** - * Utility method to create split inputs given a filePath. - * - * @param filePath the path to traverse - * @param minNumSplits The minimum desired number of file splits - * @return a list of {@link FileInputSplit} - * @throws IOException thrown, if there is an exception - */ - private List createInputSplits(final Path filePath, int minNumSplits) throws IOException { if (minNumSplits < 1) { throw new IllegalArgumentException("Number of input splits has to be at least 1."); } @@ -554,24 +580,28 @@ private List createInputSplits(final Path filePath, int minNumSp final List inputSplits = new ArrayList(minNumSplits); // get all the files that are involved in the splits - List files = new ArrayList(); + List files = new ArrayList<>(); long totalLength = 0; - final FileSystem fs = filePath.getFileSystem(); - final FileStatus pathFile = fs.getFileStatus(filePath); + for (Path path : getFilePaths()) { + final FileSystem fs = path.getFileSystem(); + final FileStatus pathFile = fs.getFileStatus(path); - if (pathFile.isDir()) { - totalLength += addFilesInDir(filePath, files, true); - } else { - testForUnsplittable(pathFile); + if (pathFile.isDir()) { + totalLength += addFilesInDir(path, files, true); + } else { + testForUnsplittable(pathFile); - files.add(pathFile); - totalLength += pathFile.getLen(); + files.add(pathFile); + totalLength += pathFile.getLen(); + } } + // returns if unsplittable if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + final FileSystem fs = file.getPath().getFileSystem(); final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen()); Set hosts = new HashSet(); for(BlockLocation block : blocks) { @@ -585,7 +615,7 @@ private List createInputSplits(final Path filePath, int minNumSp hosts.toArray(new String[hosts.size()])); inputSplits.add(fis); } - return inputSplits; + return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); } @@ -595,6 +625,7 @@ private List createInputSplits(final Path filePath, int minNumSp int splitNum = 0; for (final FileStatus file : files) { + final FileSystem fs = file.getPath().getFileSystem(); final long len = file.getLen(); final long blockSize = file.getBlockSize(); @@ -660,7 +691,7 @@ private List createInputSplits(final Path filePath, int minNumSp } } - return inputSplits; + return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); } /** @@ -832,11 +863,23 @@ public void close() throws IOException { } } + /** + * Override this method to supports multiple paths. + * When this method will be removed, all FileInputFormats have to support multiple paths. + * + * @return True if the FileInputFormat supports multiple paths, false otherwise. + * + * @deprecated Will be removed for Flink 2.0. + */ + @Deprecated + public boolean supportsMultiPaths() { + return false; + } public String toString() { - return this.filePathList == null ? + return getFilePaths() == null || getFilePaths().length == 0 ? "File Input (unknown file)" : - "File Input (" + this.filePathList + ')'; + "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; } // ============================================================================================ @@ -1034,7 +1077,4 @@ private void abortWait() { * The config parameter which defines whether input directories are recursively traversed. */ public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration"; - - public static final String FILE_PARAMETER_DELIMITER_KEY = "input.file.path.delimiter"; - public static final String FILE_PARAMETER_DELIMITER = ","; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index a446c27c9b1f2..3b65a42e946cf 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -106,6 +106,11 @@ protected GenericCsvInputFormat(Path filePath) { super(filePath, null); } + @Override + public boolean supportsMultiPaths() { + return true; + } + // -------------------------------------------------------------------------------------------- public int getNumberOfFieldsTotal() { @@ -336,13 +341,13 @@ public void open(FileInputSplit split) throws IOException { public void close() throws IOException { if (this.invalidLineCount > 0) { if (LOG.isWarnEnabled()) { - LOG.warn("In file \""+ getFilePath() + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped."); + LOG.warn("In file \"" + currentSplit.getPath() + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped."); } } if (this.commentCount > 0) { if (LOG.isInfoEnabled()) { - LOG.info("In file \""+ getFilePath() + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped."); + LOG.info("In file \"" + currentSplit.getPath() + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped."); } } super.close(); @@ -384,7 +389,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n" + "ParserError " + parser.getErrorState() + " \n" + "Expect field types: "+fieldTypesToString() + " \n" - + "in file: " + getFilePath()); + + "in file: " + currentSplit.getPath()); } } else if (startPos == limit @@ -408,7 +413,7 @@ else if (startPos == limit String lineAsString = new String(bytes, offset, numBytes, getCharset()); throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n" + "Expect field types: "+fieldTypesToString()+" \n" - + "in file: "+getFilePath()); + + "in file: " + currentSplit.getPath()); } else { return false; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java index 1bb04f5beaca4..a28c1e6295f1e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java @@ -41,8 +41,12 @@ private static final class MyBinaryInputFormat extends BinaryInputFormat protected Record deserialize(Record record, DataInputView dataInput) { return record; } - } + @Override + public boolean supportsMultiPaths() { + return true; + } + } @Test public void testCreateInputSplitsWithOneFile() throws IOException { @@ -75,119 +79,82 @@ public void testCreateInputSplitsWithOneFile() throws IOException { Assert.assertEquals("3. split has block size length.", blockSize, inputSplits[2].getLength()); } - private File createBinaryInputFile(String fileName, int blockSize, int numBlocks) throws IOException { - // create temporary file with 3 blocks - final File tempFile = File.createTempFile(fileName, "tmp"); - tempFile.deleteOnExit(); - FileOutputStream fileOutputStream = new FileOutputStream(tempFile); - try { - for (int i = 0; i < blockSize * numBlocks; i++) { - fileOutputStream.write(new byte[] { 1 }); - } - } finally { - fileOutputStream.close(); - } - return tempFile; - } - @Test public void testCreateInputSplitsWithMulitpleFiles() throws IOException { final int blockInfoSize = new BlockInfo().getInfoSize(); final int blockSize = blockInfoSize + 8; - final int BLOCKS1 = 3; - final int BLOCKS2 = 5; + final int numBlocks1 = 3; + final int numBlocks2 = 5; - final File tempFile = createBinaryInputFile("binary_input_format_test", blockSize, BLOCKS1); - final File tempFile2 = createBinaryInputFile("binary_input_format_test_2", blockSize, BLOCKS2); - - final Configuration config = new Configuration(); - config.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); + final File tempFile1 = createBinaryInputFile("binary_input_format_test", blockSize, numBlocks1); + final File tempFile2 = createBinaryInputFile("binary_input_format_test_2", blockSize, numBlocks2); + final String pathFile1 = tempFile1.toURI().toString(); + final String pathFile2 = tempFile2.toURI().toString(); final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); - inputFormat.setFilePaths(tempFile.toURI().toString(), tempFile2.toURI().toString()); + inputFormat.setFilePaths(pathFile1, pathFile2); + inputFormat.setBlockSize(blockSize); - inputFormat.configure(config); + final int numBlocksTotal = numBlocks1 + numBlocks2; + FileInputSplit[] inputSplits = inputFormat.createInputSplits(numBlocksTotal); - final int TOTAL_BLOCKS = BLOCKS1 + BLOCKS2; - FileInputSplit[] inputSplits = inputFormat.createInputSplits(TOTAL_BLOCKS); + int numSplitsFile1 = 0; + int numSplitsFile2 = 0; - Assert.assertEquals("Returns requested numbers of splits.", TOTAL_BLOCKS, inputSplits.length); - for (int index = 0; index < inputSplits.length; index++) { - Assert.assertEquals(String.format("%d. split has block size length.", index), - blockSize, inputSplits[index].getLength()); - } - } - @Test - public void testGetStatisticsNonExistingFile() { - try { - final MyBinaryInputFormat format = new MyBinaryInputFormat(); - format.setFilePath("file:///some/none/existing/directory/"); - format.configure(new Configuration()); - - BaseStatistics stats = format.getStatistics(null); - Assert.assertNull("The file statistics should be null.", stats); - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail(ex.getMessage()); + Assert.assertEquals("Returns requested numbers of splits.", numBlocksTotal, inputSplits.length); + for (int i = 0; i < inputSplits.length; i++) { + Assert.assertEquals(String.format("%d. split has block size length.", i), blockSize, inputSplits[i].getLength()); + if (inputSplits[i].getPath().toString().equals(pathFile1)) { + numSplitsFile1++; + } else if (inputSplits[i].getPath().toString().equals(pathFile2)) { + numSplitsFile2++; + } else { + Assert.fail("Split does not belong to any input file."); + } } + Assert.assertEquals(numBlocks1, numSplitsFile1); + Assert.assertEquals(numBlocks2, numSplitsFile2); } - + @Test - public void testGetStatisticsMultipleNonExistingFile() { - try { - final MyBinaryInputFormat format = new MyBinaryInputFormat(); - format.setFilePaths("file:///some/none/existing/directory/", "file:///another/none/existing/directory/"); - format.configure(new Configuration()); - - BaseStatistics stats = format.getStatistics(null); - Assert.assertNull("The file statistics should be null.", stats); - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail(ex.getMessage()); - } + public void testGetStatisticsNonExistingFiles() { + final MyBinaryInputFormat format = new MyBinaryInputFormat(); + format.setFilePaths("file:///some/none/existing/directory/", "file:///another/none/existing/directory/"); + format.configure(new Configuration()); + + BaseStatistics stats = format.getStatistics(null); + Assert.assertNull("The file statistics should be null.", stats); } @Test - public void testGetStatisticsSinglePaths() { - try { - final int blockInfoSize = new BlockInfo().getInfoSize(); - final int blockSize = blockInfoSize + 8; - final int BLOCKS = 3; - - final File tempFile = createBinaryInputFile("binary_input_format_test", blockSize, BLOCKS); - final Configuration config = new Configuration(); - config.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); - - final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); - inputFormat.setFilePath(tempFile.toURI().toString()); - BaseStatistics stats = inputFormat.getStatistics(null); - Assert.assertEquals("The file size statistics is wrong", blockSize * BLOCKS, stats.getTotalInputSize()); - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail(ex.getMessage()); - } + public void testGetStatisticsMultiplePaths() throws IOException { + final int blockInfoSize = new BlockInfo().getInfoSize(); + final int blockSize = blockInfoSize + 8; + final int numBlocks1 = 3; + final int numBlocks2 = 5; + + final File tempFile = createBinaryInputFile("binary_input_format_test", blockSize, numBlocks1); + final File tempFile2 = createBinaryInputFile("binary_input_format_test_2", blockSize, numBlocks2); + + final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); + inputFormat.setFilePaths(tempFile.toURI().toString(), tempFile2.toURI().toString()); + inputFormat.setBlockSize(blockSize); + + BaseStatistics stats = inputFormat.getStatistics(null); + Assert.assertEquals("The file size statistics is wrong", blockSize * (numBlocks1 + numBlocks2), stats.getTotalInputSize()); } - @Test - public void testGetStatisticsMultiplePaths() { - try { - final int blockInfoSize = new BlockInfo().getInfoSize(); - final int blockSize = blockInfoSize + 8; - final int BLOCKS = 3; - final int BLOCKS2 = 5; - - final File tempFile = createBinaryInputFile("binary_input_format_test", blockSize, BLOCKS); - final File tempFile2 = createBinaryInputFile("binary_input_format_test_2", blockSize, BLOCKS2); - final Configuration config = new Configuration(); - config.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); - - final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); - inputFormat.setFilePaths(tempFile.toURI().toString(), tempFile2.toURI().toString()); - BaseStatistics stats = inputFormat.getStatistics(null); - Assert.assertEquals("The file size statistics is wrong", blockSize * (BLOCKS + BLOCKS2), stats.getTotalInputSize()); - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail(ex.getMessage()); + /** + * Creates a temp file with a certain number of blocks of a certain size. + */ + private File createBinaryInputFile(String fileName, int blockSize, int numBlocks) throws IOException { + final File tempFile = File.createTempFile(fileName, "tmp"); + tempFile.deleteOnExit(); + try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) { + for (int i = 0; i < blockSize * numBlocks; i++) { + fileOutputStream.write(new byte[]{1}); + } } + return tempFile; } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index eeeade7edc98a..b4b4512e5e3a0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.io; import org.apache.commons.lang3.StringUtils; + import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.ConfigConstants; @@ -431,37 +432,42 @@ public void testDelimiterOnBufferBoundary() throws IOException { } // -- Statistics --// + @Test - public void testGetStatisticsSingleFileFileDoesNotExist() throws IOException { + public void testGetStatistics() throws IOException { + final String myString = "my mocked line 1\nmy mocked line 2\n"; + final long size = myString.length(); + final Path filePath = createTempFilePath(myString); + + final String myString2 = "my mocked line 1\nmy mocked line 2\nanother mocked line3\n"; + final long size2 = myString2.length(); + final Path filePath2 = createTempFilePath(myString2); + + final long totalSize = size + size2; + DelimitedInputFormat format = new MyTextInputFormat(); - format.setFilePath(new Path("file://path/does/not/really/exist")); - - FileBaseStatistics stats = format.getStatistics(null); - assertNotNull("The file statistics should not be null", stats); - - assertEquals("The file size from the statistics is wrong.", FileBaseStatistics.SIZE_UNKNOWN, stats.getTotalInputSize()); + format.setFilePaths(filePath.toUri().toString(), filePath2.toUri().toString()); + + FileInputFormat.FileBaseStatistics stats = format.getStatistics(null); + assertNotNull(stats); + assertEquals("The file size from the statistics is wrong.", totalSize, stats.getTotalInputSize()); } @Test - public void testGetStatisticsSingleFile() throws IOException { - final String myString = "my mocked line 1\nmy mocked line 2\n"; - final long SIZE = myString.length(); - final Path filePath = createTempFilePath(myString); - + public void testGetStatisticsFileDoesNotExist() throws IOException { DelimitedInputFormat format = new MyTextInputFormat(); - format.setFilePath(filePath); - + format.setFilePaths("file:///path/does/not/really/exist", "file:///another/path/that/does/not/exist"); + FileBaseStatistics stats = format.getStatistics(null); - assertNotNull(stats); - assertEquals("The file size from the statistics is wrong", SIZE, stats.getTotalInputSize()); + assertNull("The file statistics should be null.", stats); } - + @Test public void testGetStatisticsSingleFileWithCachedVersion() throws IOException { final String myString = "my mocked line 1\nmy mocked line 2\n"; final Path tempFile = createTempFilePath(myString); - final long SIZE = myString.length(); - final long FAKE_SIZE = 10065; + final long size = myString.length(); + final long fakeSize = 10065; DelimitedInputFormat format = new MyTextInputFormat(); format.setFilePath(tempFile); @@ -469,65 +475,34 @@ public void testGetStatisticsSingleFileWithCachedVersion() throws IOException { FileBaseStatistics stats = format.getStatistics(null); assertNotNull(stats); - assertEquals("The file size from the statistics is wrong", SIZE, stats.getTotalInputSize()); + assertEquals("The file size from the statistics is wrong.", size, stats.getTotalInputSize()); format = new MyTextInputFormat(); format.setFilePath(tempFile); format.configure(new Configuration()); FileBaseStatistics newStats = format.getStatistics(stats); - assertEquals("Statistics object was changed", newStats, stats); + assertEquals("Statistics object was changed.", newStats, stats); // insert fake stats with the correct modification time. the call should return the fake stats format = new MyTextInputFormat(); format.setFilePath(tempFile); format.configure(new Configuration()); - FileBaseStatistics fakeStats = new FileBaseStatistics(stats.getLastModificationTime(), FAKE_SIZE, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + FileBaseStatistics fakeStats = new FileBaseStatistics(stats.getLastModificationTime(), fakeSize, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); BaseStatistics latest = format.getStatistics(fakeStats); - assertEquals("The file size from the statistics is wrong.", FAKE_SIZE, latest.getTotalInputSize()); + assertEquals("The file size from the statistics is wrong.", fakeSize, latest.getTotalInputSize()); // insert fake stats with the expired modification time. the call should return new accurate stats format = new MyTextInputFormat(); format.setFilePath(tempFile); format.configure(new Configuration()); - FileBaseStatistics outDatedFakeStats = new FileBaseStatistics(stats.getLastModificationTime()-1, FAKE_SIZE, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + FileBaseStatistics outDatedFakeStats = new FileBaseStatistics(stats.getLastModificationTime() - 1, fakeSize, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); BaseStatistics reGathered = format.getStatistics(outDatedFakeStats); - assertEquals("The file size from the statistics is wrong.", SIZE, reGathered.getTotalInputSize()); - } - - @Test - public void testGetStatisticsMultipleFileDoesNotExist() throws IOException { - DelimitedInputFormat format = new MyTextInputFormat(); - format.setFilePaths("file://path/does/not/really/exist","file://another/path/that/does/not/exist"); - - FileBaseStatistics stats = format.getStatistics(null); - assertNotNull("The file statistics should not be null", stats); - - assertEquals("The file size from the statistics is wrong.", FileBaseStatistics.SIZE_UNKNOWN, stats.getTotalInputSize()); + assertEquals("The file size from the statistics is wrong.", size, reGathered.getTotalInputSize()); } - @Test - public void testGetStatisticsMultipleFiles() throws IOException { - final String myString = "my mocked line 1\nmy mocked line 2\n"; - final long SIZE = myString.length(); - final Path filePath = createTempFilePath(myString); - - final String myString2 = "my mocked line 1\nmy mocked line 2\nanother mocked line3\n"; - final long SIZE2 = myString2.length(); - final Path filePath2 = createTempFilePath(myString2); - - final long TOTAL_SIZE = SIZE + SIZE2; - - DelimitedInputFormat format = new MyTextInputFormat(); - format.setFilePaths(filePath.toUri().toString(), filePath2.toUri().toString()); - - FileBaseStatistics stats = format.getStatistics(null); - assertNotNull(stats); - assertEquals("The file size from the statistics is wrong", TOTAL_SIZE, stats.getTotalInputSize()); - } - static FileInputSplit createTempFile(String contents) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); @@ -557,17 +532,19 @@ protected static final class MyTextInputFormat extends DelimitedInputFormat { private static final long serialVersionUID = 1L; - + @Override public boolean reachedEnd() throws IOException { return true; @@ -757,6 +764,15 @@ public IntValue nextRecord(IntValue record) throws IOException { } } + private class MultiDummyFileInputFormat extends DummyFileInputFormat { + private static final long serialVersionUID = 1L; + + @Override + public boolean supportsMultiPaths() { + return true; + } + } + private static final class MyDecoratedInputFormat extends FileInputFormat { private static final long serialVersionUID = 1L; diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java index 9b73ceb1cd245..8e12871fcd7d8 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java @@ -172,6 +172,11 @@ public E nextRecord(E reuseValue) throws IOException { } } + @Override + public boolean supportsMultiPaths() { + return true; + } + // -------------------------------------------------------------------------------------------- // Checkpointing // -------------------------------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index 0bd4e693c5f94..046181cef8b21 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -27,6 +27,7 @@ import org.apache.flink.util.StringUtils; import java.io.IOException; +import java.util.Arrays; /** * InputFormat that reads csv files. @@ -157,7 +158,7 @@ protected static boolean[] toBooleanMask(int[] sourceFieldIndices) { @Override public String toString() { - return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath(); + return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + Arrays.toString(getFilePaths()); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java index 7d4cf9c42025c..82793adc137be 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.Charset; +import java.util.Arrays; /** * Input Format that reads text files. Each line results in another element. @@ -98,6 +99,11 @@ public String readRecord(String reusable, byte[] bytes, int offset, int numBytes @Override public String toString() { - return "TextInputFormat (" + getFilePath() + ") - " + this.charsetName; + return "TextInputFormat (" + Arrays.toString(getFilePaths()) + ") - " + this.charsetName; + } + + @Override + public boolean supportsMultiPaths() { + return true; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java index 4721439fe7fdb..6406dd0c63e32 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java @@ -134,6 +134,11 @@ public StringValue readRecord(StringValue reuse, byte[] bytes, int offset, int n @Override public String toString() { - return "TextValueInputFormat (" + getFilePath() + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : ""); + return "TextValueInputFormat (" + Arrays.toString(getFilePaths()) + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : ""); + } + + @Override + public boolean supportsMultiPaths() { + return true; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 9f26efcdd5155..0006e0faf69d1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -114,8 +114,12 @@ public ContinuousFileMonitoringFunction( "allowed one (" + MIN_MONITORING_INTERVAL + " ms)." ); + Preconditions.checkArgument( + format.getFilePaths().length == 1, + "FileInputFormats with multiple paths are not supported yet."); + this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format."); - this.path = Preconditions.checkNotNull(format.getFilePath().toString(), "Unspecified Path."); + this.path = Preconditions.checkNotNull(format.getFilePaths()[0].toString(), "Unspecified Path."); this.interval = interval; this.watchType = watchType; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java index 2d0855a1d881d..ca49a2a671431 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java @@ -274,7 +274,7 @@ private static class BlockingFileInputFormat public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { FileInputSplit[] splits = new FileInputSplit[minNumSplits]; for (int i = 0; i < minNumSplits; i++) { - splits[i] = new FileInputSplit(i, getFilePath(), i * linesPerSplit + 1, linesPerSplit, null); + splits[i] = new FileInputSplit(i, getFilePaths()[0], i * linesPerSplit + 1, linesPerSplit, null); } return splits; }