[FLINK-5432] recursively scan nested files in ContinuousFileMonitoringFunction

This closes apache#3090.
Yassine Marzougui authored and zentol committed Jan 19, 2017
1 parent cbd933b commit 9945904
Showing 3 changed files with 65 additions and 8 deletions.
FileInputFormat.java
@@ -637,7 +637,7 @@ private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {
      * @param fileStatus The file status to check.
      * @return true, if the given file or directory is accepted
      */
-    protected boolean acceptFile(FileStatus fileStatus) {
+    public boolean acceptFile(FileStatus fileStatus) {
         final String name = fileStatus.getPath().getName();
         return !name.startsWith("_")
             && !name.startsWith(".")
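Widening acceptFile from protected to public matters beyond style: ContinuousFileMonitoringFunction (last file in this diff) now calls it from another package to decide whether to descend into a subdirectory, and it also doubles as an override hook. A minimal sketch of such an override, assuming a hypothetical CsvOnlyTextInputFormat that is not part of this commit; note that directories must still be accepted, otherwise the recursive scan never enters them:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;

// Hypothetical subclass, for illustration only.
public class CsvOnlyTextInputFormat extends TextInputFormat {

    public CsvOnlyTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public boolean acceptFile(FileStatus fileStatus) {
        // Keep the default exclusions (names starting with "_" or ".",
        // plus any configured files filter) ...
        if (!super.acceptFile(fileStatus)) {
            return false;
        }
        // ... and additionally restrict enumeration to .csv files. Directories
        // must pass, or the nested scan would stop at the first level.
        return fileStatus.isDir() || fileStatus.getPath().getName().endsWith(".csv");
    }
}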
ContinuousFileProcessingTest.java
@@ -587,6 +587,59 @@ public boolean filterPath(Path filePath) {
         }
     }
 
+    @Test
+    public void testNestedFilesProcessing() throws Exception {
+        final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+        final Set<String> filesToBeRead = new TreeSet<>();
+
+        // create two nested directories
+        org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir");
+        org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir" + "/" + "secondLevelDir");
+        Assert.assertFalse(hdfs.exists(firstLevelDir));
+        hdfs.mkdirs(firstLevelDir);
+        hdfs.mkdirs(secondLevelDir);
+
+        // create files in the base dir, the first level dir and the second level dir
+        for (int i = 0; i < NO_OF_FILES; i++) {
+            Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "firstLevelFile", i, "This is test line.");
+            filesCreated.add(file.f0);
+            filesToBeRead.add(file.f0.getName());
+        }
+        for (int i = 0; i < NO_OF_FILES; i++) {
+            Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(firstLevelDir.toString(), "secondLevelFile", i, "This is test line.");
+            filesCreated.add(file.f0);
+            filesToBeRead.add(file.f0.getName());
+        }
+        for (int i = 0; i < NO_OF_FILES; i++) {
+            Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(secondLevelDir.toString(), "thirdLevelFile", i, "This is test line.");
+            filesCreated.add(file.f0);
+            filesToBeRead.add(file.f0.getName());
+        }
+
+        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+        format.setFilesFilter(FilePathFilter.createDefaultFilter());
+        format.setNestedFileEnumeration(true);
+
+        ContinuousFileMonitoringFunction<String> monitoringFunction =
+            new ContinuousFileMonitoringFunction<>(format,
+                FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+        final FileVerifyingSourceContext context =
+            new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
+
+        monitoringFunction.open(new Configuration());
+        monitoringFunction.run(context);
+
+        Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());
+
+        // finally delete the dirs and the files created for the test.
+        for (org.apache.hadoop.fs.Path file: filesCreated) {
+            hdfs.delete(file, false);
+        }
+        hdfs.delete(secondLevelDir, false);
+        hdfs.delete(firstLevelDir, false);
+    }
+
     @Test
     public void testSortingOnModTime() throws Exception {
         final long[] modTimes = new long[NO_OF_FILES];
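The test drives ContinuousFileMonitoringFunction directly, using helpers defined elsewhere in the test class (hdfsURI, NO_OF_FILES, createFileAndFillWithData, FileVerifyingSourceContext). From a user's point of view, the same code path is reached through the DataStream API; a minimal sketch, with the input path and the polling interval as placeholder values:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class NestedFilesJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; any directory tree with nested subdirectories works.
        String inputPath = "hdfs:///data/input";

        TextInputFormat format = new TextInputFormat(new Path(inputPath));
        // The flag this commit starts honoring during directory monitoring.
        format.setNestedFileEnumeration(true);

        // PROCESS_ONCE scans the tree a single time; the interval (ms) only
        // matters for PROCESS_CONTINUOUSLY.
        DataStream<String> lines =
            env.readFile(format, inputPath, FileProcessingMode.PROCESS_ONCE, 100);

        lines.print();
        env.execute("read nested files");
    }
}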
ContinuousFileMonitoringFunction.java
@@ -232,7 +232,7 @@ private void monitorDirAndForwardSplits(FileSystem fs,
             SourceContext<TimestampedFileInputSplit> context) throws IOException {
         assert (Thread.holdsLock(checkpointLock));
 
-        Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs);
+        Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
         Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);
 
         for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
@@ -282,11 +282,11 @@ private Map<Long, List<TimestampedFileInputSplit>> getInputSplitsSortedByModTime
      * Returns the paths of the files not yet processed.
      * @param fileSystem The filesystem where the monitored directory resides.
      */
-    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) throws IOException {
 
         final FileStatus[] statuses;
         try {
-            statuses = fileSystem.listStatus(new Path(path));
+            statuses = fileSystem.listStatus(path);
         } catch (IOException e) {
             // we may run into an IOException if files are moved while listing their status
             // delay the check for eligible files in this case
@@ -300,10 +300,14 @@ private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IO
         Map<Path, FileStatus> files = new HashMap<>();
         // handle the new files
         for (FileStatus status : statuses) {
-            Path filePath = status.getPath();
-            long modificationTime = status.getModificationTime();
-            if (!shouldIgnore(filePath, modificationTime)) {
-                files.put(filePath, status);
+            if (!status.isDir()) {
+                Path filePath = status.getPath();
+                long modificationTime = status.getModificationTime();
+                if (!shouldIgnore(filePath, modificationTime)) {
+                    files.put(filePath, status);
+                }
+            } else if (format.getNestedFileEnumeration() && format.acceptFile(status)){
+                files.putAll(listEligibleFiles(fileSystem, status.getPath()));
             }
         }
         return files;
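The heart of the change is the recursion above: regular files are collected as before, while a directory is re-listed with the same method when nested enumeration is enabled and the format accepts it. The same pattern in isolation, as a standalone sketch over java.io.File rather than Flink's FileSystem (the accept predicate stands in for format.acceptFile; illustration only, not Flink code):

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public final class RecursiveListing {

    // Collects regular files under 'dir', descending only into directories
    // that the predicate accepts -- mirroring listEligibleFiles above.
    static List<File> listFiles(File dir, Predicate<File> accept) {
        List<File> result = new ArrayList<>();
        File[] entries = dir.listFiles();
        if (entries == null) {          // not a directory, or an I/O error
            return result;
        }
        for (File entry : entries) {
            if (!entry.isDirectory()) {
                result.add(entry);
            } else if (accept.test(entry)) {
                result.addAll(listFiles(entry, accept));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Skip hidden directories, as the default acceptFile does for "." names.
        List<File> files = listFiles(new File("."), f -> !f.getName().startsWith("."));
        files.forEach(System.out::println);
    }
}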
