Skip to content

Commit

Permalink
[FLINK-18488][python] Added additional params to CsvTableSource const…
Browse files Browse the repository at this point in the history
…ructor

This closes apache#12824.
  • Loading branch information
ChrisNixGSI authored and dianfu committed Jul 7, 2020
1 parent a72c53a commit 0af00c3
Showing 1 changed file with 77 additions and 10 deletions.
87 changes: 77 additions & 10 deletions flink-python/pyflink/table/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
################################################################################

from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType, _to_java_type
from pyflink.util import utils
from pyflink.table.types import _to_java_type

__all__ = ['TableSource', 'CsvTableSource']

Expand All @@ -37,17 +36,85 @@ class CsvTableSource(TableSource):
A :class:`TableSource` for simple CSV files with a
(logically) unlimited number of fields.
Example:
::
>>> CsvTableSource("/csv/file/path", ["a", "b"], [DataTypes.INT(), DataTypes.STRING()])
:param source_path: The path to the CSV file.
:type source_path: str
:param field_names: The names of the table fields.
:type field_names: collections.Iterable[str]
:param field_types: The types of the table fields.
:type field_types: collections.Iterable[str]
:param field_delim: The field delimiter, "," by default.
:type field_delim: str, optional
:param line_delim: The row delimiter, "\\n" by default.
:type line_delim: str, optional
:param quote_character: An optional quote character for String values, null by default.
:type quote_character: str, optional
:param ignore_first_line: Flag to ignore the first line, false by default.
:type ignore_first_line: bool, optional
:param ignore_comments: An optional prefix to indicate comments, null by default.
:type ignore_comments: str, optional
:param lenient: Flag to skip records with parse error instead to fail, false by default.
:type lenient: bool, optional
:param empty_column_as_null: Treat empty column as null, false by default.
:type empty_column_as_null: bool, optional
"""

def __init__(self, source_path, field_names, field_types):
# type: (str, list[str], list[DataType]) -> None
def __init__(
self,
source_path,
field_names,
field_types,
field_delim=None,
line_delim=None,
quote_character=None,
ignore_first_line=None,
ignore_comments=None,
lenient=None,
empty_column_as_null=None,
):
gateway = get_gateway()
j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
[_to_java_type(field_type)
for field_type in field_types])
super(CsvTableSource, self).__init__(
gateway.jvm.CsvTableSource(source_path, j_field_names, j_field_types))

builder = gateway.jvm.CsvTableSource.builder()
builder.path(source_path)

for (field_name, field_type) in zip(field_names, field_types):
builder.field(field_name, _to_java_type(field_type))

if field_delim is not None:
builder.fieldDelimiter(field_delim)

if line_delim is not None:
builder.lineDelimiter(line_delim)

if quote_character is not None:
# Java API has a Character type for this field. At time of writing,
# Py4J will convert the Python str to Java Character by taking only
# the first character. This results in either:
# - Silently truncating a Python str with more than one character
# with no further type error from either Py4J or Java
# CsvTableSource
# - java.lang.StringIndexOutOfBoundsException from Py4J for an
# empty Python str. That error can be made more friendly here.
if len(quote_character) != 1:
raise ValueError(
"Expected a single CSV quote character but got '{}'".format(quote_character)
)
builder.quoteCharacter(quote_character)

if ignore_first_line:
builder.ignoreFirstLine()

if ignore_comments is not None:
builder.commentPrefix(ignore_comments)

if lenient:
builder.ignoreParseErrors()

if empty_column_as_null:
builder.emptyColumnAsNull()

super(CsvTableSource, self).__init__(builder.build())

0 comments on commit 0af00c3

Please sign in to comment.