Skip to content

Commit

Permalink
[FLINK-16056][runtime][tests] do fail ContinuousFileProcessingITCase …
Browse files Browse the repository at this point in the history
…on failure
  • Loading branch information
rkhachatryan authored and pnowojski committed Feb 18, 2020
1 parent 91fcd83 commit d3cb7ed
Showing 1 changed file with 14 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.test.util.AbstractTestBase;

import org.apache.flink.util.ExceptionUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
Expand All @@ -53,6 +54,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -140,29 +142,19 @@ public void testProgram() throws Exception {
TestingSinkFunction sink = new TestingSinkFunction();
content.addSink(sink).setParallelism(1);

Thread job = new Thread() {

@Override
public void run() {
try {
env.execute("ContinuousFileProcessingITCase Job.");
} catch (Exception e) {
Throwable th = e;
for (int depth = 0; depth < 20; depth++) {
if (th instanceof SuccessException) {
return;
} else if (th.getCause() != null) {
th = th.getCause();
} else {
break;
}
}
e.printStackTrace();
Assert.fail(e.getMessage());
CompletableFuture<Void> jobFuture = new CompletableFuture<>();
new Thread(() -> {
try {
env.execute("ContinuousFileProcessingITCase Job.");
jobFuture.complete(null);
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e, SuccessException.class).isPresent()) {
jobFuture.complete(null);
} else {
jobFuture.completeExceptionally(e);
}
}
};
job.start();
}).start();

// The modification time of the last created file.
long lastCreatedModTime = Long.MIN_VALUE;
Expand Down Expand Up @@ -197,8 +189,7 @@ public void run() {
Assert.assertTrue(hdfs.exists(file));
}

// wait for the job to finish.
job.join();
jobFuture.get();
}

private static class TestingSinkFunction extends RichSinkFunction<String> {
Expand Down

0 comments on commit d3cb7ed

Please sign in to comment.