Skip to content

Commit

Permalink
[FLINK-1223] Allow value escaping in CSV files
Browse files Browse the repository at this point in the history
- Strings can now contain " quoted characters
- Skip trailing whitespace after quote

This closes apache#187.
  • Loading branch information
jkirsch authored and uce committed Nov 11, 2014
1 parent 2f1176a commit e855ef4
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@

package org.apache.flink.api.common.io;

import java.io.IOException;
import java.util.ArrayList;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.InstantiationUtil;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.util.ArrayList;


public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
Expand Down Expand Up @@ -299,9 +298,10 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
return false;
} else {
String lineAsString = new String(bytes, offset, numBytes);
throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
+ "Expect field types: "+fieldTypesToString()+" \n"
+ "in file: "+filePath);
throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
+ "ParserError " + parser.getErrorState() + " \n"
+ "Expect field types: "+fieldTypesToString() + " \n"
+ "in file: " + filePath);
}
}
output++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public class StringParser extends FieldParser<String> {
private static final byte WHITESPACE_TAB = (byte) '\t';

private static final byte QUOTE_DOUBLE = (byte) '"';


private static enum ParserStates {
NONE, IN_QUOTE, STOP
}

private String result;

@Override
Expand All @@ -45,51 +49,86 @@ public int parseField(byte[] bytes, int startPos, int limit, char delim, String
while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) {
i++;
}


// first determine the boundaries of the cell
ParserStates parserState = ParserStates.NONE;

// the current position evaluated against the cell boundary
int endOfCellPosition = i - 1;

while (parserState != ParserStates.STOP && endOfCellPosition < limit) {
endOfCellPosition++;
// make sure we don't step over the end of the buffer
if(endOfCellPosition == limit) {
break;
}
current = bytes[endOfCellPosition];
if(current == delByte) {
// if we are in a quote do nothing, otherwise we reached the end
parserState = parserState == ParserStates.IN_QUOTE ? parserState: ParserStates.STOP;
} else if(current == QUOTE_DOUBLE) {
// we entered a quote
if(parserState == ParserStates.IN_QUOTE) {
// we end the quote
parserState = ParserStates.NONE;
} else {
// we start a new quote
parserState = ParserStates.IN_QUOTE;
}
}
}

if(parserState == ParserStates.IN_QUOTE) {
// exited due to line end without quote termination
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
return -1;
}


// boundary of the cell is now
// i --> endOfCellPosition

// first none whitespace character
if (i < limit && bytes[i] == QUOTE_DOUBLE) {
// quoted string
i++; // the quote

// we count only from after the quote
int quoteStart = i;
while (i < limit && bytes[i] != QUOTE_DOUBLE) {
i++;

// check if there are characters at the end
current = bytes[endOfCellPosition - 1];

// if the character preceding the end of the cell is not a WHITESPACE or the end QUOTE_DOUBLE
// there are unquoted characters at the end

if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_DOUBLE)) {
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
return -1; // illegal case of non-whitespace characters trailing
}

if (i < limit) {
// end of the string
this.result = new String(bytes, quoteStart, i-quoteStart);

i++; // the quote

// skip trailing whitespace characters
while (i < limit && (current = bytes[i]) != delByte) {
if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) {
i++;
}
else {
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
return -1; // illegal case of non-whitespace characters trailing
}

// skip trailing whitespace after quote .. by moving the cursor backwards
int skipAtEnd = 0;
while (bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_SPACE || bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_TAB) {
skipAtEnd++;
}

// now unescape
boolean notEscaped = true;
int endOfContent = i + 1;
for(int counter = endOfContent; counter < endOfCellPosition - skipAtEnd; counter++) {
notEscaped = bytes[counter] != QUOTE_DOUBLE || !notEscaped;
if (notEscaped) {
// realign
bytes[endOfContent++] = bytes[counter];
}

return (i == limit ? limit : i+1);
} else {
// exited due to line end without quote termination
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
return -1;
}

this.result = new String(bytes, i + 1, endOfContent - i - 1);

return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
}
else {
// unquoted string
while (i < limit && bytes[i] != delByte) {
i++;
}


// set from the beginning. unquoted strings include the leading whitespaces
this.result = new String(bytes, startPos, i-startPos);
return (i == limit ? limit : i+1);
this.result = new String(bytes, i, endOfCellPosition - i);
return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

package org.apache.flink.api.java.record.io;

import java.io.IOException;

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.operators.CompilerHints;
Expand All @@ -36,7 +35,7 @@
import org.apache.flink.types.Value;
import org.apache.flink.types.parser.FieldParser;

import com.google.common.base.Preconditions;
import java.io.IOException;

/**
* Input format to parse text files and generate Records.
Expand Down Expand Up @@ -267,7 +266,7 @@ public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) t
/*
* Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
*/
//Find windows end line, so find chariage return before the newline
//Find windows end line, so find carriage return before the newline
if(this.lineDelimiterIsLinebreak == true && bytes[offset + numBytes -1] == '\r') {
//reduce the number of bytes so that the Carriage return is not taken as data
numBytes--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,31 @@

package org.apache.flink.api.java.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

import com.google.common.base.Charsets;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CsvInputFormatTest {

private static final Path PATH = new Path("an/ignored/file/");
Expand Down Expand Up @@ -345,12 +349,87 @@ public void testReadSparseWithShuffledPositions() throws IOException {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}


@Test
public void testParseStringErrors() throws Exception {
StringParser stringParser = new StringParser();

Object[][] failures = {
{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
{"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
};

for (Object[] failure : failures) {
String input = (String) failure[0];

int result = stringParser.parseField(input.getBytes(), 0, input.length(), '|', null);

assertThat(result, is(-1));
assertThat(stringParser.getErrorState(), is(failure[1]));
}


}

@Test
public void testParserCorrectness() throws Exception {
// RFC 4180 Compliance Test content
// Taken from http:https://en.wikipedia.org/wiki/Comma-separated_values#Example
final String fileContent =
"Year,Make,Model,Description,Price\n" +
"1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
"1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
"1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
"1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";

final FileInputSplit split = createTempFile(fileContent);

final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);

format.setSkipFirstLineAsHeader(true);
format.setFieldDelimiter(',');

format.setFields(new boolean[] { true, true, true, true, true }, new Class<?>[] {
Integer.class, String.class, String.class, String.class, Double.class });

format.configure(new Configuration());
format.open(split);

Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();

Tuple5[] expectedLines = new Tuple5[]{
new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00 ),
new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
};

try {

for (Tuple5 expected : expectedLines) {
result = format.nextRecord(result);
assertEquals(expected, result);
}

assertNull(format.nextRecord(result));
assertTrue(format.reachedEnd());

} catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}

}

private FileInputSplit createTempFile(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();

FileWriter wrt = new FileWriter(tempFile);

OutputStreamWriter wrt = new OutputStreamWriter(
new FileOutputStream(tempFile), Charsets.UTF_8
);
wrt.write(content);
wrt.close();

Expand Down

0 comments on commit e855ef4

Please sign in to comment.