Skip to content

Commit

Permalink
[FLINK-9758] Fix ContinuousFileProcessingTest failure due to not sett…
Browse files Browse the repository at this point in the history
…ing runtimeContext

This closes apache#6260
  • Loading branch information
Myasuka authored and dawidwys committed Jul 15, 2018
1 parent 10ddfca commit a7be2e1
Showing 1 changed file with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.hdfstests;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -579,8 +581,7 @@ public boolean filterPath(Path filePath) {
});

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);

final FileVerifyingSourceContext context =
new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
Expand Down Expand Up @@ -632,8 +633,7 @@ public void testNestedFilesProcessing() throws Exception {
format.setNestedFileEnumeration(true);

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);

final FileVerifyingSourceContext context =
new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
Expand Down Expand Up @@ -674,8 +674,7 @@ public void testSortingOnModTime() throws Exception {
FileInputSplit[] splits = format.createInputSplits(1);

ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);

ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);

Expand Down Expand Up @@ -708,8 +707,7 @@ public void testProcessOnce() throws Exception {
format.setFilesFilter(FilePathFilter.createDefaultFilter());

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);

final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);

Expand Down Expand Up @@ -772,7 +770,7 @@ public void testFunctionRestore() throws Exception {
TextInputFormat format = new TextInputFormat(new Path(testBasePath));

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);

StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
new StreamSource<>(monitoringFunction);
Expand Down Expand Up @@ -823,7 +821,7 @@ public void run() {
testHarness.close();

final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);

StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
new StreamSource<>(monitoringFunctionCopy);
Expand Down Expand Up @@ -857,8 +855,7 @@ public void testProcessContinuously() throws Exception {
format.setFilesFilter(FilePathFilter.createDefaultFilter());

final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);

final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch,
Expand Down Expand Up @@ -1055,4 +1052,14 @@ private static Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithDa
Assert.assertTrue("No result file present", hdfs.exists(file));
return new Tuple2<>(file, str.toString());
}

/**
* Create continuous monitoring function with 1 reader-parallelism and interval: {@link #INTERVAL}.
*/
private <OUT> ContinuousFileMonitoringFunction<OUT> createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode fileProcessingMode) {
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL);
monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));
return monitoringFunction;
}
}

0 comments on commit a7be2e1

Please sign in to comment.