[FLINK-17604] Implement format factory for CSV serialization and #12065
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit a2a08d5 (Mon May 11 03:15:34 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
// Options
// ------------------------------------------------------------------------

private static final ConfigOption<Character> FIELD_DELIMITER = ConfigOptions
Can you extract these config options into a CsvOptions? The file system also requires them.
Will do, thanks for the suggestion.
ef39423 to fb244ad
…riazation schema of RowData type
Thanks, I left some comments.
@Test
public void testInvalidCharacterOption() {
    thrown.expect(ValidationException.class);
    thrown.expect(containsCause(new ValidationException("Option [csv.quote-character] must be a Character.")));
- thrown.expect(containsCause(new ValidationException("Option [csv.quote-character] must be a Character.")));
+ thrown.expect(containsCause(new ValidationException("Option 'csv.quote-character' must be a Character.")));
public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER = ConfigOptions
    .key("array-element-delimiter")
    .stringType()
    .noDefaultValue();
The default value is ';'.
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions) {
    validateFormatOptions(formatOptions);
    return new SinkFormat<SerializationSchema<RowData>>() {
Add an empty line before this.
public static final ConfigOption<String> FIELD_DELIMITER = ConfigOptions
    .key("field-delimiter")
    .stringType()
    .defaultValue(",");
Could you add .withDescription(...) to each option? We may auto-generate the documentation in the near future. You can refer to the descriptions in the documentation.
        ConfigOption<String> option) {
    if (tableOptions.getOptional(option).isPresent()) {
        if (tableOptions.get(option).length() != 1) {
            throw new ValidationException(String.format("Option [%s.%s] must be a Character.", IDENTIFIER, option.key()));
- throw new ValidationException(String.format("Option [%s.%s] must be a Character.", IDENTIFIER, option.key()));
+ throw new ValidationException(String.format("Option '%s.%s' must be a single character, but was '%s'.", IDENTIFIER, option.key(), tableOptions.get(option)));
Use 'csv.xxx' to align better with the DDL options. And use "character" instead of "Character", because this is not in the Java language.
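For illustration, the suggested check and message can be sketched in plain Java. This is only a sketch under assumptions: the class name SingleCharOptionCheck is hypothetical, a plain Map stands in for ReadableConfig, and IllegalArgumentException stands in for ValidationException so that it compiles without Flink on the classpath.

```java
import java.util.Map;

// Hypothetical stand-in for the factory's validation helper (not the PR's code).
final class SingleCharOptionCheck {
    // Format identifier used as the option prefix, as in the suggested message.
    static final String IDENTIFIER = "csv";

    // Throws if the option is present and is not exactly one character long.
    static void validateCharacterVal(Map<String, String> tableOptions, String key) {
        String value = tableOptions.get(key);
        if (value == null) {
            return; // the option is optional, so absence is fine
        }
        if (value.length() != 1) {
            throw new IllegalArgumentException(String.format(
                "Option '%s.%s' must be a single character, but was '%s'.",
                IDENTIFIER, key, value));
        }
    }
}
```

Including the offending value in the message lets the user see what was actually configured without re-reading the DDL.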
}
validateCharacterVal(tableOptions, FIELD_DELIMITER);
validateCharacterVal(tableOptions, QUOTE_CHARACTER);
validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
ARRAY_ELEMENT_DELIMITER is also a single character in CsvValidator.
// Validation
// ------------------------------------------------------------------------

private static void validateFormatOptions(ReadableConfig tableOptions) {
We should also check that LINE_DELIMITER is one of "\r", "\n", "\r\n".
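A minimal sketch of such a check, in plain Java. The class and method names here are hypothetical and not taken from the PR; only the three allowed values come from the comment above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: accepts only the three line delimiters named in the review.
final class LineDelimiterCheck {
    private static final Set<String> ALLOWED =
        new HashSet<>(Arrays.asList("\r", "\n", "\r\n"));

    // Returns true only for "\r", "\n", or "\r\n".
    static boolean isValidLineDelimiter(String delimiter) {
        return ALLOWED.contains(delimiter);
    }
}
```

In the factory, a false result would presumably be turned into a ValidationException in the same style as the single-character check.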
@SuppressWarnings("unchecked")
@Override
public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
        DynamicTableFactory.Context context, ReadableConfig formatOptions) {
Call FactoryUtil.validateFactoryOptions(this, formatOptions); at the beginning. It will check all the options.
Please also add a test to verify that an invalid "csv.ignore-parse-errors" = "abc" option is rejected.
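To show why the "abc" case is worth a test: plain Boolean.parseBoolean("abc") silently returns false, so a typed check has to reject the value explicitly. The following is only a sketch of strict parsing in plain Java; StrictBooleanOption is a hypothetical name, and this is not how FactoryUtil implements its validation.

```java
// Hypothetical strict parser: unlike Boolean.parseBoolean, it rejects
// anything that is not "true" or "false" (case-insensitive).
final class StrictBooleanOption {
    static boolean parse(String key, String raw) {
        if ("true".equalsIgnoreCase(raw)) {
            return true;
        }
        if ("false".equalsIgnoreCase(raw)) {
            return false;
        }
        throw new IllegalArgumentException(String.format(
            "Option '%s' must be a boolean, but was '%s'.", key, raw));
    }
}
```

A test for the invalid option would then expect the exception rather than a silently disabled flag.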
Added to FactoryUtil.
Added in the factory instead.
Thanks for the update. LGTM.
The failed tests are not related to this one. Will merge it.
The failed Python tests are caused by FLINK-17596.
deserialization schema of RowData type
What is the purpose of the change
Implements a format factory for the CSV RowData serialization/deserialization schemas so that they can be used in SQL.
Brief change log
Verifying this change
Added UTs.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes
Documentation