Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-1223] Allow value escaping in CSV files. #187

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@

package org.apache.flink.api.common.io;

import java.io.IOException;
import java.util.ArrayList;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.InstantiationUtil;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.util.ArrayList;


public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
Expand Down Expand Up @@ -299,9 +298,10 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
return false;
} else {
String lineAsString = new String(bytes, offset, numBytes);
throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
+ "Expect field types: "+fieldTypesToString()+" \n"
+ "in file: "+filePath);
throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
+ "ParserError " + parser.getErrorState() + " \n"
+ "Expect field types: "+fieldTypesToString() + " \n"
+ "in file: " + filePath);
}
}
output++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public class StringParser extends FieldParser<String> {
private static final byte WHITESPACE_TAB = (byte) '\t';

private static final byte QUOTE_DOUBLE = (byte) '"';


private static enum ParserStates {
NONE, IN_QUOTE, STOP
}

private String result;

@Override
Expand All @@ -45,51 +49,86 @@ public int parseField(byte[] bytes, int startPos, int limit, char delim, String
while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) {
i++;
}


// first determine the boundaries of the cell
ParserStates parserState = ParserStates.NONE;

// the current position evaluated against the cell boundary
int endOfCellPosition = i - 1;

while (parserState != ParserStates.STOP && endOfCellPosition < limit) {
endOfCellPosition++;
// make sure we don't step over the end of the buffer
if(endOfCellPosition == limit) {
break;
}
current = bytes[endOfCellPosition];
if(current == delByte) {
// if we are in a quote do nothing, otherwise we reached the end
parserState = parserState == ParserStates.IN_QUOTE ? parserState: ParserStates.STOP;
} else if(current == QUOTE_DOUBLE) {
// we entered a quote
if(parserState == ParserStates.IN_QUOTE) {
// we end the quote
parserState = ParserStates.NONE;
} else {
// we start a new quote
parserState = ParserStates.IN_QUOTE;
}
}
}

if(parserState == ParserStates.IN_QUOTE) {
// exited due to line end without quote termination
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
return -1;
}


// boundary of the cell is now
// i --> endOfCellPosition

// first none whitespace character
if (i < limit && bytes[i] == QUOTE_DOUBLE) {
// quoted string
i++; // the quote

// we count only from after the quote
int quoteStart = i;
while (i < limit && bytes[i] != QUOTE_DOUBLE) {
i++;

// check if there are characters at the end
current = bytes[endOfCellPosition - 1];

// if the character preceding the end of the cell is not a WHITESPACE or the end QUOTE_DOUBLE
// there are unquoted characters at the end

if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_DOUBLE)) {
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
return -1; // illegal case of non-whitespace characters trailing
}

if (i < limit) {
// end of the string
this.result = new String(bytes, quoteStart, i-quoteStart);

i++; // the quote

// skip trailing whitespace characters
while (i < limit && (current = bytes[i]) != delByte) {
if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) {
i++;
}
else {
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
return -1; // illegal case of non-whitespace characters trailing
}

// skip trailing whitespace after quote .. by moving the cursor backwards
int skipAtEnd = 0;
while (bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_SPACE || bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_TAB) {
skipAtEnd++;
}

// now unescape
boolean notEscaped = true;
int endOfContent = i + 1;
for(int counter = endOfContent; counter < endOfCellPosition - skipAtEnd; counter++) {
notEscaped = bytes[counter] != QUOTE_DOUBLE || !notEscaped;
if (notEscaped) {
// realign
bytes[endOfContent++] = bytes[counter];
}

return (i == limit ? limit : i+1);
} else {
// exited due to line end without quote termination
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
return -1;
}

this.result = new String(bytes, i + 1, endOfContent - i - 1);

return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
}
else {
// unquoted string
while (i < limit && bytes[i] != delByte) {
i++;
}


// set from the beginning. unquoted strings include the leading whitespaces
this.result = new String(bytes, startPos, i-startPos);
return (i == limit ? limit : i+1);
this.result = new String(bytes, i, endOfCellPosition - i);
return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

package org.apache.flink.api.java.record.io;

import java.io.IOException;

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.operators.CompilerHints;
Expand All @@ -36,7 +35,7 @@
import org.apache.flink.types.Value;
import org.apache.flink.types.parser.FieldParser;

import com.google.common.base.Preconditions;
import java.io.IOException;

/**
* Input format to parse text files and generate Records.
Expand Down Expand Up @@ -267,7 +266,7 @@ public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) t
/*
* Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
*/
//Find windows end line, so find chariage return before the newline
//Find windows end line, so find carriage return before the newline
if(this.lineDelimiterIsLinebreak == true && bytes[offset + numBytes -1] == '\r') {
//reduce the number of bytes so that the Carriage return is not taken as data
numBytes--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,31 @@

package org.apache.flink.api.java.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

import com.google.common.base.Charsets;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CsvInputFormatTest {

private static final Path PATH = new Path("an/ignored/file/");
Expand Down Expand Up @@ -345,12 +349,87 @@ public void testReadSparseWithShuffledPositions() throws IOException {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}


@Test
public void testParseStringErrors() throws Exception {
StringParser stringParser = new StringParser();

Object[][] failures = {
{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
{"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
};

for (Object[] failure : failures) {
String input = (String) failure[0];

int result = stringParser.parseField(input.getBytes(), 0, input.length(), '|', null);

assertThat(result, is(-1));
assertThat(stringParser.getErrorState(), is(failure[1]));
}


}

@Test
public void testParserCorrectness() throws Exception {
// RFC 4180 Compliance Test content
// Taken from http:https://en.wikipedia.org/wiki/Comma-separated_values#Example
final String fileContent =
"Year,Make,Model,Description,Price\n" +
"1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
"1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
"1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
"1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";

final FileInputSplit split = createTempFile(fileContent);

final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);

format.setSkipFirstLineAsHeader(true);
format.setFieldDelimiter(',');

format.setFields(new boolean[] { true, true, true, true, true }, new Class<?>[] {
Integer.class, String.class, String.class, String.class, Double.class });

format.configure(new Configuration());
format.open(split);

Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();

Tuple5[] expectedLines = new Tuple5[]{
new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00 ),
new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
};

try {

for (Tuple5 expected : expectedLines) {
result = format.nextRecord(result);
assertEquals(expected, result);
}

assertNull(format.nextRecord(result));
assertTrue(format.reachedEnd());

} catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}

}

private FileInputSplit createTempFile(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();

FileWriter wrt = new FileWriter(tempFile);

OutputStreamWriter wrt = new OutputStreamWriter(
new FileOutputStream(tempFile), Charsets.UTF_8
);
wrt.write(content);
wrt.close();

Expand Down