From 92d674245c72a64efca83eb4343f37422ccf9e6f Mon Sep 17 00:00:00 2001
From: Jingsong Lee
Date: Tue, 19 May 2020 18:23:17 +0800
Subject: [PATCH] [FLINK-17626][fs-connector] Fs connector should use FLIP-122 format options style

This closes #12212
---
 .../flink/connectors/hive/HiveTableSink.java | 6 +-
 .../avro/AvroFileSystemFormatFactory.java | 40 +++--
 .../org.apache.flink.table.factories.Factory | 1 +
 ....apache.flink.table.factories.TableFactory | 1 -
 .../formats/avro/AvroFilesystemITCase.java | 2 +-
 .../csv/CsvFileSystemFormatFactory.java | 141 ++++++++----------
 .../flink/formats/csv/CsvFormatFactory.java | 2 +-
 .../org.apache.flink.table.factories.Factory | 1 +
 ....apache.flink.table.factories.TableFactory | 1 -
 .../formats/csv/CsvFilesystemBatchITCase.java | 8 +-
 .../json/JsonFileSystemFormatFactory.java | 57 ++++---
 .../flink/formats/json/JsonFormatFactory.java | 23 +--
 .../flink/formats/json/JsonOptions.java | 41 +++++
 .../org.apache.flink.table.factories.Factory | 1 +
 ....apache.flink.table.factories.TableFactory | 1 -
 .../json/JsonBatchFileSystemITCase.java | 2 +-
 .../flink/orc/OrcFileSystemFormatFactory.java | 65 +++-----
 ... org.apache.flink.table.factories.Factory} | 0
 .../apache/flink/orc/OrcFileSystemITCase.java | 33 +++-
 .../ParquetFileSystemFormatFactory.java | 83 ++++------
 ... org.apache.flink.table.factories.Factory} | 0
 .../parquet/ParquetFileSystemITCase.java | 41 ++++-
 .../factories/FileSystemFormatFactory.java | 12 +-
 .../batch/BatchExecLegacySinkRule.scala | 4 +-
 .../physical/batch/BatchExecSinkRule.scala | 4 +-
 .../stream/StreamExecLegacySinkRule.scala | 4 +-
 .../physical/stream/StreamExecSinkRule.scala | 4 +-
 .../utils/TestCsvFileSystemFormatFactory.java | 31 ++--
 .../org.apache.flink.table.factories.Factory | 1 +
 ....apache.flink.table.factories.TableFactory | 1 -
 .../batch/sql/FileSystemTestCsvITCase.scala | 4 +-
 .../batch/sql/PartitionableSinkITCase.scala | 6 +-
 .../sql/FsStreamingSinkTestCsvITCase.scala | 5 +-
 .../sql/StreamFileSystemTestCsvITCase.scala | 4 +-
 .../table/filesystem/FileSystemOptions.java | 29 ++++
 .../filesystem/FileSystemTableFactory.java | 124 ++++-----------
 .../table/filesystem/FileSystemTableSink.java | 15 +-
 .../filesystem/FileSystemTableSource.java | 33 ++--
 38 files changed, 410 insertions(+), 421 deletions(-)
 create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
 rename flink-formats/flink-orc/src/main/resources/META-INF/services/{org.apache.flink.table.factories.TableFactory => org.apache.flink.table.factories.Factory} (100%)
 rename flink-formats/flink-parquet/src/main/resources/META-INF/services/{org.apache.flink.table.factories.TableFactory => org.apache.flink.table.factories.Factory} (100%)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 51f2ec9f3d337..aa83d7ac00da2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -79,8 +79,8 @@ import java.util.Map;
 import java.util.Optional;

-import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_FILE_SIZE;
-import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_TIME_INTERVAL;
+import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE; +import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL; /** * Table sink to write to Hive tables. @@ -184,7 +184,7 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) { TableRollingPolicy rollingPolicy = new TableRollingPolicy( true, conf.get(SINK_ROLLING_POLICY_FILE_SIZE), - conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL)); + conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis()); InactiveBucketListener listener = new InactiveBucketListener(); Optional> bulkFactory = createBulkWriterFactory(partitionColumns, sd); diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java index 6ff6ee56c6176..a033739f9ad2e 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileSystemFormatFactory.java @@ -21,13 +21,14 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.FileSystemFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -35,7 +36,6 @@ import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -44,47 +44,43 @@ import org.apache.avro.io.DatumWriter; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; - /** * Avro format factory for file system. */ public class AvroFileSystemFormatFactory implements FileSystemFormatFactory { - public static final String AVRO_OUTPUT_CODEC = "format." 
+ DataFileConstants.CODEC; + public static final String IDENTIFIER = "avro"; + public static final ConfigOption AVRO_OUTPUT_CODEC = ConfigOptions.key("codec") + .stringType() + .noDefaultValue() + .withDescription("The compression codec for avro"); @Override - public boolean supportsSchemaDerivation() { - return true; + public String factoryIdentifier() { + return IDENTIFIER; } @Override - public List supportedProperties() { - List options = new ArrayList<>(); - options.add(AVRO_OUTPUT_CODEC); - return options; + public Set> requiredOptions() { + return new HashSet<>(); } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FORMAT, "avro"); - return context; + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(AVRO_OUTPUT_CODEC); + return options; } @Override public InputFormat createReader(ReaderContext context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getFormatProperties()); - String[] fieldNames = context.getSchema().getFieldNames(); List projectFields = Arrays.stream(context.getProjectFields()) .mapToObj(idx -> fieldNames[idx]) @@ -123,7 +119,7 @@ public Optional> createEncoder(WriterContext context) { public Optional> createBulkWriterFactory(WriterContext context) { return Optional.of(new RowDataAvroWriterFactory( context.getFormatRowType(), - context.getFormatProperties().get(AVRO_OUTPUT_CODEC))); + context.getFormatOptions().get(AVRO_OUTPUT_CODEC))); } /** diff --git a/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index e456efc2cff47..b8773629a456a 100644 --- a/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.formats.avro.AvroFormatFactory +org.apache.flink.formats.avro.AvroFileSystemFormatFactory diff --git a/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index c9cb362f5ee8b..e24424a8dc6bd 100644 --- a/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -14,4 +14,3 @@ # limitations under the License. 
org.apache.flink.formats.avro.AvroRowFormatFactory -org.apache.flink.formats.avro.AvroFileSystemFormatFactory diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java index 5a1e46834041f..157be5a5d5aa2 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java @@ -50,7 +50,7 @@ public String[] formatProperties() { List ret = new ArrayList<>(); ret.add("'format'='avro'"); if (configure) { - ret.add("'format.avro.codec'='snappy'"); + ret.add("'avro.codec'='snappy'"); } return ret.toArray(new String[0]); } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileSystemFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileSystemFormatFactory.java index 1053c78216fb5..19efacf8ac4d7 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileSystemFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileSystemFormatFactory.java @@ -22,12 +22,13 @@ import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.DeserializationRuntimeConverter; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.FileSystemFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -41,34 +42,61 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ALLOW_COMMENTS; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ARRAY_ELEMENT_DELIMITER; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_DISABLE_QUOTE_CHARACTER; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_ESCAPE_CHARACTER; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_FIELD_DELIMITER; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_IGNORE_PARSE_ERRORS; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_LINE_DELIMITER; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_NULL_LITERAL; -import static org.apache.flink.table.descriptors.CsvValidator.FORMAT_QUOTE_CHARACTER; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; +import static org.apache.flink.formats.csv.CsvFormatFactory.validateFormatOptions; +import static org.apache.flink.formats.csv.CsvOptions.ALLOW_COMMENTS; +import static 
org.apache.flink.formats.csv.CsvOptions.ARRAY_ELEMENT_DELIMITER; +import static org.apache.flink.formats.csv.CsvOptions.DISABLE_QUOTE_CHARACTER; +import static org.apache.flink.formats.csv.CsvOptions.ESCAPE_CHARACTER; +import static org.apache.flink.formats.csv.CsvOptions.FIELD_DELIMITER; +import static org.apache.flink.formats.csv.CsvOptions.IGNORE_PARSE_ERRORS; +import static org.apache.flink.formats.csv.CsvOptions.LINE_DELIMITER; +import static org.apache.flink.formats.csv.CsvOptions.NULL_LITERAL; +import static org.apache.flink.formats.csv.CsvOptions.QUOTE_CHARACTER; /** * CSV format factory for file system. */ public class CsvFileSystemFormatFactory implements FileSystemFormatFactory { + public static final String IDENTIFIER = "csv"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return new HashSet<>(); + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(FIELD_DELIMITER); + options.add(LINE_DELIMITER); + options.add(DISABLE_QUOTE_CHARACTER); + options.add(QUOTE_CHARACTER); + options.add(ALLOW_COMMENTS); + options.add(IGNORE_PARSE_ERRORS); + options.add(ARRAY_ELEMENT_DELIMITER); + options.add(ESCAPE_CHARACTER); + options.add(NULL_LITERAL); + return options; + } + @Override public InputFormat createReader(ReaderContext context) { - DescriptorProperties properties = getValidatedProperties(context.getFormatProperties()); + ReadableConfig options = context.getFormatOptions(); + validateFormatOptions(options); RowType formatRowType = context.getFormatRowType(); @@ -87,9 +115,9 @@ public class CsvFileSystemFormatFactory implements FileSystemFormatFactory { .mapToInt(csvFields::indexOf) .toArray(); - CsvSchema csvSchema = buildCsvSchema(formatRowType, properties); + CsvSchema csvSchema = buildCsvSchema(formatRowType, options); - boolean ignoreParseErrors = properties.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).orElse(false); + boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS); return new CsvInputFormat( context.getPaths(), @@ -106,32 +134,32 @@ public class CsvFileSystemFormatFactory implements FileSystemFormatFactory { ignoreParseErrors); } - private CsvSchema buildCsvSchema(RowType rowType, DescriptorProperties properties) { + private CsvSchema buildCsvSchema(RowType rowType, ReadableConfig options) { CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType); CsvSchema.Builder csvBuilder = csvSchema.rebuild(); //format properties - properties.getOptionalCharacter(FORMAT_FIELD_DELIMITER) + options.getOptional(FIELD_DELIMITER).map(s -> s.charAt(0)) .ifPresent(csvBuilder::setColumnSeparator); - properties.getOptionalCharacter(FORMAT_QUOTE_CHARACTER) + options.getOptional(QUOTE_CHARACTER).map(s -> s.charAt(0)) .ifPresent(csvBuilder::setQuoteChar); - properties.getOptionalBoolean(FORMAT_ALLOW_COMMENTS) + options.getOptional(ALLOW_COMMENTS) .ifPresent(csvBuilder::setAllowComments); - properties.getOptionalString(FORMAT_ARRAY_ELEMENT_DELIMITER) + options.getOptional(ARRAY_ELEMENT_DELIMITER) .ifPresent(csvBuilder::setArrayElementSeparator); - properties.getOptionalString(FORMAT_ARRAY_ELEMENT_DELIMITER) + options.getOptional(ARRAY_ELEMENT_DELIMITER) .ifPresent(csvBuilder::setArrayElementSeparator); - properties.getOptionalCharacter(FORMAT_ESCAPE_CHARACTER) + options.getOptional(ESCAPE_CHARACTER).map(s -> s.charAt(0)) .ifPresent(csvBuilder::setEscapeChar); - properties.getOptionalString(FORMAT_NULL_LITERAL) + options.getOptional(NULL_LITERAL) 
.ifPresent(csvBuilder::setNullValue); - properties.getOptionalString(FORMAT_LINE_DELIMITER) + options.getOptional(LINE_DELIMITER) .ifPresent(csvBuilder::setLineSeparator); return csvBuilder.build(); @@ -139,31 +167,31 @@ private CsvSchema buildCsvSchema(RowType rowType, DescriptorProperties propertie @Override public Optional> createEncoder(WriterContext context) { + ReadableConfig options = context.getFormatOptions(); + validateFormatOptions(options); CsvRowDataSerializationSchema.Builder builder = new CsvRowDataSerializationSchema.Builder( context.getFormatRowType()); - DescriptorProperties properties = getValidatedProperties(context.getFormatProperties()); - - properties.getOptionalCharacter(FORMAT_FIELD_DELIMITER) + options.getOptional(FIELD_DELIMITER).map(s -> s.charAt(0)) .ifPresent(builder::setFieldDelimiter); - properties.getOptionalString(FORMAT_LINE_DELIMITER) + options.getOptional(LINE_DELIMITER) .ifPresent(builder::setLineDelimiter); - if (properties.getOptionalBoolean(FORMAT_DISABLE_QUOTE_CHARACTER).orElse(false)) { + if (options.get(DISABLE_QUOTE_CHARACTER)) { builder.disableQuoteCharacter(); } else { - properties.getOptionalCharacter(FORMAT_QUOTE_CHARACTER).ifPresent(builder::setQuoteCharacter); + options.getOptional(QUOTE_CHARACTER).map(s -> s.charAt(0)).ifPresent(builder::setQuoteCharacter); } - properties.getOptionalString(FORMAT_ARRAY_ELEMENT_DELIMITER) + options.getOptional(ARRAY_ELEMENT_DELIMITER) .ifPresent(builder::setArrayElementDelimiter); - properties.getOptionalCharacter(FORMAT_ESCAPE_CHARACTER) + options.getOptional(ESCAPE_CHARACTER).map(s -> s.charAt(0)) .ifPresent(builder::setEscapeCharacter); - properties.getOptionalString(FORMAT_NULL_LITERAL) + options.getOptional(NULL_LITERAL) .ifPresent(builder::setNullLiteral); final CsvRowDataSerializationSchema serializationSchema = builder.build(); @@ -176,49 +204,6 @@ public Optional> createBulkWriterFactory(WriterConte return Optional.empty(); } - @Override - public boolean supportsSchemaDerivation() { - return true; - } - - @Override - public List supportedProperties() { - final List properties = new ArrayList<>(); - properties.add(FORMAT_FIELD_DELIMITER); - properties.add(FORMAT_LINE_DELIMITER); - properties.add(FORMAT_DISABLE_QUOTE_CHARACTER); - properties.add(FORMAT_QUOTE_CHARACTER); - properties.add(FORMAT_ALLOW_COMMENTS); - properties.add(FORMAT_IGNORE_PARSE_ERRORS); - properties.add(FORMAT_ARRAY_ELEMENT_DELIMITER); - properties.add(FORMAT_ESCAPE_CHARACTER); - properties.add(FORMAT_NULL_LITERAL); - return properties; - } - - @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FORMAT, "csv"); - return context; - } - - private static DescriptorProperties getValidatedProperties(Map propertiesMap) { - final DescriptorProperties properties = new DescriptorProperties(true); - properties.putProperties(propertiesMap); - - properties.validateString(FORMAT_FIELD_DELIMITER, true, 1, 1); - properties.validateEnumValues(FORMAT_LINE_DELIMITER, true, Arrays.asList("\r", "\n", "\r\n", "")); - properties.validateBoolean(FORMAT_DISABLE_QUOTE_CHARACTER, true); - properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1); - properties.validateBoolean(FORMAT_ALLOW_COMMENTS, true); - properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true); - properties.validateString(FORMAT_ARRAY_ELEMENT_DELIMITER, true, 1); - properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1); - - return properties; - } - /** * InputFormat that reads csv record into {@link RowData}. 
*/ diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java index 99e4ac44f9454..0ebf0b9af9708 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java @@ -147,7 +147,7 @@ public Set> optionalOptions() { // Validation // ------------------------------------------------------------------------ - private static void validateFormatOptions(ReadableConfig tableOptions) { + static void validateFormatOptions(ReadableConfig tableOptions) { final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent(); final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER); if (isDisabledQuoteCharacter && hasQuoteCharacter){ diff --git a/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 72657bfe85c8e..fdd66f47f9448 100644 --- a/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.flink.formats.csv.CsvFormatFactory +org.apache.flink.formats.csv.CsvFileSystemFormatFactory diff --git a/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 0092d2e5b4e59..61cd834b24ea4 100644 --- a/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-formats/flink-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -14,4 +14,3 @@ # limitations under the License. 
org.apache.flink.formats.csv.CsvRowFormatFactory -org.apache.flink.formats.csv.CsvFileSystemFormatFactory diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java index 671ba1292f618..cd5aab18d76e5 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java @@ -47,8 +47,8 @@ public static class GeneralCsvFilesystemBatchITCase extends BatchFileSystemITCas public String[] formatProperties() { List ret = new ArrayList<>(); ret.add("'format'='csv'"); - ret.add("'format.field-delimiter'=';'"); - ret.add("'format.quote-character'='#'"); + ret.add("'csv.field-delimiter'=';'"); + ret.add("'csv.quote-character'='#'"); return ret.toArray(new String[0]); } } @@ -62,8 +62,8 @@ public static class EnrichedCsvFilesystemBatchITCase extends BatchFileSystemITCa public String[] formatProperties() { List ret = new ArrayList<>(); ret.add("'format'='csv'"); - ret.add("'format.ignore-parse-errors'='true'"); - ret.add("'format.escape-character'='\t'"); + ret.add("'csv.ignore-parse-errors'='true'"); + ret.add("'csv.escape-character'='\t'"); return ret.toArray(new String[0]); } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java index 9a2be31407b1b..d464634e4fe36 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java @@ -23,11 +23,12 @@ import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.FileSystemFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -36,43 +37,48 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; -import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD; -import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_IGNORE_PARSE_ERRORS; +import static org.apache.flink.formats.json.JsonFormatFactory.validateFormatOptions; +import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD; +import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS; /** * Factory to build reader/writer to read/write json format file. 
*/ public class JsonFileSystemFormatFactory implements FileSystemFormatFactory { + public static final String IDENTIFIER = "json"; + @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FORMAT, "json"); - return context; + public String factoryIdentifier() { + return IDENTIFIER; } @Override - public List supportedProperties() { - ArrayList properties = new ArrayList<>(); - properties.add(FORMAT_FAIL_ON_MISSING_FIELD); - properties.add(FORMAT_IGNORE_PARSE_ERRORS); - return properties; + public Set> requiredOptions() { + return new HashSet<>(); + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(FAIL_ON_MISSING_FIELD); + options.add(IGNORE_PARSE_ERRORS); + return options; } @Override public InputFormat createReader(ReaderContext context) { - DescriptorProperties properties = getValidatedProperties(context.getFormatProperties()); - boolean failOnMissingField = properties.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).orElse(false); - boolean ignoreParseErrors = properties.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS).orElse(false); + ReadableConfig options = context.getFormatOptions(); + validateFormatOptions(options); + boolean failOnMissingField = options.get(FAIL_ON_MISSING_FIELD); + boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS); RowType formatRowType = context.getFormatRowType(); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( @@ -119,19 +125,6 @@ public Optional> createBulkWriterFactory(WriterConte return Optional.empty(); } - @Override - public boolean supportsSchemaDerivation() { - return true; - } - - private static DescriptorProperties getValidatedProperties(Map propertiesMap) { - final DescriptorProperties properties = new DescriptorProperties(true); - properties.putProperties(propertiesMap); - properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true); - properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, true); - return properties; - } - /** * A {@link JsonInputFormat} is responsible to read {@link RowData} records * from json format files. diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 07e6d2d65c435..ca801591149d9 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.connector.ChangelogMode; @@ -42,6 +41,9 @@ import java.util.HashSet; import java.util.Set; +import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD; +import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS; + /** * Table format factory for providing configured instances of JSON to RowData * {@link SerializationSchema} and {@link DeserializationSchema}. 
@@ -52,23 +54,6 @@ public class JsonFormatFactory implements public static final String IDENTIFIER = "json"; - // ------------------------------------------------------------------------ - // Options - // ------------------------------------------------------------------------ - - private static final ConfigOption FAIL_ON_MISSING_FIELD = ConfigOptions - .key("fail-on-missing-field") - .booleanType() - .defaultValue(false) - .withDescription("Optional flag to specify whether to fail if a field is missing or not, false by default"); - - private static final ConfigOption IGNORE_PARSE_ERRORS = ConfigOptions - .key("ignore-parse-errors") - .booleanType() - .defaultValue(false) - .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n" - + "fields are set to null in case of errors, false by default"); - @SuppressWarnings("unchecked") @Override public ScanFormat> createScanFormat( @@ -146,7 +131,7 @@ public Set> optionalOptions() { // Validation // ------------------------------------------------------------------------ - private void validateFormatOptions(ReadableConfig tableOptions) { + static void validateFormatOptions(ReadableConfig tableOptions) { boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD); boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS); if (ignoreParseErrors && failOnMissingField) { diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java new file mode 100644 index 0000000000000..dca8c16c5424a --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * This class holds configuration constants used by json format. 
+ */ +public class JsonOptions { + + public static final ConfigOption FAIL_ON_MISSING_FIELD = ConfigOptions + .key("fail-on-missing-field") + .booleanType() + .defaultValue(false) + .withDescription("Optional flag to specify whether to fail if a field is missing or not, false by default"); + + public static final ConfigOption IGNORE_PARSE_ERRORS = ConfigOptions + .key("ignore-parse-errors") + .booleanType() + .defaultValue(false) + .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n" + + "fields are set to null in case of errors, false by default"); +} diff --git a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 5349ab10c035b..781a963f7da29 100644 --- a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +org.apache.flink.formats.json.JsonFileSystemFormatFactory org.apache.flink.formats.json.JsonFormatFactory org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory org.apache.flink.formats.json.canal.CanalJsonFormatFactory diff --git a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index bc622f2bc0817..aec584688eb02 100644 --- a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -14,4 +14,3 @@ # limitations under the License. 
org.apache.flink.formats.json.JsonRowFormatFactory -org.apache.flink.formats.json.JsonFileSystemFormatFactory diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java index 2c01d29e10987..f1ee52b333d58 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java @@ -39,7 +39,7 @@ public class JsonBatchFileSystemITCase extends BatchFileSystemITCaseBase { public String[] formatProperties() { List ret = new ArrayList<>(); ret.add("'format'='json'"); - ret.add("'format.ignore-parse-errors'='true'"); + ret.add("'json.ignore-parse-errors'='true'"); return ret.toArray(new String[0]); } diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java index 9ab0ad58cd978..bc337c049408d 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileSystemFormatFactory.java @@ -22,12 +22,13 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.orc.vector.RowDataVectorizer; import org.apache.flink.orc.writer.OrcBulkWriterFactory; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.FileSystemFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -41,16 +42,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; /** @@ -58,44 +57,34 @@ */ public class OrcFileSystemFormatFactory implements FileSystemFormatFactory { - /** - * Prefix for orc-related properties, besides format, start with "orc". - * See more in {@link org.apache.orc.OrcConf}. 
- */ - public static final String ORC_PROPERTIES_PREFIX = "format.orc"; + public static final String IDENTIFIER = "orc"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FORMAT, "orc"); - return context; + public Set> requiredOptions() { + return new HashSet<>(); } @Override - public List supportedProperties() { - return Collections.singletonList( - ORC_PROPERTIES_PREFIX + ".*" - ); + public Set> optionalOptions() { + // support "orc.*" + return new HashSet<>(); } - private static Properties getOrcProperties(DescriptorProperties properties) { - Properties conf = new Properties(); - properties.asMap().keySet() - .stream() - .filter(key -> key.startsWith(ORC_PROPERTIES_PREFIX)) - .forEach(key -> { - String value = properties.getString(key); - String subKey = key.substring((FORMAT + '.').length()); - conf.put(subKey, value); - }); - return conf; + private static Properties getOrcProperties(ReadableConfig options) { + Properties orcProperties = new Properties(); + Properties properties = new Properties(); + ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties); + properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v)); + return orcProperties; } @Override public InputFormat createReader(ReaderContext context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getFormatProperties()); - return new OrcRowDataInputFormat( context.getPaths(), context.getSchema().getFieldNames(), @@ -103,14 +92,11 @@ private static Properties getOrcProperties(DescriptorProperties properties) { context.getProjectFields(), context.getDefaultPartName(), context.getPushedDownLimit(), - getOrcProperties(properties)); + getOrcProperties(context.getFormatOptions())); } @Override public Optional> createBulkWriterFactory(WriterContext context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getFormatProperties()); - LogicalType[] orcTypes = Arrays.stream(context.getFormatFieldTypes()) .map(DataType::getLogicalType) .toArray(LogicalType[]::new); @@ -120,7 +106,7 @@ public Optional> createBulkWriterFactory(WriterConte OrcBulkWriterFactory factory = new OrcBulkWriterFactory<>( new RowDataVectorizer(typeDescription.toString(), orcTypes), - getOrcProperties(properties), + getOrcProperties(context.getFormatOptions()), new Configuration()); return Optional.of(factory); } @@ -130,11 +116,6 @@ public Optional> createEncoder(WriterContext context) { return Optional.empty(); } - @Override - public boolean supportsSchemaDerivation() { - return true; - } - /** * An implementation of {@link FileInputFormat} to read {@link RowData} records * from orc files. 
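Aside on the ORC change above: with the FLIP-122 style, a DDL option such as 'orc.compress' = 'snappy' reaches the format factory with the "orc." prefix already stripped, and getOrcProperties() re-attaches the identifier prefix before handing the keys to the ORC writer. A minimal sketch of that mapping, assuming a made-up "compress" key and value purely for illustration (not part of this patch):

import java.util.Properties;

import org.apache.flink.configuration.Configuration;

public class OrcOptionPrefixSketch {

	public static void main(String[] args) {
		// Format options as the factory sees them: the "orc." prefix is already stripped.
		Configuration formatOptions = new Configuration();
		formatOptions.setString("compress", "SNAPPY"); // hypothetical value, for illustration only

		// Re-apply the identifier prefix so OrcConf-style keys ("orc.compress") reach the writer.
		Properties flat = new Properties();
		formatOptions.addAllToProperties(flat);
		Properties orcProperties = new Properties();
		flat.forEach((k, v) -> orcProperties.put("orc." + k, v));

		System.out.println(orcProperties); // prints {orc.compress=SNAPPY}
	}
}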
diff --git a/flink-formats/flink-orc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-orc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from flink-formats/flink-orc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory rename to flink-formats/flink-orc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java index 29437b1012375..699c656762753 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemITCase.java @@ -20,9 +20,17 @@ import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.junit.Assert; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -50,8 +58,31 @@ public String[] formatProperties() { List ret = new ArrayList<>(); ret.add("'format'='orc'"); if (configure) { - ret.add("'format.orc.compress'='snappy'"); + ret.add("'orc.compress'='snappy'"); } return ret.toArray(new String[0]); } + + @Override + public void testNonPartition() { + super.testNonPartition(); + + // test configure success + File directory = new File(URI.create(resultPath()).getPath()); + File[] files = directory.listFiles((dir, name) -> + !name.startsWith(".") && !name.startsWith("_")); + Assert.assertNotNull(files); + Path path = new Path(URI.create(files[0].getAbsolutePath())); + + try { + Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(new Configuration())); + if (configure) { + Assert.assertEquals("SNAPPY", reader.getCompressionKind().toString()); + } else { + Assert.assertEquals("ZLIB", reader.getCompressionKind().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java index 95791bb5189c8..a614164ecda0f 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder; @@ -30,7 +31,6 @@ import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader; import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.descriptors.DescriptorProperties; import 
org.apache.flink.table.factories.FileSystemFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -38,19 +38,18 @@ import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.ParquetOutputFormat; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Optional; +import java.util.Properties; +import java.util.Set; import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; /** @@ -58,66 +57,43 @@ */ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory { - public static final ConfigOption UTC_TIMEZONE = key("format.utc-timezone") + public static final String IDENTIFIER = "parquet"; + + public static final ConfigOption UTC_TIMEZONE = key("utc-timezone") .booleanType() .defaultValue(false) .withDescription("Use UTC timezone or local timezone to the conversion between epoch" + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" + " use UTC timezone"); - /** - * Prefix for parquet-related properties, besides format, start with "parquet". - * See more in {@link ParquetOutputFormat}. - * - parquet.compression - * - parquet.block.size - * - parquet.page.size - * - parquet.dictionary.page.size - * - parquet.writer.max-padding - * - parquet.enable.dictionary - * - parquet.validation - * - parquet.writer.version - * ... - */ - public static final String PARQUET_PROPERTIES = "format.parquet"; - @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FORMAT, "parquet"); - return context; + public String factoryIdentifier() { + return IDENTIFIER; } @Override - public List supportedProperties() { - return Arrays.asList( - UTC_TIMEZONE.key(), - PARQUET_PROPERTIES + ".*" - ); + public Set> requiredOptions() { + return new HashSet<>(); } - private static boolean isUtcTimestamp(DescriptorProperties properties) { - return properties.getOptionalBoolean(UTC_TIMEZONE.key()) - .orElse(UTC_TIMEZONE.defaultValue()); + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(UTC_TIMEZONE); + // support "parquet.*" + return options; } - private static Configuration getParquetConfiguration(DescriptorProperties properties) { + private static Configuration getParquetConfiguration(ReadableConfig options) { Configuration conf = new Configuration(); - properties.asMap().keySet() - .stream() - .filter(key -> key.startsWith(PARQUET_PROPERTIES)) - .forEach(key -> { - String value = properties.getString(key); - String subKey = key.substring((FORMAT + '.').length()); - conf.set(subKey, value); - }); + Properties properties = new Properties(); + ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties); + properties.forEach((k, v) -> conf.set(IDENTIFIER + "." 
+ k, v.toString())); return conf; } @Override public InputFormat createReader(ReaderContext context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getFormatProperties()); - return new ParquetInputFormat( context.getPaths(), context.getSchema().getFieldNames(), @@ -125,23 +101,19 @@ private static Configuration getParquetConfiguration(DescriptorProperties proper context.getProjectFields(), context.getDefaultPartName(), context.getPushedDownLimit(), - getParquetConfiguration(properties), - isUtcTimestamp(properties)); + getParquetConfiguration(context.getFormatOptions()), + context.getFormatOptions().get(UTC_TIMEZONE)); } @Override public Optional> createBulkWriterFactory(WriterContext context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getFormatProperties()); - return Optional.of(ParquetRowDataBuilder.createWriterFactory( RowType.of(Arrays.stream(context.getFormatFieldTypes()) .map(DataType::getLogicalType) .toArray(LogicalType[]::new), context.getFormatFieldNames()), - getParquetConfiguration(properties), - isUtcTimestamp(properties) - )); + getParquetConfiguration(context.getFormatOptions()), + context.getFormatOptions().get(UTC_TIMEZONE))); } @Override @@ -149,11 +121,6 @@ public Optional> createEncoder(WriterContext context) { return Optional.empty(); } - @Override - public boolean supportsSchemaDerivation() { - return true; - } - /** * An implementation of {@link ParquetInputFormat} to read {@link RowData} records * from Parquet files. diff --git a/flink-formats/flink-parquet/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-formats/flink-parquet/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from flink-formats/flink-parquet/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory rename to flink-formats/flink-parquet/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java index b6994b6c71572..1f9e92d7bafdd 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java @@ -20,14 +20,24 @@ import org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.junit.Assert; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; + /** * ITCase for {@link ParquetFileSystemFormatFactory}. 
*/ @@ -50,9 +60,36 @@ public String[] formatProperties() { List ret = new ArrayList<>(); ret.add("'format'='parquet'"); if (configure) { - ret.add("'format.utc-timezone'='true'"); - ret.add("'format.parquet.compression'='gzip'"); + ret.add("'parquet.utc-timezone'='true'"); + ret.add("'parquet.compression'='gzip'"); } return ret.toArray(new String[0]); } + + @Override + public void testNonPartition() { + super.testNonPartition(); + + // test configure success + File directory = new File(URI.create(resultPath()).getPath()); + File[] files = directory.listFiles((dir, name) -> + !name.startsWith(".") && !name.startsWith("_")); + Assert.assertNotNull(files); + Path path = new Path(URI.create(files[0].getAbsolutePath())); + + try { + ParquetMetadata footer = readFooter(new Configuration(), path, range(0, Long.MAX_VALUE)); + if (configure) { + Assert.assertEquals( + "GZIP", + footer.getBlocks().get(0).getColumns().get(0).getCodec().toString()); + } else { + Assert.assertEquals( + "UNCOMPRESSED", + footer.getBlocks().get(0).getColumns().get(0).getCodec().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java index 1f660f82a13b7..c4fff55951825 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FileSystemFormatFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; @@ -32,7 +33,6 @@ import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -40,7 +40,7 @@ * File system format factory for creating configured instances of reader and writer. */ @Internal -public interface FileSystemFormatFactory extends TableFormatFactory { +public interface FileSystemFormatFactory extends Factory { /** * Create {@link InputFormat} reader. @@ -68,9 +68,9 @@ interface ReaderContext { TableSchema getSchema(); /** - * Properties of this format. + * Options of this format. */ - Map getFormatProperties(); + ReadableConfig getFormatOptions(); /** * Partition keys of the table. @@ -160,9 +160,9 @@ interface WriterContext { TableSchema getSchema(); /** - * Properties of this format. + * Options of this format. */ - Map getFormatProperties(); + ReadableConfig getFormatOptions(); /** * Partition keys of the table. 
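For orientation before the planner changes below: the FileSystemFormatFactory interface now hands formats a typed ReadableConfig instead of an untyped property map. A minimal sketch, with an invented "demo" identifier and "codec" option (neither is part of this patch), of how a format factory declares FLIP-122 options and reads them back; in DDL the key would appear as 'demo.codec', since option keys are prefixed with the format identifier:

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

public class DemoFormatOptionsSketch {

	// Declared without the "demo." prefix; the framework strips the identifier before
	// the options reach the factory (mirrors AVRO_OUTPUT_CODEC earlier in this patch).
	public static final ConfigOption<String> CODEC = ConfigOptions.key("codec")
			.stringType()
			.noDefaultValue()
			.withDescription("Illustrative compression codec option.");

	public String factoryIdentifier() {
		return "demo";
	}

	public Set<ConfigOption<?>> requiredOptions() {
		return new HashSet<>();
	}

	public Set<ConfigOption<?>> optionalOptions() {
		Set<ConfigOption<?>> options = new HashSet<>();
		options.add(CODEC);
		return options;
	}

	// Replaces the old untyped Map<String, String> lookup with a typed read.
	public Optional<String> readCodec(ReadableConfig formatOptions) {
		return formatOptions.getOptional(CODEC);
	}
}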
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala index 0339e41d041f5..e4e84cfa4cbc4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecLegacySinkRule.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.api.TableException -import org.apache.flink.table.filesystem.FileSystemTableFactory +import org.apache.flink.table.filesystem.FileSystemOptions import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacySink @@ -57,7 +57,7 @@ class BatchExecLegacySinkRule extends ConverterRule( val shuffleEnable = sinkNode .catalogTable .getProperties - .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key()) + .get(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION.key()) if (shuffleEnable != null && shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala index 0464430263330..2d42930c8501f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecSinkRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.api.TableException import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning -import org.apache.flink.table.filesystem.FileSystemTableFactory +import org.apache.flink.table.filesystem.FileSystemOptions import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink @@ -63,7 +63,7 @@ class BatchExecSinkRule extends ConverterRule( val shuffleEnable = sinkNode .catalogTable .getOptions - .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key()) + .get(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION.key()) if (shuffleEnable != null && shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala index f4e23b347b995..d47a905e678a7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecLegacySinkRule.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.table.api.TableException -import org.apache.flink.table.filesystem.FileSystemTableFactory +import org.apache.flink.table.filesystem.FileSystemOptions import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacySink @@ -56,7 +56,7 @@ class StreamExecLegacySinkRule extends ConverterRule( val shuffleEnable = sinkNode .catalogTable .getProperties - .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key()) + .get(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION.key()) if (shuffleEnable != null && shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala index 16140235d3cb2..a92265e82482a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.table.api.TableException import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning -import org.apache.flink.table.filesystem.FileSystemTableFactory +import org.apache.flink.table.filesystem.FileSystemOptions import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSink @@ -62,7 +62,7 @@ class StreamExecSinkRule extends ConverterRule( val shuffleEnable = sinkNode .catalogTable .getOptions - .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key()) + .get(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION.key()) if (shuffleEnable != null && shuffleEnable.toBoolean) { requiredTraitSet = requiredTraitSet.plus( diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java index 64949e14c5031..d2283648f9f9c 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/TestCsvFileSystemFormatFactory.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.factories.FileSystemFormatFactory; @@ -34,38 +36,37 @@ import java.io.OutputStream; 
import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER; import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; /** * Test csv {@link FileSystemFormatFactory}. */ public class TestCsvFileSystemFormatFactory implements FileSystemFormatFactory { - public static final String USE_BULK_WRITER = "format.use-bulk-writer"; + public static final ConfigOption USE_BULK_WRITER = ConfigOptions.key("use-bulk-writer") + .booleanType() + .defaultValue(false); @Override - public boolean supportsSchemaDerivation() { - return true; + public String factoryIdentifier() { + return "testcsv"; } @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put(FORMAT, "testcsv"); - return context; + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(USE_BULK_WRITER); + return options; } @Override - public List supportedProperties() { - return Collections.singletonList(USE_BULK_WRITER); + public Set> optionalOptions() { + return new HashSet<>(); } @Override @@ -80,7 +81,7 @@ public List supportedProperties() { } private boolean useBulkWriter(WriterContext context) { - return Boolean.parseBoolean(context.getFormatProperties().get(USE_BULK_WRITER)); + return context.getFormatOptions().get(USE_BULK_WRITER); } @Override diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index dfb6862405bb7..7632f4b906e4f 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -15,3 +15,4 @@ org.apache.flink.table.planner.factories.TestValuesTableFactory org.apache.flink.table.planner.factories.TestProjectableValuesTableFactory +org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index f7d99b19743ec..2e712bfc86355 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -18,7 +18,6 @@ org.apache.flink.table.planner.runtime.batch.sql.TestPartitionableSinkFactory org.apache.flink.table.planner.utils.TestPartitionableSourceFactory org.apache.flink.table.planner.utils.TestFilterableTableSourceFactory org.apache.flink.table.planner.utils.TestLegacyProjectableTableSourceFactory -org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory org.apache.flink.table.planner.utils.TestOptionsTableFactory org.apache.flink.table.planner.utils.WithoutTimeAttributesTableSourceFactory diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala index cdf0a8c951c41..9da1e0de33319 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/FileSystemTestCsvITCase.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.planner.runtime.batch.sql -import org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory.USE_BULK_WRITER - import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -32,7 +30,7 @@ class FileSystemTestCsvITCase(useBulkWriter: Boolean) extends BatchFileSystemITC override def formatProperties(): Array[String] = { super.formatProperties() ++ Seq( "'format' = 'testcsv'", - s"'$USE_BULK_WRITER' = '$useBulkWriter'") + s"'testcsv.use-bulk-writer' = '$useBulkWriter'") } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala index a27385b71226f..1350724582b14 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala @@ -31,9 +31,9 @@ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR import org.apache.flink.table.descriptors.DescriptorProperties import org.apache.flink.table.descriptors.Schema.SCHEMA import org.apache.flink.table.factories.TableSinkFactory -import org.apache.flink.table.filesystem.FileSystemTableFactory +import org.apache.flink.table.filesystem.FileSystemOptions import org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._ -import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TableEnvUtil} +import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink} @@ -259,7 +259,7 @@ object PartitionableSinkITCase { partitionColumns: Array[String]): Unit = { val properties = new DescriptorProperties() properties.putString("supports-grouping", grouping.toString) - properties.putString(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key(), "true") + properties.putString(FileSystemOptions.SINK_SHUFFLE_BY_PARTITION.key(), "true") properties.putString(CONNECTOR_TYPE, "TestPartitionableSink") partitionColumns.zipWithIndex.foreach { case (part, i) => properties.putString("partition-column." 
+ i, part) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FsStreamingSinkTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FsStreamingSinkTestCsvITCase.scala index 19a632c49e465..62d0f95ef8444 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FsStreamingSinkTestCsvITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FsStreamingSinkTestCsvITCase.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase -import org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory.USE_BULK_WRITER import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -34,7 +33,9 @@ class FsStreamingSinkTestCsvITCase(useBulkWriter: Boolean) extends FsStreamingSi override def additionalProperties(): Array[String] = { super.additionalProperties() ++ - Seq("'format' = 'testcsv'", s"'$USE_BULK_WRITER' = '$useBulkWriter'") ++ + Seq( + "'format' = 'testcsv'", + s"'testcsv.use-bulk-writer' = '$useBulkWriter'") ++ (if (useBulkWriter) Seq() else Seq("'sink.rolling-policy.file-size' = '1'")) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala index 6bca9c76f3c9d..8043e145bcca5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.planner.runtime.stream.sql -import org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory.USE_BULK_WRITER - import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -34,7 +32,7 @@ class StreamFileSystemTestCsvITCase(useBulkWriter: Boolean) extends StreamFileSy override def formatProperties(): Array[String] = { super.formatProperties() ++ Seq( "'format' = 'testcsv'", - s"'$USE_BULK_WRITER' = '$useBulkWriter'") + s"'testcsv.use-bulk-writer' = '$useBulkWriter'") } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java index 4e8c60a9ee792..0d9739fcb4eea 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java @@ -29,6 +29,35 @@ */ public class FileSystemOptions { + public static final ConfigOption PATH = key("path") + .stringType() + .noDefaultValue() + .withDescription("The path of a directory"); + + public static final ConfigOption PARTITION_DEFAULT_NAME = key("partition.default-name") + .stringType() + .defaultValue("__DEFAULT_PARTITION__") + .withDescription("The default partition name in case the dynamic partition" + + " column value is null/empty string"); + + public static final ConfigOption 
SINK_ROLLING_POLICY_FILE_SIZE = key("sink.rolling-policy.file-size") + .longType() + .defaultValue(1024L * 1024L * 128L) + .withDescription("The maximum part file size before rolling (by default 128MB)."); + + public static final ConfigOption SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time-interval") + .durationType() + .defaultValue(Duration.ofMinutes(30)) + .withDescription("The maximum time duration a part file can stay open before rolling" + + " (by default 30 min to avoid to many small files)."); + + public static final ConfigOption SINK_SHUFFLE_BY_PARTITION = key("sink.shuffle-by-partition.enable") + .booleanType() + .defaultValue(false) + .withDescription("The option to enable shuffle data by dynamic partition fields in sink" + + " phase, this can greatly reduce the number of file for filesystem sink but may" + + " lead data skew, the default value is disabled."); + public static final ConfigOption STREAMING_SOURCE_ENABLE = key("streaming-source.enable") .booleanType() diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java index 43955c27842c0..8877087625b46 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java @@ -18,34 +18,27 @@ package org.apache.flink.table.filesystem; -import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.FileSystemFormatFactory; import org.apache.flink.table.factories.TableFactory; -import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR; import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS; -import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND; -import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN; -import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_DELAY; -import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND; -import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME; -import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_TRIGGER; +import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_DEFAULT_NAME; +import static org.apache.flink.table.filesystem.FileSystemOptions.PATH; /** * File system {@link TableFactory}. @@ -61,121 +54,64 @@ public class FileSystemTableFactory implements TableSourceFactory, TableSinkFactory { - public static final String CONNECTOR_VALUE = "filesystem"; - - /** - * Not use "connector.path" because: - * 1.Using "connector.path" will conflict with current batch csv source and batch csv sink. - * 2.This is compatible with FLIP-122. - */ - public static final String PATH = "path"; - - /** - * Move these properties to validator after FLINK-16904. - */ - public static final ConfigOption PARTITION_DEFAULT_NAME = key("partition.default-name") - .stringType() - .defaultValue("__DEFAULT_PARTITION__") - .withDescription("The default partition name in case the dynamic partition" + - " column value is null/empty string"); - - public static final ConfigOption SINK_ROLLING_POLICY_FILE_SIZE = key("sink.rolling-policy.file-size") - .longType() - .defaultValue(1024L * 1024L * 128L) - .withDescription("The maximum part file size before rolling (by default 128MB)."); - - public static final ConfigOption SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time.interval") - .longType() - .defaultValue(30L * 60 * 1000L) - .withDescription("The maximum time duration a part file can stay open before rolling" + - " (by default 30 min to avoid to many small files)."); - - public static final ConfigOption SINK_SHUFFLE_BY_PARTITION = key("sink.shuffle-by-partition.enable") - .booleanType() - .defaultValue(false) - .withDescription("The option to enable shuffle data by dynamic partition fields in sink" + - " phase, this can greatly reduce the number of file for filesystem sink but may" + - " lead data skew, the default value is disabled."); + public static final String IDENTIFIER = "filesystem"; @Override public Map requiredContext() { Map context = new HashMap<>(); - context.put(CONNECTOR, CONNECTOR_VALUE); + context.put(CONNECTOR, IDENTIFIER); return context; } @Override public List supportedProperties() { - List properties = new ArrayList<>(); - - // path - properties.add(PATH); - - // schema - properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE); - properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME); - - // partition - properties.add(DescriptorProperties.PARTITION_KEYS + ".#." + - DescriptorProperties.PARTITION_KEYS_NAME); - properties.add(PARTITION_DEFAULT_NAME.key()); - - properties.add(SINK_ROLLING_POLICY_FILE_SIZE.key()); - properties.add(SINK_ROLLING_POLICY_TIME_INTERVAL.key()); - properties.add(SINK_SHUFFLE_BY_PARTITION.key()); - properties.add(PARTITION_TIME_EXTRACTOR_KIND.key()); - properties.add(PARTITION_TIME_EXTRACTOR_CLASS.key()); - properties.add(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()); - properties.add(SINK_PARTITION_COMMIT_TRIGGER.key()); - properties.add(SINK_PARTITION_COMMIT_DELAY.key()); - properties.add(SINK_PARTITION_COMMIT_POLICY_KIND.key()); - properties.add(SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()); - - // format - properties.add(FORMAT); - properties.add(FORMAT + ".*"); - - return properties; + // contains format properties. 
+ return Collections.singletonList("*"); } @Override public TableSource createTableSource(TableSourceFactory.Context context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getTable().getProperties()); + Configuration conf = new Configuration(); + context.getTable().getOptions().forEach(conf::setString); return new FileSystemTableSource( context.getTable().getSchema(), - new Path(properties.getString(PATH)), + getPath(conf), context.getTable().getPartitionKeys(), - getPartitionDefaultName(properties), + conf.get(PARTITION_DEFAULT_NAME), context.getTable().getProperties()); } @Override public TableSink createTableSink(TableSinkFactory.Context context) { - DescriptorProperties properties = new DescriptorProperties(); - properties.putProperties(context.getTable().getProperties()); + Configuration conf = new Configuration(); + context.getTable().getOptions().forEach(conf::setString); return new FileSystemTableSink( context.getObjectIdentifier(), context.isBounded(), context.getTable().getSchema(), - new Path(properties.getString(PATH)), + getPath(conf), context.getTable().getPartitionKeys(), - getPartitionDefaultName(properties), + conf.get(PARTITION_DEFAULT_NAME), context.getTable().getOptions()); } - private static String getPartitionDefaultName(DescriptorProperties properties) { - return properties - .getOptionalString(PARTITION_DEFAULT_NAME.key()) - .orElse(PARTITION_DEFAULT_NAME.defaultValue()); + private static Path getPath(Configuration conf) { + return new Path(conf.getOptional(PATH).orElseThrow(() -> + new ValidationException("Path should be not empty."))); } public static FileSystemFormatFactory createFormatFactory(Map properties) { - return TableFactoryService.find( + String format = properties.get(FORMAT); + if (format == null) { + throw new ValidationException(String.format( + "Table options do not contain an option key '%s' for discovering a format.", + FORMAT)); + } + return FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), FileSystemFormatFactory.class, - properties, - FileSystemTableFactory.class.getClassLoader()); + format); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java index 78910e7f1d552..8ac6c0be824c2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java @@ -25,6 +25,8 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -64,8 +66,8 @@ import java.util.Optional; import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND; -import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_FILE_SIZE; -import static org.apache.flink.table.filesystem.FileSystemTableFactory.SINK_ROLLING_POLICY_TIME_INTERVAL; +import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE; +import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL; import static org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory; /** @@ -146,7 +148,7 @@ public final DataStreamSink consumeDataStream(DataStream dataS TableRollingPolicy rollingPolicy = new TableRollingPolicy( !(writer instanceof Encoder), conf.get(SINK_ROLLING_POLICY_FILE_SIZE), - conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL)); + conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis()); BucketsBuilder> bucketsBuilder; InactiveBucketListener listener = new InactiveBucketListener(); @@ -237,6 +239,9 @@ private OutputFormatFactory createOutputFormatFactory() { private Object createWriter() { FileSystemFormatFactory formatFactory = createFormatFactory(properties); + Configuration conf = new Configuration(); + properties.forEach(conf::setString); + FileSystemFormatFactory.WriterContext context = new FileSystemFormatFactory.WriterContext() { @Override @@ -245,8 +250,8 @@ public TableSchema getSchema() { } @Override - public Map getFormatProperties() { - return properties; + public ReadableConfig getFormatOptions() { + return new DelegatingConfiguration(conf, formatFactory.factoryIdentifier() + "."); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java index 7e52ced1e60cf..35ba8999859b4 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java @@ -20,6 +20,9 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.java.io.CollectionInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableException; @@ -59,7 +62,7 @@ public class FileSystemTableSource extends InputFormatTableSource imple private final Path path; private final List partitionKeys; private final String defaultPartName; - private final Map formatProperties; + private final Map properties; private final int[] selectFields; private final Long limit; @@ -75,15 +78,15 @@ public class FileSystemTableSource extends InputFormatTableSource imple * @param partitionKeys partition keys of the table. * @param defaultPartName The default partition name in case the dynamic partition column value * is null/empty string. - * @param formatProperties format properties. + * @param properties table properties. 
*/ public FileSystemTableSource( TableSchema schema, Path path, List partitionKeys, String defaultPartName, - Map formatProperties) { - this(schema, path, partitionKeys, defaultPartName, formatProperties, null, null, null, null); + Map properties) { + this(schema, path, partitionKeys, defaultPartName, properties, null, null, null, null); } private FileSystemTableSource( @@ -91,7 +94,7 @@ private FileSystemTableSource( Path path, List partitionKeys, String defaultPartName, - Map formatProperties, + Map properties, List> readPartitions, int[] selectFields, Long limit, @@ -100,7 +103,7 @@ private FileSystemTableSource( this.path = path; this.partitionKeys = partitionKeys; this.defaultPartName = defaultPartName; - this.formatProperties = formatProperties; + this.properties = properties; this.readPartitions = readPartitions; this.selectFields = selectFields; this.limit = limit; @@ -114,8 +117,10 @@ private FileSystemTableSource( return new CollectionInputFormat<>(new ArrayList<>(), null); } - return createFormatFactory(formatProperties).createReader( - new FileSystemFormatFactory.ReaderContext() { + FileSystemFormatFactory formatFactory = createFormatFactory(properties); + Configuration conf = new Configuration(); + properties.forEach(conf::setString); + return formatFactory.createReader(new FileSystemFormatFactory.ReaderContext() { @Override public TableSchema getSchema() { @@ -123,8 +128,8 @@ public TableSchema getSchema() { } @Override - public Map getFormatProperties() { - return formatProperties; + public ReadableConfig getFormatOptions() { + return new DelegatingConfiguration(conf, formatFactory.factoryIdentifier() + "."); } @Override @@ -212,7 +217,7 @@ public FileSystemTableSource applyPartitionPruning( path, partitionKeys, defaultPartName, - formatProperties, + properties, remainingPartitions, selectFields, limit, @@ -226,7 +231,7 @@ public FileSystemTableSource projectFields(int[] fields) { path, partitionKeys, defaultPartName, - formatProperties, + properties, readPartitions, fields, limit, @@ -240,7 +245,7 @@ public FileSystemTableSource applyLimit(long limit) { path, partitionKeys, defaultPartName, - formatProperties, + properties, readPartitions, selectFields, limit, @@ -259,7 +264,7 @@ public FileSystemTableSource applyPredicate(List predicates) { path, partitionKeys, defaultPartName, - formatProperties, + properties, readPartitions, selectFields, limit,
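
Note on the FLIP-122 option style used throughout this patch: options are declared as typed ConfigOption constants and read through a ReadableConfig instead of being parsed out of raw string property maps. The sketch below is illustrative only (the surrounding class and the main method are not part of the patch); the option names and defaults mirror the ones added to FileSystemOptions, including the switch of the rolling-policy interval to a Duration that callers convert with toMillis().

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.time.Duration;

/** Illustrative holder for FLIP-122 style options, mirroring FileSystemOptions. */
public class RollingPolicyOptionsExample {

    // Typed option: long number of bytes, default 128MB.
    public static final ConfigOption<Long> SINK_ROLLING_POLICY_FILE_SIZE =
            ConfigOptions.key("sink.rolling-policy.file-size")
                    .longType()
                    .defaultValue(1024L * 1024L * 128L)
                    .withDescription("The maximum part file size before rolling.");

    // Typed option: Duration, so users may write '30 min' instead of raw milliseconds.
    public static final ConfigOption<Duration> SINK_ROLLING_POLICY_TIME_INTERVAL =
            ConfigOptions.key("sink.rolling-policy.time-interval")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(30))
                    .withDescription("The maximum time a part file can stay open before rolling.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Table options arrive as strings; Configuration parses them on read.
        conf.setString(SINK_ROLLING_POLICY_TIME_INTERVAL.key(), "10 min");

        long fileSize = conf.get(SINK_ROLLING_POLICY_FILE_SIZE);      // default: 134217728
        long intervalMillis = conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL)
                .toMillis();                                          // as passed to TableRollingPolicy
        System.out.println(fileSize + " bytes, roll every " + intervalMillis + " ms");
    }
}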
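
The replacement of getFormatProperties() with getFormatOptions() relies on key prefixing: in the table's WITH clause a format option is written with the format identifier as prefix (e.g. 'testcsv.use-bulk-writer' = 'true'), and the connector hands the format factory a view of the configuration with that prefix stripped via DelegatingConfiguration, exactly as FileSystemTableSink and FileSystemTableSource do above. A minimal sketch of that mechanism, assuming a plain key/value map as input:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;

import java.util.HashMap;
import java.util.Map;

public class FormatOptionPrefixExample {

    // Option as declared by the format factory itself (no prefix).
    public static final ConfigOption<Boolean> USE_BULK_WRITER =
            ConfigOptions.key("use-bulk-writer").booleanType().defaultValue(false);

    public static void main(String[] args) {
        // Options as they appear in the table's WITH clause.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("connector", "filesystem");
        tableOptions.put("format", "testcsv");
        tableOptions.put("testcsv.use-bulk-writer", "true");

        Configuration conf = new Configuration();
        tableOptions.forEach(conf::setString);

        // The connector exposes only the 'testcsv.'-prefixed keys to the format,
        // with the prefix stripped, so the factory can read its plain option names.
        ReadableConfig formatOptions = new DelegatingConfiguration(conf, "testcsv.");
        System.out.println(formatOptions.get(USE_BULK_WRITER)); // true
    }
}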
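
FileSystemTableFactory no longer matches format factories by required-context property sets; it reads the 'format' option and asks FactoryUtil to locate a Factory with that identifier on the classpath (via META-INF/services/org.apache.flink.table.factories.Factory, which is why the service files are renamed in this patch). The 'path' option has no default, so its absence is reported explicitly. A condensed sketch of both pieces, using the same Flink APIs as the patch (the helper class itself is illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FileSystemFormatFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;

import java.util.Map;

public class FormatDiscoveryExample {

    /** Looks up the FileSystemFormatFactory whose factoryIdentifier() matches the 'format' option. */
    public static FileSystemFormatFactory discoverFormat(Map<String, String> tableOptions) {
        String format = tableOptions.get("format");
        if (format == null) {
            throw new ValidationException(
                    "Table options do not contain an option key 'format' for discovering a format.");
        }
        // SPI lookup over all registered org.apache.flink.table.factories.Factory implementations.
        return FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                FileSystemFormatFactory.class,
                format);
    }

    /** 'path' has no default value, so a missing path must fail validation eagerly. */
    public static Path requirePath(Configuration conf) {
        return new Path(conf.getOptional(FileSystemOptions.PATH)
                .orElseThrow(() -> new ValidationException("Option 'path' must not be empty.")));
    }
}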
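
For end-to-end context, a filesystem table declared in the new option style could look as follows. This is a usage illustration, not part of the patch: the table environment setup, the choice of the csv format, and the concrete values are assumptions; only the flat key names ('path', 'format', 'sink.rolling-policy.*', 'sink.shuffle-by-partition.enable', and the format-prefixed keys) come from the options introduced above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FileSystemDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Connector and sink options use flat FLIP-122 keys; format-specific keys
        // are prefixed with the format identifier ('csv.' here).
        tEnv.executeSql(
                "CREATE TABLE fs_sink (\n" +
                "  name STRING,\n" +
                "  cnt BIGINT,\n" +
                "  dt STRING\n" +
                ") PARTITIONED BY (dt) WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'file:///tmp/fs_sink',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.field-delimiter' = ';',\n" +
                "  'sink.rolling-policy.file-size' = '134217728',\n" +
                "  'sink.rolling-policy.time-interval' = '30 min',\n" +
                "  'sink.shuffle-by-partition.enable' = 'true'\n" +
                ")");
    }
}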