diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 30a73582e999b..a6ee1dae0742a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -19,16 +19,15 @@ package org.apache.flink.api.common.io; -import java.io.IOException; -import java.util.ArrayList; - +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.util.InstantiationUtil; -import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; +import java.io.IOException; +import java.util.ArrayList; public abstract class GenericCsvInputFormat extends DelimitedInputFormat { @@ -299,9 +298,10 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu return false; } else { String lineAsString = new String(bytes, offset, numBytes); - throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n" - + "Expect field types: "+fieldTypesToString()+" \n" - + "in file: "+filePath); + throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n" + + "ParserError " + parser.getErrorState() + " \n" + + "Expect field types: "+fieldTypesToString() + " \n" + + "in file: " + filePath); } } output++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index a465c1fdd5c1c..a39dbe0b68231 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -30,7 +30,11 @@ public class StringParser 
extends FieldParser { private static final byte WHITESPACE_TAB = (byte) '\t'; private static final byte QUOTE_DOUBLE = (byte) '"'; - + + private static enum ParserStates { + NONE, IN_QUOTE, STOP + } + private String result; @Override @@ -45,51 +49,82 @@ public int parseField(byte[] bytes, int startPos, int limit, char delim, String while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) { i++; } - + + // first determine the boundaries of the cell + ParserStates parserState = ParserStates.NONE; + + // the current position evaluated against the cell boundary + int endOfCellPosition = i - 1; + + while (parserState != ParserStates.STOP && endOfCellPosition < limit) { + endOfCellPosition++; + // make sure we don't step over the end of the buffer + if(endOfCellPosition == limit) { + break; + } + current = bytes[endOfCellPosition]; + if(current == delByte) { + // if we are in a quote do nothing, otherwise we reached the end + parserState = parserState == ParserStates.IN_QUOTE ? 
parserState: ParserStates.STOP; + } else if(current == QUOTE_DOUBLE && bytes[i] == QUOTE_DOUBLE) { + // a quote only has meaning inside a cell that opens with one + if(parserState == ParserStates.IN_QUOTE) { + // we end the quote + parserState = ParserStates.NONE; + } else { + // we start a new quote + parserState = ParserStates.IN_QUOTE; + } + } + } + + if(parserState == ParserStates.IN_QUOTE) { + // exited due to line end without quote termination + setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING); + return -1; + } + + + // boundary of the cell is now + // i --> endOfCellPosition + // first none whitespace character if (i < limit && bytes[i] == QUOTE_DOUBLE) { - // quoted string - i++; // the quote - - // we count only from after the quote - int quoteStart = i; - while (i < limit && bytes[i] != QUOTE_DOUBLE) { - i++; + + // skip trailing whitespace between the closing quote and the end of the cell + int endOfQuoted = endOfCellPosition; + while (bytes[endOfQuoted - 1] == WHITESPACE_SPACE || bytes[endOfQuoted - 1] == WHITESPACE_TAB) { + endOfQuoted--; } - - if (i < limit) { - // end of the string - this.result = new String(bytes, quoteStart, i-quoteStart); - - i++; // the quote - - // skip trailing whitespace characters - while (i < limit && (current = bytes[i]) != delByte) { - if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) { - i++; - } - else { - setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); - return -1; // illegal case of non-whitespace characters trailing - } + + // unescape into a scratch buffer 
so that the caller's bytes are left untouched + byte[] unescaped = new byte[endOfQuoted - i - 1]; + int unescapedLength = 0; + boolean quotePending = false; + for (int pos = i + 1; pos < endOfQuoted; pos++) { + boolean isQuote = bytes[pos] == QUOTE_DOUBLE; + if (quotePending && !isQuote) { + // a lone quote closed the string, so whatever follows it is unquoted + setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); + return -1; // illegal case of non-whitespace characters trailing } - - return (i == limit ? limit : i+1); - } else { - // exited due to line end without quote termination - setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING); - return -1; + // the second quote of a doubled pair is kept; the first one, like the closing quote, is dropped + quotePending = isQuote && !quotePending; + if (!quotePending) { + unescaped[unescapedLength++] = bytes[pos]; + } } + + this.result = new String(unescaped, 0, unescapedLength); + + return (endOfCellPosition == limit ? limit : endOfCellPosition + 1); } else { // unquoted string - while (i < limit && bytes[i] != delByte) { - i++; - } - + // set from the beginning. unquoted strings include the leading whitespaces - this.result = new String(bytes, startPos, i-startPos); - return (i == limit ? limit : i+1); + this.result = new String(bytes, startPos, endOfCellPosition - startPos); + return (endOfCellPosition == limit ? 
limit : endOfCellPosition + 1); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java index c7f86af815d18..15333e8e9e7fa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java @@ -19,8 +19,7 @@ package org.apache.flink.api.java.record.io; -import java.io.IOException; - +import com.google.common.base.Preconditions; import org.apache.flink.api.common.io.GenericCsvInputFormat; import org.apache.flink.api.common.io.ParseException; import org.apache.flink.api.common.operators.CompilerHints; @@ -36,7 +35,7 @@ import org.apache.flink.types.Value; import org.apache.flink.types.parser.FieldParser; -import com.google.common.base.Preconditions; +import java.io.IOException; /** * Input format to parse text files and generate Records. @@ -267,7 +266,7 @@ public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) t /* * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n */ - //Find windows end line, so find chariage return before the newline + //Find windows end line, so find carriage return before the newline if(this.lineDelimiterIsLinebreak == true && bytes[offset + numBytes -1] == '\r') { //reduce the number of bytes so that the Carriage return is not taken as data numBytes--; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index 5bf35e7b002e0..0662aa6fb586a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -19,18 +19,7 @@ package org.apache.flink.api.java.io; -import static org.junit.Assert.assertEquals; -import static 
org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; - +import com.google.common.base.Charsets; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -38,8 +27,23 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; import org.junit.Test; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class CsvInputFormatTest { private static final Path PATH = new Path("an/ignored/file/"); @@ -345,12 +349,89 @@ public void testReadSparseWithShuffledPositions() throws IOException { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + + @Test + public void testParseStringErrors() throws Exception { + StringParser stringParser = new StringParser(); + + Object[][] failures = { + {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, + {"\"string\" trailing ", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, + {"\"a\" \"b\"", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, + {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING} + }; + + for (Object[] failure : failures) { + String input = (String) failure[0]; + + int result = stringParser.parseField(input.getBytes(), 0, input.length(), '|', null); + + assertThat(result, 
is(-1)); + assertThat(stringParser.getErrorState(), is(failure[1])); + } + + + } + + @Test + public void testParserCorrectness() throws Exception { + // RFC 4180 Compliance Test content + // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example + final String fileContent = + "Year,Make,Model,Description,Price\n" + + "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" + + "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" + + "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" + + "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" + + ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00"; + + final FileInputSplit split = createTempFile(fileContent); + + final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = + new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH); + + format.setSkipFirstLineAsHeader(true); + format.setFieldDelimiter(','); + + format.setFields(new boolean[] { true, true, true, true, true }, new Class[] { + Integer.class, String.class, String.class, String.class, Double.class }); + + format.configure(new Configuration()); + format.open(split); + + Tuple5 result = new Tuple5(); + + Tuple5[] expectedLines = new Tuple5[]{ + new Tuple5(1997, "Ford", "E350", "ac, abs, moon", 3000.0), + new Tuple5(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0), + new Tuple5(1996, "Jeep", "Grand Cherokee", "MUST SELL! 
air, moon roof, loaded", 4799.00), + new Tuple5(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00 ), + new Tuple5(0, "", "Venture \"Extended Edition\"", "", 4900.0) + }; + + try { + + for (Tuple5 expected : expectedLines) { + result = format.nextRecord(result); + assertEquals(expected, result); + } + + assertNull(format.nextRecord(result)); + assertTrue(format.reachedEnd()); + + } catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + + } + private FileInputSplit createTempFile(String content) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); - - FileWriter wrt = new FileWriter(tempFile); + + OutputStreamWriter wrt = new OutputStreamWriter( + new FileOutputStream(tempFile), Charsets.UTF_8 + ); wrt.write(content); wrt.close();