Skip to content

Commit

Permalink
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.
Browse files Browse the repository at this point in the history
- extends first commit.

This closes apache#2901.
  • Loading branch information
greghogan authored and fhueske committed Dec 8, 2016
1 parent f2186af commit 41d5875
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
Expand All @@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
*/
private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);

/** The default charset to convert strings to bytes */
private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
// The charset used to convert strings to bytes
private String charsetName = "UTF-8";

// Charset is not serializable
private transient Charset charset;

/**
* The default read buffer size = 1MB.
Expand Down Expand Up @@ -157,9 +161,12 @@ protected static void loadConfigParameters(Configuration parameters) {
// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
// --------------------------------------------------------------------------------------------


// The delimiter may be set as a byte sequence or as a String. In the latter
// case the byte representation is kept consistent with the current charset.
private byte[] delimiter = new byte[] {'\n'};

private String delimiterString = null;

private int lineLengthLimit = Integer.MAX_VALUE;

private int bufferSize = -1;
Expand All @@ -182,8 +189,42 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration) {
}
loadConfigParameters(configuration);
}



/**
 * Returns the character set used to encode the row delimiter. Subclasses
 * also rely on it to interpret field delimiters and comment strings, and
 * to configure their {@link FieldParser}s.
 *
 * <p>The {@link Charset} instance is held in a transient field, so it is
 * looked up from the stored charset name the first time it is requested
 * and cached afterwards.
 *
 * @return the charset
 */
@PublicEvolving
public Charset getCharset() {
	Charset resolved = this.charset;
	if (resolved == null) {
		// not resolved yet (or reset by setCharset): look it up by name and cache it
		resolved = Charset.forName(charsetName);
		this.charset = resolved;
	}
	return resolved;
}

/**
 * Set the name of the character set used for the row delimiter. This is
 * also used by subclasses to interpret field delimiters, comment strings,
 * and for configuring {@link FieldParser}s.
 *
 * <p>These fields are interpreted when set. Changing the charset thereafter
 * may cause unexpected results. A row delimiter that was configured as a
 * String is re-encoded with the new charset; a delimiter configured as raw
 * bytes is left untouched.
 *
 * <p>If a String delimiter is present, the charset name is resolved before
 * any state is modified. An illegal or unsupported name therefore leaves
 * the previously configured charset and delimiter bytes intact. Without a
 * String delimiter the name is only stored and is resolved lazily by
 * {@link #getCharset()}.
 *
 * @param charset name of the charset, must not be null
 * @throws java.nio.charset.IllegalCharsetNameException
 *             if a String delimiter is set and the name is illegal
 * @throws java.nio.charset.UnsupportedCharsetException
 *             if a String delimiter is set and the charset is not available
 */
@PublicEvolving
public void setCharset(String charset) {
	Preconditions.checkNotNull(charset);

	if (this.delimiterString == null) {
		// nothing to re-encode: keep the lookup lazy, exactly as before
		this.charsetName = charset;
		this.charset = null;
		return;
	}

	// Resolve first so that a bad name cannot leave this format with an
	// unusable charset name and stale delimiter bytes.
	final Charset resolved = Charset.forName(charset);
	final byte[] encodedDelimiter = this.delimiterString.getBytes(resolved);

	this.charsetName = charset;
	this.charset = resolved;
	this.delimiter = encodedDelimiter;
}

/**
 * Returns the byte representation of the record delimiter.
 *
 * @return the delimiter bytes; this is the internal array, not a copy
 */
public byte[] getDelimiter() {
	return this.delimiter;
}
Expand All @@ -193,6 +234,7 @@ public void setDelimiter(byte[] delimiter) {
throw new IllegalArgumentException("Delimiter must not be null");
}
this.delimiter = delimiter;
this.delimiterString = null;
}

public void setDelimiter(char delimiter) {
Expand All @@ -203,7 +245,8 @@ public void setDelimiter(String delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
this.delimiter = delimiter.getBytes(getCharset());
this.delimiterString = delimiter;
}

public int getLineLengthLimit() {
Expand Down Expand Up @@ -264,7 +307,7 @@ public void setNumLineSamples(int numLineSamples) {
// --------------------------------------------------------------------------------------------

/**
* Configures this input format by reading the path to the file from the configuration andge the string that
* Configures this input format by reading the path to the file from the configuration and the string that
* defines the record delimiter.
*
* @param parameters The configuration object to read the parameters from.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -46,9 +44,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>

private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);

/** The charset used to convert strings to bytes */
private Charset charset = Charset.forName("UTF-8");

private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];

private static final boolean[] EMPTY_INCLUDED = new boolean[0];
Expand Down Expand Up @@ -79,9 +74,12 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private Class<?>[] fieldTypes = EMPTY_TYPES;

protected boolean[] fieldIncluded = EMPTY_INCLUDED;


// The byte representation of the field delimiter is kept consistent with
// the current charset.
private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;

private String fieldDelimString = null;

private boolean lenient;

private boolean skipFirstLineAsHeader;
Expand All @@ -90,7 +88,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>

private byte quoteCharacter;

// The byte representation of the comment prefix is kept consistent with
// the current charset.
protected byte[] commentPrefix = null;
private String commentPrefixString = null;


// --------------------------------------------------------------------------------------------
Expand All @@ -105,11 +106,6 @@ protected GenericCsvInputFormat(Path filePath) {
super(filePath, null);
}

protected GenericCsvInputFormat(Path filePath, Charset charset) {
this(filePath);
this.charset = Preconditions.checkNotNull(charset);
}

// --------------------------------------------------------------------------------------------

public int getNumberOfFieldsTotal() {
Expand All @@ -120,43 +116,43 @@ public int getNumberOfNonNullFields() {
return this.fieldTypes.length;
}

/**
 * Sets the charset by name via the superclass and then re-encodes the
 * field delimiter and the comment prefix with the new charset, provided
 * they were configured as Strings.
 *
 * @param charset name of the charset
 */
@Override
public void setCharset(String charset) {
	super.setCharset(charset);

	// field delimiter: only re-encode when the String form is known
	final String delimiterText = this.fieldDelimString;
	if (delimiterText != null) {
		this.fieldDelim = delimiterText.getBytes(getCharset());
	}

	// comment prefix: same rule as for the field delimiter
	final String prefixText = this.commentPrefixString;
	if (prefixText != null) {
		this.commentPrefix = prefixText.getBytes(getCharset());
	}
}

/**
 * Returns the byte representation of the comment prefix.
 *
 * @return the comment prefix bytes (the internal array, not a copy), or
 *         {@code null} if no comment prefix is set
 */
public byte[] getCommentPrefix() {
	return this.commentPrefix;
}

public void setCommentPrefix(String commentPrefix) {
setCommentPrefix(commentPrefix, charset);
}

private void setCommentPrefix(String commentPrefix, Charset charset) {
if (charset == null) {
throw new IllegalArgumentException("Charset must not be null");
}
if (commentPrefix != null) {
this.commentPrefix = commentPrefix.getBytes(charset);
this.commentPrefix = commentPrefix.getBytes(getCharset());
} else {
this.commentPrefix = null;
}
this.commentPrefixString = commentPrefix;
}

/**
 * Returns the byte representation of the field delimiter.
 *
 * @return the field delimiter bytes; this is the internal array, not a copy
 */
public byte[] getFieldDelimiter() {
	return this.fieldDelim;
}

public void setFieldDelimiter(byte[] delimiter) {
public void setFieldDelimiter(String delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}

this.fieldDelim = delimiter;
}

public void setFieldDelimiter(char delimiter) {
setFieldDelimiter(String.valueOf(delimiter));
}

public void setFieldDelimiter(String delimiter) {
this.fieldDelim = delimiter.getBytes(charset);
this.fieldDelim = delimiter.getBytes(getCharset());
this.fieldDelimString = delimiter;
}

public boolean isLenient() {
Expand Down Expand Up @@ -296,33 +292,14 @@ protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
this.fieldIncluded = includedMask;
}

/**
* Gets the character set for the parser. Default is set to UTF-8.
*
* @return The charset for the parser.
*/
Charset getCharset() {
return this.charset;
}

/**
* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
* when doing a parse.
*
* @param charset The character set to set.
*/
public void setCharset(Charset charset) {
this.charset = Preconditions.checkNotNull(charset);
}

// --------------------------------------------------------------------------------------------
// Runtime methods
// --------------------------------------------------------------------------------------------

@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);

// instantiate the parsers
FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];

Expand All @@ -335,7 +312,7 @@ public void open(FileInputSplit split) throws IOException {

FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);

p.setCharset(this.getCharset());
p.setCharset(getCharset());
if (this.quotedStringParsing) {
if (p instanceof StringParser) {
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -79,7 +80,7 @@ public static enum ParseErrorState {
BOOLEAN_INVALID
}

private Charset charset = Charset.forName("UTF-8");
private Charset charset = StandardCharsets.UTF_8;

private ParseErrorState errorState = ParseErrorState.NONE;

Expand All @@ -105,9 +106,7 @@ public int resetErrorStateAndParse(byte[] bytes, int startPos, int limit, byte[]

/**
* Each parser's logic should be implemented inside this method
*
* @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)}
* */
*/
protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse);

/**
Expand Down Expand Up @@ -221,20 +220,19 @@ protected static final int nextStringLength(byte[] bytes, int startPos, int leng
return limitedLength;
}

/*
* Gets the Charset for the parser.Default is set to ASCII
/**
* Gets the character set used for this parser.
*
* @return The charset for the parser.
* @return the charset used for this parser.
*/
public Charset getCharset() {
	// plain accessor for the charset field
	return charset;
}

/**
* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
* when doing a parse.
* Sets the character set used for this parser.
*
* @param charset The charset to set.
* @param charset charset used for this parser.
*/
public void setCharset(Charset charset) {
this.charset = charset;
Expand Down
Loading

0 comments on commit 41d5875

Please sign in to comment.