Skip to content

Commit

Permalink
[hotfix][refactor] Cleanup TextInputFormatTest
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed May 16, 2019
1 parent bd712b7 commit 6613332
Showing 1 changed file with 85 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,63 +22,63 @@
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests for {@link TextInputFormat}.
*/
public class TextInputFormatTest {
public class TextInputFormatTest extends TestLogger {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Test
public void testSimpleRead() {
public void testSimpleRead() throws IOException {
final String first = "First line";
final String second = "Second line";

try {
// create input file
File tempFile = File.createTempFile("TextInputFormatTest", "tmp", temporaryFolder.getRoot());
tempFile.setWritable(true);
// create input file
File tempFile = File.createTempFile("TextInputFormatTest", "tmp", temporaryFolder.getRoot());
tempFile.setWritable(true);

PrintStream ps = new PrintStream(tempFile);
try (PrintStream ps = new PrintStream(tempFile)) {
ps.println(first);
ps.println(second);
ps.close();

TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));

Configuration parameters = new Configuration();
inputFormat.configure(parameters);
}

FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertTrue("expected at least one input split", splits.length >= 1);
TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));

inputFormat.open(splits[0]);
Configuration parameters = new Configuration();
inputFormat.configure(parameters);

String result = "";
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertThat("expected at least one input split", splits.length, greaterThanOrEqualTo(1));

inputFormat.open(splits[0]);
try {
assertFalse(inputFormat.reachedEnd());
result = inputFormat.nextRecord("");
String result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
assertEquals(first, result);

Expand All @@ -88,68 +88,58 @@ public void testSimpleRead() {
assertEquals(second, result);

assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
}
catch (Throwable t) {
System.err.println("test failed with exception: " + t.getMessage());
t.printStackTrace(System.err);
fail("Test erroneous");
} finally {
inputFormat.close();
}
}

@Test
public void testNestedFileRead() {
public void testNestedFileRead() throws IOException {
String[] dirs = new String[] {"first", "second"};
List<String> expectedFiles = new ArrayList<>();

try {
File parentDir = temporaryFolder.getRoot();
for (String dir: dirs) {
// create input file
File tmpDir = temporaryFolder.newFolder(dir);

File tempFile = File.createTempFile("TextInputFormatTest", ".tmp", tmpDir);

expectedFiles.add(new Path(tempFile.getAbsolutePath()).makeQualified(FileSystem.getLocalFileSystem()).toString());
}

TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI()));
inputFormat.setNestedFileEnumeration(true);
inputFormat.setNumLineSamples(10);

// this is to check if the setter overrides the configuration (as expected)
Configuration config = new Configuration();
config.setBoolean("recursive.file.enumeration", false);
config.setString("delimited-format.numSamples", "20");
inputFormat.configure(config);

assertTrue(inputFormat.getNestedFileEnumeration());
assertTrue(inputFormat.getNumLineSamples() == 10);

FileInputSplit[] splits = inputFormat.createInputSplits(expectedFiles.size());

List<String> paths = new ArrayList<>();
for (FileInputSplit split: splits) {
paths.add(split.getPath().toString());
}

Collections.sort(expectedFiles);
Collections.sort(paths);
for (int i = 0; i < expectedFiles.size(); i++) {
assertTrue(expectedFiles.get(i).equals(paths.get(i)));
}

} catch (Throwable t) {
System.err.println("test failed with exception: " + t.getMessage());
t.printStackTrace(System.err);
fail("Test erroneous");
File parentDir = temporaryFolder.getRoot();
for (String dir: dirs) {
// create input file
File tmpDir = temporaryFolder.newFolder(dir);

File tempFile = File.createTempFile("TextInputFormatTest", ".tmp", tmpDir);

expectedFiles.add(new Path(tempFile.getAbsolutePath()).makeQualified(FileSystem.getLocalFileSystem()).toString());
}

TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI()));
inputFormat.setNestedFileEnumeration(true);
inputFormat.setNumLineSamples(10);

// this is to check if the setter overrides the configuration (as expected)
Configuration config = new Configuration();
config.setBoolean("recursive.file.enumeration", false);
config.setString("delimited-format.numSamples", "20");
inputFormat.configure(config);

assertTrue(inputFormat.getNestedFileEnumeration());
assertEquals(10, inputFormat.getNumLineSamples());

FileInputSplit[] splits = inputFormat.createInputSplits(expectedFiles.size());

List<String> paths = new ArrayList<>();
for (FileInputSplit split: splits) {
paths.add(split.getPath().toString());
}

Collections.sort(expectedFiles);
Collections.sort(paths);
for (int i = 0; i < expectedFiles.size(); i++) {
assertEquals(expectedFiles.get(i), paths.get(i));
}
}

/**
* This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed.
*/
@Test
public void testRemovingTrailingCR() {
public void testRemovingTrailingCR() throws IOException {

testRemovingTrailingCR("\n", "\n");
testRemovingTrailingCR("\r\n", "\n");
Expand All @@ -158,58 +148,50 @@ public void testRemovingTrailingCR() {
testRemovingTrailingCR("|", "\n");
}

private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
private void testRemovingTrailingCR(String lineBreaker, String delimiter) throws IOException {
String first = "First line";
String second = "Second line";
String content = first + lineBreaker + second + lineBreaker;

try {
// create input file
File tempFile = File.createTempFile("TextInputFormatTest", "tmp", temporaryFolder.getRoot());
tempFile.setWritable(true);
// create input file
File tempFile = File.createTempFile("TextInputFormatTest", "tmp", temporaryFolder.getRoot());
tempFile.setWritable(true);

OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
try (OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile))) {
wrt.write(content);
wrt.close();

TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
inputFormat.setFilePath(tempFile.toURI().toString());
}

Configuration parameters = new Configuration();
inputFormat.configure(parameters);
TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
inputFormat.setFilePath(tempFile.toURI().toString());

inputFormat.setDelimiter(delimiter);
Configuration parameters = new Configuration();
inputFormat.configure(parameters);

FileInputSplit[] splits = inputFormat.createInputSplits(1);
inputFormat.setDelimiter(delimiter);

inputFormat.open(splits[0]);
FileInputSplit[] splits = inputFormat.createInputSplits(1);

String result = "";
if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n")))
|| (lineBreaker.equals(delimiter))){
inputFormat.open(splits[0]);

result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
assertEquals(first, result);
String result;
if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n")))
|| (lineBreaker.equals(delimiter))){

result = inputFormat.nextRecord(result);
assertNotNull("Expecting second record here", result);
assertEquals(second, result);
result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
assertEquals(first, result);

result = inputFormat.nextRecord(result);
assertNull("The input file is over", result);
result = inputFormat.nextRecord(result);
assertNotNull("Expecting second record here", result);
assertEquals(second, result);

} else {
result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
assertEquals(content, result);
}
result = inputFormat.nextRecord(result);
assertNull("The input file is over", result);

}
catch (Throwable t) {
System.err.println("test failed with exception: " + t.getMessage());
t.printStackTrace(System.err);
fail("Test erroneous");
} else {
result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
assertEquals(content, result);
}
}

Expand Down

0 comments on commit 6613332

Please sign in to comment.