Skip to content

Commit

Permalink
[FLINK-5612] [code] Make GlobPathFilter serializable
Browse files Browse the repository at this point in the history
  • Loading branch information
mushketyk authored and StephanEwen committed Jan 27, 2017
1 parent fc597f6 commit ef96054
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
totalLength += pathFile.getLen();
}
// returns if unsplittable
if(unsplittable) {
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;

import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
Expand Down Expand Up @@ -52,8 +53,13 @@ public class GlobFilePathFilter extends FilePathFilter {

private static final long serialVersionUID = 1L;

private final ArrayList<PathMatcher> includeMatchers;
private final ArrayList<PathMatcher> excludeMatchers;
private final List<String> includePatterns;
private final List<String> excludePatterns;

// PathMatcher instances are not serializable, so they are marked transient
// and built lazily from the pattern lists the first time they are needed.
private transient ArrayList<PathMatcher> includeMatchers;
private transient ArrayList<PathMatcher> excludeMatchers;

/**
* Constructor for GlobFilePathFilter that will match all files
Expand All @@ -69,8 +75,8 @@ public GlobFilePathFilter() {
* @param excludePatterns glob patterns for files to exclude
*/
public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
includeMatchers = buildPatterns(includePatterns);
excludeMatchers = buildPatterns(excludePatterns);
this.includePatterns = Preconditions.checkNotNull(includePatterns);
this.excludePatterns = Preconditions.checkNotNull(excludePatterns);
}

private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {
Expand All @@ -86,7 +92,7 @@ private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {

@Override
public boolean filterPath(Path filePath) {
if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
if (getIncludeMatchers().isEmpty() && getExcludeMatchers().isEmpty()) {
return false;
}

Expand All @@ -97,7 +103,7 @@ public boolean filterPath(Path filePath) {

final java.nio.file.Path nioPath = Paths.get(path);

for (PathMatcher matcher : includeMatchers) {
for (PathMatcher matcher : getIncludeMatchers()) {
if (matcher.matches(nioPath)) {
return shouldExclude(nioPath);
}
Expand All @@ -106,12 +112,27 @@ public boolean filterPath(Path filePath) {
return true;
}

/**
 * Returns the matchers for the include patterns, building them from
 * {@code includePatterns} on first access.
 */
private ArrayList<PathMatcher> getIncludeMatchers() {
    if (includeMatchers != null) {
        return includeMatchers;
    }
    // Not built yet (fresh instance or transient field still unset): create and cache.
    includeMatchers = buildPatterns(includePatterns);
    return includeMatchers;
}

/**
 * Returns the matchers for the exclude patterns, building them from
 * {@code excludePatterns} on first access.
 */
private ArrayList<PathMatcher> getExcludeMatchers() {
    if (excludeMatchers != null) {
        return excludeMatchers;
    }
    // Not built yet (fresh instance or transient field still unset): create and cache.
    excludeMatchers = buildPatterns(excludePatterns);
    return excludeMatchers;
}

private boolean shouldExclude(java.nio.file.Path nioPath) {
for (PathMatcher matcher : excludeMatchers) {
for (PathMatcher matcher : getExcludeMatchers()) {
if (matcher.matches(nioPath)) {
return true;
}
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@
package org.apache.flink.api.common.io;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class GlobFilePathFilterTest {
@Test
public void defaultConstructorCreateMatchAllFilter() {
public void testDefaultConstructorCreateMatchAllFilter() {
GlobFilePathFilter matcher = new GlobFilePathFilter();
assertFalse(matcher.filterPath(new Path("dir/file.txt")));
}

@Test
public void matchAllFilesByDefault() {
public void testMatchAllFilesByDefault() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.<String>emptyList(),
Collections.<String>emptyList());
Expand All @@ -42,7 +44,7 @@ public void matchAllFilesByDefault() {
}

@Test
public void excludeFilesNotInIncludePatterns() {
public void testExcludeFilesNotInIncludePatterns() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/*"),
Collections.<String>emptyList());
Expand All @@ -52,7 +54,7 @@ public void excludeFilesNotInIncludePatterns() {
}

@Test
public void excludeFilesIfMatchesExclude() {
public void testExcludeFilesIfMatchesExclude() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/*"),
Collections.singletonList("dir/file.txt"));
Expand All @@ -61,7 +63,7 @@ public void excludeFilesIfMatchesExclude() {
}

@Test
public void includeFileWithAnyCharacterMatcher() {
public void testIncludeFileWithAnyCharacterMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/?.txt"),
Collections.<String>emptyList());
Expand All @@ -71,7 +73,7 @@ public void includeFileWithAnyCharacterMatcher() {
}

@Test
public void includeFileWithCharacterSetMatcher() {
public void testIncludeFileWithCharacterSetMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/[acd].txt"),
Collections.<String>emptyList());
Expand All @@ -83,7 +85,7 @@ public void includeFileWithCharacterSetMatcher() {
}

@Test
public void includeFileWithCharacterRangeMatcher() {
public void testIncludeFileWithCharacterRangeMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/[a-d].txt"),
Collections.<String>emptyList());
Expand All @@ -96,7 +98,7 @@ public void includeFileWithCharacterRangeMatcher() {
}

@Test
public void excludeHDFSFile() {
public void testExcludeHDFSFile() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("/dir/file2.txt"));
Expand All @@ -107,7 +109,7 @@ public void excludeHDFSFile() {
}

@Test
public void excludeFilenameWithStart() {
public void testExcludeFilenameWithStart() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("\\*"));
Expand All @@ -118,7 +120,7 @@ public void excludeFilenameWithStart() {
}

@Test
public void singleStarPattern() {
public void testSingleStarPattern() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("*"),
Collections.<String>emptyList());
Expand All @@ -129,7 +131,7 @@ public void singleStarPattern() {
}

@Test
public void doubleStarPattern() {
public void testDoubleStarPattern() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.<String>emptyList());
Expand All @@ -138,4 +140,30 @@ public void doubleStarPattern() {
assertFalse(matcher.filterPath(new Path("a/b")));
assertFalse(matcher.filterPath(new Path("a/b/c")));
}

/** Passing {@code null} as the include pattern list is expected to raise a NullPointerException. */
@Test(expected = NullPointerException.class)
public void testIncluePatternIsNull() {
    // Only the include list is null; the exclude list is a valid empty list.
    new GlobFilePathFilter(null, Collections.<String>emptyList());
}

/** Passing {@code null} as the exclude pattern list is expected to raise a NullPointerException. */
@Test(expected = NullPointerException.class)
public void testExcludePatternIsNull() {
    // Only the exclude list is null; the include list is a valid single-pattern list.
    new GlobFilePathFilter(Collections.singletonList("**"), null);
}

/**
 * Verifies that a filter still works after a serialization round trip.
 *
 * <p>The path matchers are transient, so the copy produced by
 * {@code CommonTestUtils.createCopySerializable} has to rebuild them from its
 * pattern lists before it can filter anything.
 *
 * @throws IOException if creating the serialized copy fails
 */
@Test
public void testGlobFilterSerializable() throws IOException {
    GlobFilePathFilter matcher = new GlobFilePathFilter(
        Collections.singletonList("**"),
        Collections.<String>emptyList());

    GlobFilePathFilter matcherCopy = CommonTestUtils.createCopySerializable(matcher);

    // Assert on the copy: checking the original would pass even if the
    // deserialized instance were unusable.
    assertFalse(matcherCopy.filterPath(new Path("a")));
    assertFalse(matcherCopy.filterPath(new Path("a/b")));
    assertFalse(matcherCopy.filterPath(new Path("a/b/c")));
}
}

0 comments on commit ef96054

Please sign in to comment.