Skip to content

Commit

Permalink
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.
Browse files Browse the repository at this point in the history
This closes apache#2060.
  • Loading branch information
rekhajoshm authored and fhueske committed Dec 8, 2016
1 parent 2d8f03e commit f2186af
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -48,9 +46,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>

private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);

/** The default charset to convert strings to bytes */
private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
/** The charset used to convert strings to bytes */
private Charset charset = Charset.forName("UTF-8");

private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];

private static final boolean[] EMPTY_INCLUDED = new boolean[0];
Expand Down Expand Up @@ -107,6 +105,11 @@ protected GenericCsvInputFormat(Path filePath) {
super(filePath, null);
}

/**
* Creates an input format for the given file path that uses the given charset.
*
* <p>Delegates to the single-argument constructor and then stores the charset.
*
* @param filePath The path passed on to the single-argument constructor.
* @param charset The charset to store; it is passed through {@code Preconditions.checkNotNull}.
*/
protected GenericCsvInputFormat(Path filePath, Charset charset) {
this(filePath);
this.charset = Preconditions.checkNotNull(charset);
}

// --------------------------------------------------------------------------------------------

public int getNumberOfFieldsTotal() {
Expand All @@ -121,32 +124,11 @@ public byte[] getCommentPrefix() {
return commentPrefix;
}

public void setCommentPrefix(byte[] commentPrefix) {
this.commentPrefix = commentPrefix;
}

public void setCommentPrefix(char commentPrefix) {
setCommentPrefix(String.valueOf(commentPrefix));
}

public void setCommentPrefix(String commentPrefix) {
setCommentPrefix(commentPrefix, UTF_8_CHARSET);
setCommentPrefix(commentPrefix, charset);
}

public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
if (charsetName == null) {
throw new IllegalArgumentException("Charset name must not be null");
}

if (commentPrefix != null) {
Charset charset = Charset.forName(charsetName);
setCommentPrefix(commentPrefix, charset);
} else {
this.commentPrefix = null;
}
}

public void setCommentPrefix(String commentPrefix, Charset charset) {
private void setCommentPrefix(String commentPrefix, Charset charset) {
if (charset == null) {
throw new IllegalArgumentException("Charset must not be null");
}
Expand Down Expand Up @@ -174,7 +156,7 @@ public void setFieldDelimiter(char delimiter) {
}

public void setFieldDelimiter(String delimiter) {
this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
this.fieldDelim = delimiter.getBytes(charset);
}

public boolean isLenient() {
Expand Down Expand Up @@ -314,6 +296,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
this.fieldIncluded = includedMask;
}

/**
* Gets the character set configured for this input format. Default is set to UTF-8.
*
* <p>This is the charset that is handed to each field parser when a split is opened.
*
* @return The charset of this input format.
*/
Charset getCharset() {
return this.charset;
}

/**
* Sets the charset of this input format. The charset is used to encode String delimiters and
* comment prefixes and is passed to the field parsers when a split is opened.
*
* <p>NOTE(review): String delimiters and comment prefixes are converted to bytes at the time
* their setters are called, and this method does not re-encode them. It looks like the charset
* has to be set before those setters for it to take effect on them - confirm against callers.
*
* @param charset The character set to set; it is passed through {@code Preconditions.checkNotNull}.
*/
public void setCharset(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}

// --------------------------------------------------------------------------------------------
// Runtime methods
// --------------------------------------------------------------------------------------------
Expand All @@ -334,6 +335,7 @@ public void open(FileInputSplit split) throws IOException {

FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);

p.setCharset(this.getCharset());
if (this.quotedStringParsing) {
if (p instanceof StringParser) {
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
Expand Down Expand Up @@ -449,7 +451,7 @@ protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) {
// search for ending quote character, continue when it is escaped
i++;

while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){
while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) {
i++;
}
i++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@

package org.apache.flink.types.parser;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.ByteValue;
Expand All @@ -34,6 +29,12 @@
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

/**
* A FieldParser is used to parse a field from a sequence of bytes. Fields occur in a byte sequence and are terminated
* by the end of the byte sequence or a delimiter.
Expand Down Expand Up @@ -77,9 +78,11 @@ public static enum ParseErrorState {
/** Invalid Boolean value **/
BOOLEAN_INVALID
}


private Charset charset = Charset.forName("UTF-8");

private ParseErrorState errorState = ParseErrorState.NONE;

/**
* Parses the value of a field from the byte array, taking care of properly resetting
* the state of this parser.
Expand Down Expand Up @@ -217,7 +220,26 @@ protected static final int nextStringLength(byte[] bytes, int startPos, int leng

return limitedLength;
}


/**
* Gets the charset of this parser. Default is set to UTF-8.
*
* @return The charset of this parser.
*/
public Charset getCharset() {
return this.charset;
}

/**
 * Sets the charset of this parser, i.e. the charset that parsers such as the string parser
 * use when they turn the bytes of a field into a String.
 *
 * <p>A null charset is rejected immediately so that a misconfiguration fails at the call site
 * instead of surfacing later while a field is being parsed.
 *
 * @param charset The charset to set; must not be null.
 * @throws NullPointerException If {@code charset} is null.
 */
public void setCharset(Charset charset) {
    // Fully qualified to avoid depending on an import that may not be present in this file.
    this.charset = java.util.Objects.requireNonNull(charset, "charset must not be null");
}

// --------------------------------------------------------------------------------------------
// Mapping from types to parsers
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S
// check for proper termination
if (i == limit) {
// either by end of line
this.result = new String(bytes, startPos + 1, i - startPos - 2);
this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
return limit;
} else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) {
// or following field delimiter
this.result = new String(bytes, startPos + 1, i - startPos - 2);
this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
return i + delimiter.length;
} else {
// no proper termination
Expand All @@ -87,14 +87,14 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S
if (limit == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
}
this.result = new String(bytes, startPos, limit - startPos);
this.result = new String(bytes, startPos, limit - startPos, getCharset());
return limit;
} else {
// delimiter found.
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
}
this.result = new String(bytes, startPos, i - startPos);
this.result = new String(bytes, startPos, i - startPos, getCharset());
return i + delimiter.length;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;
Expand Down Expand Up @@ -485,7 +486,7 @@ public void testReadInvalidContents() throws IOException {
fail("Input format accepted on invalid input.");
}
catch (ParseException e) {
; // all good
// all good
}
}
catch (Exception ex) {
Expand Down Expand Up @@ -547,7 +548,38 @@ public void testReadInvalidContentsLenientWithSkipping() {
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}


/**
* Reads a single record with three string fields, two of which contain non-ASCII characters,
* after explicitly setting the charset of the format to UTF-8.
*
* NOTE(review): the expected values only match if createTempFile writes the content in an
* encoding compatible with UTF-8 - confirm against that helper.
*/
@Test
public void testReadWithCharset() throws IOException {
try {
// inverted question mark | "Flink" | n with tilde
final String fileContent = "\u00bf|Flink|\u00f1";
final FileInputSplit split = createTempFile(fileContent);

final Configuration parameters = new Configuration();

format.setCharset(Charset.forName("UTF-8"));
format.setFieldDelimiter("|");
format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class);

format.configure(parameters);
format.open(split);

Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()};

values = format.nextRecord(values);
assertNotNull(values);
assertEquals("\u00bf", ((StringValue) values[0]).getValue());
assertEquals("Flink", ((StringValue) values[1]).getValue());
assertEquals("\u00f1", ((StringValue) values[2]).getValue());

// the file holds exactly one record
assertNull(format.nextRecord(values));
assertTrue(format.reachedEnd());
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}

@Test
public void readWithEmptyField() {
try {
Expand Down Expand Up @@ -722,7 +754,7 @@ private FileInputSplit createTempGzipFile(String content) throws IOException {
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
}

private final Value[] createIntValues(int num) {
private Value[] createIntValues(int num) {
Value[] v = new Value[num];

for (int i = 0; i < num; i++) {
Expand All @@ -732,7 +764,7 @@ private final Value[] createIntValues(int num) {
return v;
}

private final Value[] createLongValues(int num) {
private Value[] createLongValues(int num) {
Value[] v = new Value[num];

for (int i = 0; i < num; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.flink.types.Value;
import org.junit.Test;

import java.nio.charset.Charset;

public class VarLengthStringParserTest {

public StringValueParser parser = new StringValueParser();
Expand Down Expand Up @@ -194,4 +196,22 @@ public void testParseInvalidQuotedStrings() {
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos < 0);
}

/**
 * Parses a quoted field that contains the delimiter character, with the parser's charset set
 * explicitly to US-ASCII.
 */
@Test
public void testParseValidMixedStringsWithCharset() {

    Charset charset = Charset.forName("US-ASCII");
    this.parser = new StringValueParser();
    this.parser.enableQuotedStringParsing((byte) '@');

    // check valid strings without whitespaces and trailing delimiter;
    // encode the input with the same charset the parser decodes with, so the test does not
    // depend on the platform default charset
    byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(charset);
    StringValue s = new StringValue();

    int startPos = 0;
    parser.setCharset(charset);
    startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s);
    // the first field spans the opening quote up to and including the delimiter after the closing quote
    assertTrue(startPos == 11);
    assertTrue(s.getValue().equals("abcde|gh"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.java.io;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;

Expand Down Expand Up @@ -64,6 +65,8 @@ public class CsvReader {
protected boolean skipFirstLineAsHeader = false;

protected boolean ignoreInvalidLines = false;

private Charset charset = Charset.forName("UTF-8");

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -157,7 +160,25 @@ public CsvReader ignoreComments(String commentPrefix) {
this.commentPrefix = commentPrefix;
return this;
}


/**
* Gets the character set for the reader. Default is set to UTF-8.
*
* <p>This charset is passed on to the CSV input format when the format is configured.
*
* @return The charset for the reader.
*/
public Charset getCharset() {
return this.charset;
}

/**
* Sets the charset of the reader.
*
* @param charset The character set to set; it is passed through {@code Preconditions.checkNotNull}.
*/
public void setCharset(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}

/**
* Configures which fields of the CSV file should be included and which should be skipped. The
* parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
Expand Down Expand Up @@ -340,6 +361,7 @@ private void configureInputFormat(CsvInputFormat<?> format) {
format.setCommentPrefix(this.commentPrefix);
format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
format.setLenient(ignoreInvalidLines);
format.setCharset(this.charset);
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
Expand Down
Loading

0 comments on commit f2186af

Please sign in to comment.