diff --git a/flink-python/pyflink/table/sources.py b/flink-python/pyflink/table/sources.py
index 30de59fa86222..cbce0be0d60db 100644
--- a/flink-python/pyflink/table/sources.py
+++ b/flink-python/pyflink/table/sources.py
@@ -17,8 +17,7 @@
 ################################################################################
 
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import DataType, _to_java_type
-from pyflink.util import utils
+from pyflink.table.types import _to_java_type
 
 __all__ = ['TableSource', 'CsvTableSource']
 
@@ -37,17 +36,98 @@ class CsvTableSource(TableSource):
     A :class:`TableSource` for simple CSV files with a
     (logically) unlimited number of fields.
 
+    Example:
+    ::
+
+        >>> CsvTableSource("/csv/file/path", ["a", "b"], [DataTypes.INT(), DataTypes.STRING()])
+
     :param source_path: The path to the CSV file.
+    :type source_path: str
     :param field_names: The names of the table fields.
+    :type field_names: collections.Iterable[str]
     :param field_types: The types of the table fields.
+    :type field_types: collections.Iterable[pyflink.table.types.DataType]
+    :param field_delim: The field delimiter, "," by default.
+    :type field_delim: str, optional
+    :param line_delim: The row delimiter, "\\n" by default.
+    :type line_delim: str, optional
+    :param quote_character: An optional quote character for String values, None by default.
+    :type quote_character: str, optional
+    :param ignore_first_line: Flag to ignore the first line, false by default.
+    :type ignore_first_line: bool, optional
+    :param ignore_comments: An optional prefix to indicate comments, None by default.
+    :type ignore_comments: str, optional
+    :param lenient: Flag to skip records with parse errors instead of failing, false by default.
+    :type lenient: bool, optional
+    :param empty_column_as_null: Treat empty column as null, false by default.
+    :type empty_column_as_null: bool, optional
+    :raises ValueError: If field_names and field_types differ in length, or if
+        quote_character is not exactly one character long.
     """
 
-    def __init__(self, source_path, field_names, field_types):
-        # type: (str, list[str], list[DataType]) -> None
+    def __init__(
+        self,
+        source_path,
+        field_names,
+        field_types,
+        field_delim=None,
+        line_delim=None,
+        quote_character=None,
+        ignore_first_line=None,
+        ignore_comments=None,
+        lenient=None,
+        empty_column_as_null=None,
+    ):
+        # zip() below stops at the shorter input, so a length mismatch would
+        # silently drop fields. Materialize both iterables (they may be
+        # one-shot generators) and reject mismatches up front.
+        field_names = list(field_names)
+        field_types = list(field_types)
+        if len(field_names) != len(field_types):
+            raise ValueError(
+                "Number of field names ({}) and field types ({}) must be equal".format(
+                    len(field_names), len(field_types))
+            )
+
         gateway = get_gateway()
-        j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
-        j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
-                                        [_to_java_type(field_type)
-                                         for field_type in field_types])
-        super(CsvTableSource, self).__init__(
-            gateway.jvm.CsvTableSource(source_path, j_field_names, j_field_types))
+
+        builder = gateway.jvm.CsvTableSource.builder()
+        builder.path(source_path)
+
+        for (field_name, field_type) in zip(field_names, field_types):
+            builder.field(field_name, _to_java_type(field_type))
+
+        if field_delim is not None:
+            builder.fieldDelimiter(field_delim)
+
+        if line_delim is not None:
+            builder.lineDelimiter(line_delim)
+
+        if quote_character is not None:
+            # Java API has a Character type for this field. At time of writing,
+            # Py4J will convert the Python str to Java Character by taking only
+            # the first character. This results in either:
+            #   - Silently truncating a Python str with more than one character
+            #     with no further type error from either Py4J or Java
+            #     CsvTableSource
+            #   - java.lang.StringIndexOutOfBoundsException from Py4J for an
+            #     empty Python str. That error can be made more friendly here.
+            if len(quote_character) != 1:
+                raise ValueError(
+                    "Expected a single CSV quote character but got '{}'".format(quote_character)
+                )
+            builder.quoteCharacter(quote_character)
+
+        if ignore_first_line:
+            builder.ignoreFirstLine()
+
+        if ignore_comments is not None:
+            builder.commentPrefix(ignore_comments)
+
+        if lenient:
+            builder.ignoreParseErrors()
+
+        if empty_column_as_null:
+            builder.emptyColumnAsNull()
+
+        super(CsvTableSource, self).__init__(builder.build())