Skip to content

Commit

Permalink
[FLINK-17604][csv] Implement format factory for CSV serialization and…
Browse files Browse the repository at this point in the history
… deserialization schema of RowData type

This closes apache#12065
  • Loading branch information
danny0405 authored and wuchong committed May 14, 2020
1 parent 360abcc commit 654a637
Show file tree
Hide file tree
Showing 5 changed files with 611 additions and 0 deletions.
9 changes: 9 additions & 0 deletions flink-formats/flink-csv/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- CSV RowData format factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.csv;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.formats.csv.CsvOptions.ALLOW_COMMENTS;
import static org.apache.flink.formats.csv.CsvOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvOptions.DISABLE_QUOTE_CHARACTER;
import static org.apache.flink.formats.csv.CsvOptions.ESCAPE_CHARACTER;
import static org.apache.flink.formats.csv.CsvOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.csv.CsvOptions.LINE_DELIMITER;
import static org.apache.flink.formats.csv.CsvOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvOptions.QUOTE_CHARACTER;

/**
 * Factory for the {@code csv} format. It hands out configured {@link SerializationSchema} and
 * {@link DeserializationSchema} instances that work on {@link RowData}.
 */
public final class CsvFormatFactory implements
        DeserializationFormatFactory,
        SerializationFormatFactory {

    public static final String IDENTIFIER = "csv";

    /**
     * Validates the given format options and returns a scan format that builds a
     * {@link CsvRowDataDeserializationSchema} from them.
     */
    @Override
    public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateFormatOptions(formatOptions);

        return new ScanFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createScanFormat(
                    ScanTableSource.Context scanContext,
                    DataType producedDataType) {
                final RowType producedRowType = (RowType) producedDataType.getLogicalType();
                // The cast is the only unchecked operation, so the suppression is scoped to it.
                @SuppressWarnings("unchecked")
                final TypeInformation<RowData> producedTypeInfo =
                        (TypeInformation<RowData>) scanContext.createTypeInformation(producedDataType);

                final CsvRowDataDeserializationSchema.Builder builder =
                        new CsvRowDataDeserializationSchema.Builder(producedRowType, producedTypeInfo);
                configureDeserializationSchema(formatOptions, builder);
                return builder.build();
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    /**
     * Validates the given format options and returns a sink format that builds a
     * {@link CsvRowDataSerializationSchema} from them.
     */
    @Override
    public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateFormatOptions(formatOptions);

        return new SinkFormat<SerializationSchema<RowData>>() {
            @Override
            public SerializationSchema<RowData> createSinkFormat(
                    DynamicTableSink.Context sinkContext,
                    DataType consumedDataType) {
                final CsvRowDataSerializationSchema.Builder builder =
                        new CsvRowDataSerializationSchema.Builder(
                                (RowType) consumedDataType.getLogicalType());
                configureSerializationSchema(formatOptions, builder);
                return builder.build();
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** The CSV format has no required options. */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    /** Returns a fresh, mutable set holding every option this format understands. */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>(Arrays.<ConfigOption<?>>asList(
                FIELD_DELIMITER,
                LINE_DELIMITER,
                DISABLE_QUOTE_CHARACTER,
                QUOTE_CHARACTER,
                ALLOW_COMMENTS,
                IGNORE_PARSE_ERRORS,
                ARRAY_ELEMENT_DELIMITER,
                ESCAPE_CHARACTER,
                NULL_LITERAL));
    }

    // ------------------------------------------------------------------------
    //  Validation
    // ------------------------------------------------------------------------

    /**
     * Checks the format options for consistency.
     *
     * <p>Checks run in this order: the quote character must not be set while quoting is
     * disabled, the character-valued options must be exactly one character long, and the
     * line delimiter must be one of the supported values.
     *
     * @throws ValidationException if any of the checks fails
     */
    private static void validateFormatOptions(ReadableConfig tableOptions) {
        final boolean quoteCharacterSet = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
        final boolean quotingDisabled = tableOptions.get(DISABLE_QUOTE_CHARACTER);
        if (quotingDisabled && quoteCharacterSet) {
            throw new ValidationException(
                    "Format cannot define a quote character and disabled quote character at the same time.");
        }

        // Each of these options must hold exactly one character.
        validateCharacterVal(tableOptions, FIELD_DELIMITER);
        validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
        validateCharacterVal(tableOptions, QUOTE_CHARACTER);
        validateCharacterVal(tableOptions, ESCAPE_CHARACTER);

        validateLineDelimiter(tableOptions);
    }

    /** Rejects an explicitly set line delimiter other than "\r", "\n", "\r\n" or "". */
    private static void validateLineDelimiter(ReadableConfig tableOptions) {
        tableOptions.getOptional(LINE_DELIMITER)
                .filter(value -> !Arrays.asList("\r", "\n", "\r\n", "").contains(value))
                .ifPresent(value -> {
                    throw new ValidationException(
                            String.format("Invalid value for option '%s.%s'. Supported values are %s, but was: %s",
                                    IDENTIFIER,
                                    LINE_DELIMITER.key(),
                                    "[\\r, \\n, \\r\\n, \"\"]",
                                    value));
                });
    }

    /**
     * Ensures that {@code option}, if explicitly set, is a string of length one.
     *
     * @throws ValidationException if the option is set to a string of any other length
     */
    private static void validateCharacterVal(
            ReadableConfig tableOptions,
            ConfigOption<String> option) {
        tableOptions.getOptional(option)
                .filter(value -> value.length() != 1)
                .ifPresent(value -> {
                    throw new ValidationException(
                            String.format("Option '%s.%s' must be a string with single character, but was: %s",
                                    IDENTIFIER,
                                    option.key(),
                                    value));
                });
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Copies every explicitly set option relevant for reading onto the deserialization schema
     * builder. Options that are not set leave the builder untouched.
     */
    private static void configureDeserializationSchema(
            ReadableConfig formatOptions,
            CsvRowDataDeserializationSchema.Builder schemaBuilder) {
        formatOptions.getOptional(FIELD_DELIMITER)
                .ifPresent(delimiter -> schemaBuilder.setFieldDelimiter(delimiter.charAt(0)));

        formatOptions.getOptional(QUOTE_CHARACTER)
                .ifPresent(quote -> schemaBuilder.setQuoteCharacter(quote.charAt(0)));

        formatOptions.getOptional(ALLOW_COMMENTS)
                .ifPresent(allow -> schemaBuilder.setAllowComments(allow));

        formatOptions.getOptional(IGNORE_PARSE_ERRORS)
                .ifPresent(ignore -> schemaBuilder.setIgnoreParseErrors(ignore));

        formatOptions.getOptional(ARRAY_ELEMENT_DELIMITER)
                .ifPresent(delimiter -> schemaBuilder.setArrayElementDelimiter(delimiter));

        formatOptions.getOptional(ESCAPE_CHARACTER)
                .ifPresent(escape -> schemaBuilder.setEscapeCharacter(escape.charAt(0)));

        formatOptions.getOptional(NULL_LITERAL)
                .ifPresent(literal -> schemaBuilder.setNullLiteral(literal));
    }

    /**
     * Copies every explicitly set option relevant for writing onto the serialization schema
     * builder. When quoting is disabled the quote character option is not consulted.
     */
    private static void configureSerializationSchema(
            ReadableConfig formatOptions,
            CsvRowDataSerializationSchema.Builder schemaBuilder) {
        formatOptions.getOptional(FIELD_DELIMITER)
                .ifPresent(delimiter -> schemaBuilder.setFieldDelimiter(delimiter.charAt(0)));

        formatOptions.getOptional(LINE_DELIMITER)
                .ifPresent(delimiter -> schemaBuilder.setLineDelimiter(delimiter));

        final boolean quotingDisabled = formatOptions.get(DISABLE_QUOTE_CHARACTER);
        if (!quotingDisabled) {
            formatOptions.getOptional(QUOTE_CHARACTER)
                    .ifPresent(quote -> schemaBuilder.setQuoteCharacter(quote.charAt(0)));
        } else {
            schemaBuilder.disableQuoteCharacter();
        }

        formatOptions.getOptional(ARRAY_ELEMENT_DELIMITER)
                .ifPresent(delimiter -> schemaBuilder.setArrayElementDelimiter(delimiter));

        formatOptions.getOptional(ESCAPE_CHARACTER)
                .ifPresent(escape -> schemaBuilder.setEscapeCharacter(escape.charAt(0)));

        formatOptions.getOptional(NULL_LITERAL)
                .ifPresent(literal -> schemaBuilder.setNullLiteral(literal));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.csv;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
 * Options for CSV format.
 *
 * <p>This class only holds option constants and cannot be instantiated.
 */
public final class CsvOptions {

    private CsvOptions() {}

    // ------------------------------------------------------------------------
    //  Options
    // ------------------------------------------------------------------------

    /** Character separating the fields of a row; defaults to {@code ","}. */
    public static final ConfigOption<String> FIELD_DELIMITER = ConfigOptions
            .key("field-delimiter")
            .stringType()
            .defaultValue(",")
            .withDescription("Optional field delimiter character (',' by default)");

    /** Delimiter separating rows; defaults to {@code "\n"}. */
    public static final ConfigOption<String> LINE_DELIMITER = ConfigOptions
            .key("line-delimiter")
            .stringType()
            .defaultValue("\n")
            // Each sentence ends with a line break so the two examples do not run together.
            .withDescription("Optional line delimiter (\"\\n\" by default, otherwise\n"
                    + "\"\\r\" or \"\\r\\n\" are allowed), unicode is supported if\n"
                    + "the delimiter is an invisible special character,\n"
                    + "e.g. U&'\\000D' is the unicode representation of carriage return \"\\r\"\n"
                    + "e.g. U&'\\000A' is the unicode representation of line feed \"\\n\"");

    /** Flag turning off quoting of field values; defaults to {@code false}. */
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER = ConfigOptions
            .key("disable-quote-character")
            .booleanType()
            .defaultValue(false)
            .withDescription("Optional flag to disable quote character for enclosing field values (false by default)\n"
                    + "if true, quote-character can not be set");

    /** Character used to enclose field values; defaults to {@code "\""}. */
    public static final ConfigOption<String> QUOTE_CHARACTER = ConfigOptions
            .key("quote-character")
            .stringType()
            .defaultValue("\"")
            .withDescription("Optional quote character for enclosing field values ('\"' by default)");

    /** Flag for ignoring comment lines starting with {@code "#"}; defaults to {@code false}. */
    public static final ConfigOption<Boolean> ALLOW_COMMENTS = ConfigOptions
            .key("allow-comments")
            .booleanType()
            .defaultValue(false)
            .withDescription("Optional flag to ignore comment lines that start with \"#\"\n"
                    + "(disabled by default);\n"
                    + "if enabled, make sure to also ignore parse errors to allow empty rows");

    /** Flag for skipping fields and rows with parse errors; defaults to {@code false}. */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = ConfigOptions
            .key("ignore-parse-errors")
            .booleanType()
            .defaultValue(false)
            .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
                    + "fields are set to null in case of errors");

    /** Delimiter separating array and row element values; defaults to {@code ";"}. */
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER = ConfigOptions
            .key("array-element-delimiter")
            .stringType()
            .defaultValue(";")
            .withDescription("Optional array element delimiter string for separating\n"
                    + "array and row element values (\";\" by default)");

    /** Character used for escaping values; has no default value. */
    public static final ConfigOption<String> ESCAPE_CHARACTER = ConfigOptions
            .key("escape-character")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional escape character for escaping values (disabled by default)");

    /** String interpreted as a null value; has no default value. */
    public static final ConfigOption<String> NULL_LITERAL = ConfigOptions
            .key("null-literal")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional null literal string that is interpreted as a\n"
                    + "null value (disabled by default)");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.formats.csv.CsvFormatFactory
Loading

0 comments on commit 654a637

Please sign in to comment.