[FLINK-17626][fs-connector] Fs connector should use FLIP-122 format options style


This closes apache#12212
JingsongLi committed May 19, 2020
1 parent a47c611 commit 92d6742
Showing 38 changed files with 410 additions and 421 deletions.
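The pattern this commit applies to every filesystem format, in one place: FLIP-122 factories declare typed ConfigOptions whose keys drop the old "format." prefix, and advertise them through factoryIdentifier(), requiredOptions() and optionalOptions() instead of the string-keyed requiredContext()/supportedProperties() pair. A minimal sketch of the new style, modeled on the Avro factory changed below (the class name is illustrative):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FormatOptionsSketch {
	// A typed option in the FLIP-122 style. The framework prefixes the key
	// with the format identifier, so users write 'avro.codec' = 'snappy'
	// in DDL instead of the old 'format.avro.codec'.
	public static final ConfigOption<String> CODEC = ConfigOptions.key("codec")
			.stringType()
			.noDefaultValue()
			.withDescription("The compression codec for avro");
}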
@@ -79,8 +79,8 @@
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_FILE_SIZE;
-import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_TIME_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
+import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
 
 /**
  * Table sink to write to Hive tables.
@@ -184,7 +184,7 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
 		TableRollingPolicy rollingPolicy = new TableRollingPolicy(
 				true,
 				conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
-				conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL));
+				conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
 		InactiveBucketListener listener = new InactiveBucketListener();
 
 		Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
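The new .toMillis() call above suggests that SINK_ROLLING_POLICY_TIME_INTERVAL is now a Duration-typed option in FileSystemOptions rather than a plain long. A hedged sketch of what that declaration and call site would look like; the key and default value are assumptions, not taken from this diff:

import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

class RollingPolicyIntervalSketch {
	// Hypothetical declaration; the real one lives in FileSystemOptions.
	static final ConfigOption<Duration> SINK_ROLLING_POLICY_TIME_INTERVAL =
			ConfigOptions.key("sink.rolling-policy.time.interval") // assumed key
					.durationType()
					.defaultValue(Duration.ofMinutes(30)); // assumed default

	static long rollingIntervalMillis(Configuration conf) {
		// Duration-typed options are converted to millis at the call site,
		// exactly as the Hive sink does above.
		return conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis();
	}
}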
@@ -21,21 +21,21 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -44,47 +44,43 @@
 import org.apache.avro.io.DatumWriter;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-
 /**
  * Avro format factory for file system.
  */
 public class AvroFileSystemFormatFactory implements FileSystemFormatFactory {
 
-	public static final String AVRO_OUTPUT_CODEC = "format." + DataFileConstants.CODEC;
+	public static final String IDENTIFIER = "avro";
+	public static final ConfigOption<String> AVRO_OUTPUT_CODEC = ConfigOptions.key("codec")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("The compression codec for avro");
 
 	@Override
-	public boolean supportsSchemaDerivation() {
-		return true;
+	public String factoryIdentifier() {
+		return IDENTIFIER;
 	}
 
 	@Override
-	public List<String> supportedProperties() {
-		List<String> options = new ArrayList<>();
-		options.add(AVRO_OUTPUT_CODEC);
-		return options;
+	public Set<ConfigOption<?>> requiredOptions() {
+		return new HashSet<>();
 	}
 
 	@Override
-	public Map<String, String> requiredContext() {
-		Map<String, String> context = new HashMap<>();
-		context.put(FORMAT, "avro");
-		return context;
+	public Set<ConfigOption<?>> optionalOptions() {
+		Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(AVRO_OUTPUT_CODEC);
+		return options;
 	}
 
 	@Override
 	public InputFormat<RowData, ?> createReader(ReaderContext context) {
-		DescriptorProperties properties = new DescriptorProperties();
-		properties.putProperties(context.getFormatProperties());
-
 		String[] fieldNames = context.getSchema().getFieldNames();
 		List<String> projectFields = Arrays.stream(context.getProjectFields())
 				.mapToObj(idx -> fieldNames[idx])
@@ -123,7 +119,7 @@ public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
 	public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
 		return Optional.of(new RowDataAvroWriterFactory(
 				context.getFormatRowType(),
-				context.getFormatProperties().get(AVRO_OUTPUT_CODEC)));
+				context.getFormatOptions().get(AVRO_OUTPUT_CODEC)));
 	}
 
 	/**
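One consequence of noDefaultValue() on AVRO_OUTPUT_CODEC: context.getFormatOptions().get(AVRO_OUTPUT_CODEC) can return null, so the writer side has to treat a missing codec as "no compression". A sketch of that handling; the helper is hypothetical, only the Avro CodecFactory calls are real API:

import javax.annotation.Nullable;

import org.apache.avro.file.CodecFactory;

class AvroCodecSketch {
	static CodecFactory codecFor(@Nullable String codec) {
		// a null codec string means uncompressed data files
		return codec == null ? CodecFactory.nullCodec() : CodecFactory.fromString(codec);
	}
}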
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.formats.avro.AvroFormatFactory
+org.apache.flink.formats.avro.AvroFileSystemFormatFactory
@@ -14,4 +14,3 @@
 # limitations under the License.
 
 org.apache.flink.formats.avro.AvroRowFormatFactory
-org.apache.flink.formats.avro.AvroFileSystemFormatFactory
@@ -50,7 +50,7 @@ public String[] formatProperties() {
 		List<String> ret = new ArrayList<>();
 		ret.add("'format'='avro'");
 		if (configure) {
-			ret.add("'format.avro.codec'='snappy'");
+			ret.add("'avro.codec'='snappy'");
 		}
 		return ret.toArray(new String[0]);
 	}
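From the user's perspective the test change above is the whole story: format options now nest under the format identifier instead of a generic 'format.' prefix. A sketch of the resulting DDL; the table name, schema and path are made up, only the format keys come from this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class DdlSketch {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().inBatchMode().build());
		tEnv.executeSql(
				"CREATE TABLE fs_table (a INT, b STRING) WITH (" +
				"  'connector' = 'filesystem'," + // assumed connector identifier
				"  'path' = '/tmp/fs_table'," +   // hypothetical path
				"  'format' = 'avro'," +
				"  'avro.codec' = 'snappy'" +     // was 'format.avro.codec' = 'snappy'
				")");
	}
}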
@@ -22,12 +22,13 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.DeserializationRuntimeConverter;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -41,34 +42,61 @@
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ALLOW_COMMENTS;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_DISABLE_QUOTE_CHARACTER;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ESCAPE_CHARACTER;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_FIELD_DELIMITER;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_IGNORE_PARSE_ERRORS;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_LINE_DELIMITER;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_NULL_LITERAL;
-import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_QUOTE_CHARACTER;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.formats.csv.CsvFormatFactory.validateFormatOptions;
+import static org.apache.flink.formats.csv.CsvOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvOptions.ARRAY_ELEMENT_DELIMITER;
+import static org.apache.flink.formats.csv.CsvOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvOptions.FIELD_DELIMITER;
+import static org.apache.flink.formats.csv.CsvOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvOptions.LINE_DELIMITER;
+import static org.apache.flink.formats.csv.CsvOptions.NULL_LITERAL;
+import static org.apache.flink.formats.csv.CsvOptions.QUOTE_CHARACTER;
 
 /**
  * CSV format factory for file system.
 */
 public class CsvFileSystemFormatFactory implements FileSystemFormatFactory {
 
+	public static final String IDENTIFIER = "csv";
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		return new HashSet<>();
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FIELD_DELIMITER);
+		options.add(LINE_DELIMITER);
+		options.add(DISABLE_QUOTE_CHARACTER);
+		options.add(QUOTE_CHARACTER);
+		options.add(ALLOW_COMMENTS);
+		options.add(IGNORE_PARSE_ERRORS);
+		options.add(ARRAY_ELEMENT_DELIMITER);
+		options.add(ESCAPE_CHARACTER);
+		options.add(NULL_LITERAL);
+		return options;
+	}
+
 	@Override
 	public InputFormat<RowData, ?> createReader(ReaderContext context) {
-		DescriptorProperties properties = getValidatedProperties(context.getFormatProperties());
+		ReadableConfig options = context.getFormatOptions();
+		validateFormatOptions(options);
 
 		RowType formatRowType = context.getFormatRowType();
 
@@ -87,9 +115,9 @@ public class CsvFileSystemFormatFactory implements FileSystemFormatFactory {
 				.mapToInt(csvFields::indexOf)
 				.toArray();
 
-		CsvSchema csvSchema = buildCsvSchema(formatRowType, properties);
+		CsvSchema csvSchema = buildCsvSchema(formatRowType, options);
 
-		boolean ignoreParseErrors = properties.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).orElse(false);
+		boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS);
 
 		return new CsvInputFormat(
 				context.getPaths(),
@@ -106,64 +134,64 @@ public class CsvFileSystemFormatFactory implements FileSystemFormatFactory {
 				ignoreParseErrors);
 	}
 
-	private CsvSchema buildCsvSchema(RowType rowType, DescriptorProperties properties) {
+	private CsvSchema buildCsvSchema(RowType rowType, ReadableConfig options) {
 		CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);
 		CsvSchema.Builder csvBuilder = csvSchema.rebuild();
 		//format properties
-		properties.getOptionalCharacter(FORMAT_FIELD_DELIMITER)
+		options.getOptional(FIELD_DELIMITER).map(s -> s.charAt(0))
			.ifPresent(csvBuilder::setColumnSeparator);
 
-		properties.getOptionalCharacter(FORMAT_QUOTE_CHARACTER)
+		options.getOptional(QUOTE_CHARACTER).map(s -> s.charAt(0))
			.ifPresent(csvBuilder::setQuoteChar);
 
-		properties.getOptionalBoolean(FORMAT_ALLOW_COMMENTS)
+		options.getOptional(ALLOW_COMMENTS)
			.ifPresent(csvBuilder::setAllowComments);
 
-		properties.getOptionalString(FORMAT_ARRAY_ELEMENT_DELIMITER)
+		options.getOptional(ARRAY_ELEMENT_DELIMITER)
			.ifPresent(csvBuilder::setArrayElementSeparator);
 
-		properties.getOptionalString(FORMAT_ARRAY_ELEMENT_DELIMITER)
+		options.getOptional(ARRAY_ELEMENT_DELIMITER)
			.ifPresent(csvBuilder::setArrayElementSeparator);
 
-		properties.getOptionalCharacter(FORMAT_ESCAPE_CHARACTER)
+		options.getOptional(ESCAPE_CHARACTER).map(s -> s.charAt(0))
			.ifPresent(csvBuilder::setEscapeChar);
 
-		properties.getOptionalString(FORMAT_NULL_LITERAL)
+		options.getOptional(NULL_LITERAL)
			.ifPresent(csvBuilder::setNullValue);
 
-		properties.getOptionalString(FORMAT_LINE_DELIMITER)
+		options.getOptional(LINE_DELIMITER)
			.ifPresent(csvBuilder::setLineSeparator);
 
 		return csvBuilder.build();
 	}
 
 	@Override
 	public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
+		ReadableConfig options = context.getFormatOptions();
+		validateFormatOptions(options);
 
 		CsvRowDataSerializationSchema.Builder builder = new CsvRowDataSerializationSchema.Builder(
			context.getFormatRowType());
 
-		DescriptorProperties properties = getValidatedProperties(context.getFormatProperties());
-
-		properties.getOptionalCharacter(FORMAT_FIELD_DELIMITER)
+		options.getOptional(FIELD_DELIMITER).map(s -> s.charAt(0))
			.ifPresent(builder::setFieldDelimiter);
 
-		properties.getOptionalString(FORMAT_LINE_DELIMITER)
+		options.getOptional(LINE_DELIMITER)
			.ifPresent(builder::setLineDelimiter);
 
-		if (properties.getOptionalBoolean(FORMAT_DISABLE_QUOTE_CHARACTER).orElse(false)) {
+		if (options.get(DISABLE_QUOTE_CHARACTER)) {
 			builder.disableQuoteCharacter();
 		} else {
-			properties.getOptionalCharacter(FORMAT_QUOTE_CHARACTER).ifPresent(builder::setQuoteCharacter);
+			options.getOptional(QUOTE_CHARACTER).map(s -> s.charAt(0)).ifPresent(builder::setQuoteCharacter);
 		}
 
-		properties.getOptionalString(FORMAT_ARRAY_ELEMENT_DELIMITER)
+		options.getOptional(ARRAY_ELEMENT_DELIMITER)
			.ifPresent(builder::setArrayElementDelimiter);
 
-		properties.getOptionalCharacter(FORMAT_ESCAPE_CHARACTER)
+		options.getOptional(ESCAPE_CHARACTER).map(s -> s.charAt(0))
			.ifPresent(builder::setEscapeCharacter);
 
-		properties.getOptionalString(FORMAT_NULL_LITERAL)
+		options.getOptional(NULL_LITERAL)
			.ifPresent(builder::setNullLiteral);
 
 		final CsvRowDataSerializationSchema serializationSchema = builder.build();
@@ -176,49 +204,6 @@ public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
 		return Optional.empty();
 	}
 
-	@Override
-	public boolean supportsSchemaDerivation() {
-		return true;
-	}
-
-	@Override
-	public List<String> supportedProperties() {
-		final List<String> properties = new ArrayList<>();
-		properties.add(FORMAT_FIELD_DELIMITER);
-		properties.add(FORMAT_LINE_DELIMITER);
-		properties.add(FORMAT_DISABLE_QUOTE_CHARACTER);
-		properties.add(FORMAT_QUOTE_CHARACTER);
-		properties.add(FORMAT_ALLOW_COMMENTS);
-		properties.add(FORMAT_IGNORE_PARSE_ERRORS);
-		properties.add(FORMAT_ARRAY_ELEMENT_DELIMITER);
-		properties.add(FORMAT_ESCAPE_CHARACTER);
-		properties.add(FORMAT_NULL_LITERAL);
-		return properties;
-	}
-
-	@Override
-	public Map<String, String> requiredContext() {
-		Map<String, String> context = new HashMap<>();
-		context.put(FORMAT, "csv");
-		return context;
-	}
-
-	private static DescriptorProperties getValidatedProperties(Map<String, String> propertiesMap) {
-		final DescriptorProperties properties = new DescriptorProperties(true);
-		properties.putProperties(propertiesMap);
-
-		properties.validateString(FORMAT_FIELD_DELIMITER, true, 1, 1);
-		properties.validateEnumValues(FORMAT_LINE_DELIMITER, true, Arrays.asList("\r", "\n", "\r\n", ""));
-		properties.validateBoolean(FORMAT_DISABLE_QUOTE_CHARACTER, true);
-		properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1);
-		properties.validateBoolean(FORMAT_ALLOW_COMMENTS, true);
-		properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true);
-		properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1);
-		properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1);
-
-		return properties;
-	}
-
 	/**
 	 * InputFormat that reads csv record into {@link RowData}.
 	 */
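The recurring .map(s -> s.charAt(0)) above exists because FLIP-122 options are String-typed while the Jackson CSV builder wants single characters. The pattern in isolation, with a stand-in option; the key and default are assumptions:

import java.util.Optional;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

class CharOptionSketch {
	static final ConfigOption<String> FIELD_DELIMITER =
			ConfigOptions.key("field-delimiter").stringType().defaultValue(","); // stand-in

	static Optional<Character> charOption(ReadableConfig options, ConfigOption<String> option) {
		// getOptional yields Optional<String>; a one-character value becomes
		// the char, assuming validation already rejected longer strings
		return options.getOptional(option).map(s -> s.charAt(0));
	}
}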
@@ -147,7 +147,7 @@ public Set<ConfigOption<?>> optionalOptions() {
 	// Validation
 	// ------------------------------------------------------------------------
 
-	private static void validateFormatOptions(ReadableConfig tableOptions) {
+	static void validateFormatOptions(ReadableConfig tableOptions) {
 		final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
 		final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER);
 		if (isDisabledQuoteCharacter && hasQuoteCharacter){
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.formats.csv.CsvFormatFactory
+org.apache.flink.formats.csv.CsvFileSystemFormatFactory
@@ -14,4 +14,3 @@
 # limitations under the License.
 
 org.apache.flink.formats.csv.CsvRowFormatFactory
-org.apache.flink.formats.csv.CsvFileSystemFormatFactory