[FLINK-5415] Harden ContinuousFileProcessingTest
- Use TemporaryFolder @ClassRule instead of manually managing the HDFS base dir (sketched below).
- Place files for each test in its own sub-directory
- Harden completeness condition in testFunctionRestore()
aljoscha committed Feb 10, 2017
1 parent e29dfb8 commit f6709b4
Showing 1 changed file with 59 additions and 41 deletions.
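
The first two bullets boil down to a reusable pattern, shown as a standalone sketch below: a JUnit TemporaryFolder registered as a @ClassRule hosts the MiniDFSCluster working directory (so there is no hard-coded ./target path to clean up with FileUtil.fullyDelete), and every test isolates its files under a fresh random sub-directory of the cluster URI. This is only an illustration assembled from the APIs visible in the diff; the class name SketchTest and the someTest() method are made up for the example, and deriving hdfsURI from FileSystem.getUri() is just one reasonable choice, not necessarily what the real test does.

import java.io.File;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class SketchTest {

	// JUnit creates this folder before the test class runs and deletes it
	// afterwards, replacing the manual baseDir bookkeeping that the old
	// version of the test did in createHDFS()/destroyHDFS().
	@ClassRule
	public static TemporaryFolder tempFolder = new TemporaryFolder();

	private static FileSystem hdfs;
	private static String hdfsURI;
	private static MiniDFSCluster hdfsCluster;

	@BeforeClass
	public static void createHDFS() throws Exception {
		File hdfsDir = tempFolder.newFolder();

		Configuration hdConf = new Configuration();
		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath());

		hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
		hdfs = hdfsCluster.getFileSystem();
		hdfsURI = hdfs.getUri().toString();
	}

	@AfterClass
	public static void destroyHDFS() throws Exception {
		// No FileUtil.fullyDelete() needed; the TemporaryFolder rule cleans up.
		if (hdfsCluster != null) {
			hdfsCluster.shutdown();
		}
	}

	@Test
	public void someTest() throws Exception {
		// Each test works under its own random sub-directory, so files written
		// by one test can never leak into the directory another test monitors.
		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
		// ... create files under testBasePath and run the actual assertions ...
	}
}

The third bullet is independent of the directory handling: in the testFunctionRestore() hunks further down, the source context is hoisted into a local variable so the test can first await the latch (at least one element emitted) and then synchronize on the checkpoint lock (all splits processed) before taking the snapshot.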
@@ -44,12 +44,13 @@
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileNotFoundException;
@@ -73,22 +74,20 @@ public class ContinuousFileProcessingTest {

private static final long INTERVAL = 100;

private static File baseDir;

private static FileSystem hdfs;
private static String hdfsURI;
private static MiniDFSCluster hdfsCluster;

// PREPARING FOR THE TESTS
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

@BeforeClass
public static void createHDFS() {
try {
baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
File hdfsDir = tempFolder.newFolder();

org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath());
hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.

MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
@@ -106,17 +105,12 @@ public static void createHDFS() {
@AfterClass
public static void destroyHDFS() {
try {
FileUtil.fullyDelete(baseDir);
hdfsCluster.shutdown();
} catch (Throwable t) {
throw new RuntimeException(t);
}
}

// END OF PREPARATIONS

// TESTS

@Test
public void testInvalidPathSpecification() throws Exception {

@@ -145,17 +139,19 @@ public void collect(TimestampedFileInputSplit element) {

@Test
public void testFileReadingOperatorWithIngestionTime() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
Map<String, Long> modTimes = new HashMap<>();
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated.add(file.f0);
modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
expectedFileContents.put(i, file.f1);
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);

final long watermarkInterval = 10;
@@ -303,17 +299,19 @@ public int compare(String o1, String o2) {

@Test
public void testFileReadingOperatorWithEventTime() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<String, Long> modTimes = new HashMap<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
filesCreated.add(file.f0);
expectedFileContents.put(i, file.f1);
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);

ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
@@ -396,6 +394,7 @@ public int compare(String o1, String o2) {

@Test
public void testReaderSnapshotRestore() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

TimestampedFileInputSplit split1 =
new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
@@ -412,7 +411,7 @@ public void testReaderSnapshotRestore() throws Exception {

final OneShotLatch latch = new OneShotLatch();

BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testBasePath));
TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);

ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
@@ -439,7 +438,7 @@ public void testReaderSnapshotRestore() throws Exception {
}

ContinuousFileReaderOperator<FileInputSplit> restoredReader = new ContinuousFileReaderOperator<>(
new BlockingFileInputFormat(latch, new Path(hdfsURI)));
new BlockingFileInputFormat(latch, new Path(testBasePath)));
restoredReader.setOutputType(typeInfo, new ExecutionConfig());

OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance =
@@ -543,24 +542,26 @@ public void close() {

@Test
public void testFilePathFiltering() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Set<String> filesKept = new TreeSet<>();

// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}

// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated.add(file.f0);
filesKept.add(file.f0.getName());
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(new FilePathFilter() {

private static final long serialVersionUID = 2611449927338589804L;
@@ -591,19 +592,21 @@ public boolean filterPath(Path filePath) {

@Test
public void testNestedFilesProcessing() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
final Set<String> filesToBeRead = new TreeSet<>();

// create two nested directories
org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir");
org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir" + "/" + "secondLevelDir");
org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(testBasePath + "/" + "firstLevelDir");
org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(testBasePath + "/" + "firstLevelDir" + "/" + "secondLevelDir");
Assert.assertFalse(hdfs.exists(firstLevelDir));
hdfs.mkdirs(firstLevelDir);
hdfs.mkdirs(secondLevelDir);

// create files in the base dir, the first level dir and the second level dir
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "firstLevelFile", i, "This is test line.");
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "firstLevelFile", i, "This is test line.");
filesCreated.add(file.f0);
filesToBeRead.add(file.f0.getName());
}
@@ -618,7 +621,7 @@ public void testNestedFilesProcessing() throws Exception {
filesToBeRead.add(file.f0.getName());
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
format.setNestedFileEnumeration(true);

@@ -644,19 +647,21 @@ public void testNestedFilesProcessing() throws Exception {

@Test
public void testSortingOnModTime() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

final long[] modTimes = new long[NO_OF_FILES];
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];

for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
Thread.sleep(400);

filesCreated[i] = file.f0;
modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime();
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());

// this is just to verify that all splits have been forwarded later.
@@ -680,18 +685,20 @@ public void testSortingOnModTime() throws Exception {

@Test
public void testProcessOnce() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

final OneShotLatch latch = new OneShotLatch();

// create a single file in the directory
Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
Assert.assertTrue(hdfs.exists(bootstrap.f0));

// the source is supposed to read only this file.
final Set<String> filesToBeRead = new TreeSet<>();
filesToBeRead.add(bootstrap.f0.getName());

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());

final ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -728,7 +735,7 @@ public void run() {
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> ignoredFile =
createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated[i] = ignoredFile.f0;
}

@@ -746,16 +753,18 @@ public void run() {

@Test
public void testFunctionRestore() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";


org.apache.hadoop.fs.Path path = null;
long fileModTime = Long.MIN_VALUE;
for (int i = 0; i < 1; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
path = file.f0;
fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
}

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
@@ -771,17 +780,19 @@ public void testFunctionRestore() throws Exception {

final OneShotLatch latch = new OneShotLatch();

final DummySourceContext sourceContext = new DummySourceContext() {
@Override
public void collect(TimestampedFileInputSplit element) {
latch.trigger();
}
};

// run the source asynchronously
Thread runner = new Thread() {
@Override
public void run() {
try {
monitoringFunction.run(new DummySourceContext() {
@Override
public void collect(TimestampedFileInputSplit element) {
latch.trigger();
}
});
monitoringFunction.run(sourceContext);
}
catch (Throwable t) {
t.printStackTrace();
@@ -791,10 +802,15 @@ public void collect(TimestampedFileInputSplit element) {
};
runner.start();

// first condition for the source to have updated its state: emit at least one element
if (!latch.isTriggered()) {
latch.await();
}

// second condition for the source to have updated its state: it's not on the lock anymore,
// this means it has processed all the splits and updated its state.
synchronized (sourceContext.getCheckpointLock()) {}

OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
monitoringFunction.cancel();
runner.join();
@@ -820,17 +836,19 @@ public void collect(TimestampedFileInputSplit element) {

@Test
public void testProcessContinuously() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

final OneShotLatch latch = new OneShotLatch();

// create a single file in the directory
Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
Assert.assertTrue(hdfs.exists(bootstrap.f0));

final Set<String> filesToBeRead = new TreeSet<>();
filesToBeRead.add(bootstrap.f0.getName());

TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());

final ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -863,7 +881,7 @@ public void run() {
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated[i] = file.f0;
filesToBeRead.add(file.f0.getName());
}
