Commit
[FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module

This closes apache#12870.
HuangXingBo authored and dianfu committed Jul 14, 2020
1 parent 5ff2c9e commit ca6cef9
Showing 22 changed files with 833 additions and 618 deletions.
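Taken together, the rename puts every Beam-facing entry point under a new pyflink.fn_execution.beam package. A minimal sketch of the path changes visible in the hunks below (the comment lines summarize this diff; the import mirrors beam_sdk_worker_main.py further down):

# Old module path                        -> new module path (per this diff)
#   pyflink.fn_execution.boot            -> pyflink.fn_execution.beam.beam_boot
#   pyflink.fn_execution.sdk_worker_main -> pyflink.fn_execution.beam.beam_sdk_worker_main
#   Beam glue in pyflink.fn_execution.coders -> pyflink.fn_execution.beam.beam_coders

# Importing the new module is what registers the Flink coders with the
# Beam SDK harness, exactly as the worker entry point below does:
import pyflink.fn_execution.beam.beam_coders  # noqa # pylint: disable=unused-import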
2 changes: 1 addition & 1 deletion flink-python/bin/pyflink-udf-runner.sh
@@ -40,4 +40,4 @@ if [[ "$_PYTHON_WORKING_DIR" != "" ]]; then
fi

log="$BOOT_LOG_DIR/flink-python-udf-boot.log"
-${python} -m pyflink.fn_execution.boot $@ 2>&1 | tee ${log}
+${python} -m pyflink.fn_execution.beam.beam_boot $@ 2>&1 | tee ${log}
17 changes: 17 additions & 0 deletions flink-python/pyflink/fn_execution/beam/__init__.py
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
@@ -103,5 +103,5 @@ def check_not_empty(check_str, error_message):
if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1":
exit(0)

-call([python_exec, "-m", "pyflink.fn_execution.sdk_worker_main"],
+call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"],
stdout=sys.stdout, stderr=sys.stderr, env=env)
36 changes: 36 additions & 0 deletions flink-python/pyflink/fn_execution/beam/beam_coder_impl.pxd
@@ -0,0 +1,36 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# cython: language_level = 3
# cython: infer_types = True
# cython: profile=True
# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True

from apache_beam.coders.coder_impl cimport StreamCoderImpl

from pyflink.fn_execution.fast_coder_impl cimport BaseCoderImpl
from pyflink.fn_execution.stream cimport InputStream

cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
cdef readonly StreamCoderImpl _value_coder

cdef class BeamCoderImpl(StreamCoderImpl):
cdef readonly BaseCoderImpl _value_coder

cdef class InputStreamWrapper:
cdef BaseCoderImpl _value_coder
cdef InputStream _input_stream
58 changes: 58 additions & 0 deletions flink-python/pyflink/fn_execution/beam/beam_coder_impl.pyx
@@ -0,0 +1,58 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# cython: language_level = 3
# cython: infer_types = True
# cython: profile=True
# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True

from apache_beam.coders.coder_impl cimport InputStream as BInputStream
from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
from apache_beam.coders.coder_impl cimport StreamCoderImpl

from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream

cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
def __cinit__(self, value_coder):
self._value_coder = value_coder

cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
self._value_coder.encode_to_stream(value, out_stream, nested)

cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
return self._value_coder.decode_from_stream(in_stream, nested)

cpdef get_estimated_size_and_observables(self, value, bint nested=False):
return 0, []

cdef class BeamCoderImpl(StreamCoderImpl):
def __cinit__(self, value_coder):
self._value_coder = value_coder

cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
self._value_coder.encode(value, out_stream)

cpdef decode_from_stream(self, BInputStream in_stream, bint nested):
cdef BeamInputStream input_stream = BeamInputStream(in_stream, in_stream.size())
cdef InputStreamWrapper input_stream_wrapper = InputStreamWrapper(self._value_coder,
input_stream)
return input_stream_wrapper

cdef class InputStreamWrapper:
def __cinit__(self, value_coder, input_stream):
self._value_coder = value_coder
self._input_stream = input_stream
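
BeamCoderImpl above does not decode records itself: decode_from_stream wraps the stream and the Flink coder in an InputStreamWrapper, so the operation can drive decoding outside Beam's coder stack. A pure-Python sketch of that pattern (no Cython; decode_next and the decode(stream) call are assumptions for illustration, not this module's API):

class InputStreamWrapperSketch(object):
    """Pairs a Flink coder impl with the stream it should read from."""

    def __init__(self, value_coder, input_stream):
        self._value_coder = value_coder    # a Flink BaseCoderImpl
        self._input_stream = input_stream  # a Flink InputStream

    def decode_next(self):
        # Hypothetical consumer hook: the operation, not Beam, decides
        # when to pull the next record off the underlying stream.
        return self._value_coder.decode(self._input_stream)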
213 changes: 213 additions & 0 deletions flink-python/pyflink/fn_execution/beam/beam_coders.py
@@ -0,0 +1,213 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import pyarrow as pa
import pytz
from apache_beam.coders import Coder
from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
from apache_beam.portability import common_urns
from apache_beam.typehints import typehints

from pyflink.fn_execution.beam import beam_slow_coder_impl

try:
from pyflink.fn_execution import fast_coder_impl as coder_impl
from pyflink.fn_execution.beam.beam_coder_impl import BeamCoderImpl, \
PassThroughLengthPrefixCoderImpl
except ImportError:
coder_impl = beam_slow_coder_impl
BeamCoderImpl = lambda a: a
PassThroughLengthPrefixCoderImpl = coder_impl.PassThroughLengthPrefixCoderImpl

from pyflink.fn_execution import flink_fn_execution_pb2, coders
from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \
FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \
LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType


class PassThroughLengthPrefixCoder(LengthPrefixCoder):
"""
Coder which doesn't prefix the length of the encoded object as the length prefix will be handled
by the wrapped value coder.
"""

def __init__(self, value_coder):
super(PassThroughLengthPrefixCoder, self).__init__(value_coder)

def _create_impl(self):
return PassThroughLengthPrefixCoderImpl(self._value_coder.get_impl())

def __repr__(self):
return 'PassThroughLengthPrefixCoder[%s]' % self._value_coder


Coder.register_structured_urn(
common_urns.coders.LENGTH_PREFIX.urn, PassThroughLengthPrefixCoder)
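
Registering the class under Beam's LENGTH_PREFIX urn makes Beam instantiate it in place of its stock LengthPrefixCoder, so no extra length header is written around Flink's payload. A hedged sketch of what that implies (VarIntCoder is Beam's real coder; the equality illustrates the pass-through behaviour and is not a test from this commit):

from apache_beam.coders.coders import VarIntCoder

value_coder = VarIntCoder()
coder = PassThroughLengthPrefixCoder(value_coder)  # class defined above
# Beam's stock LengthPrefixCoder would prepend a var-int length here;
# the pass-through variant emits the value coder's bytes unchanged.
assert coder.encode(100) == value_coder.encode(100)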


class BeamTableFunctionRowCoder(FastCoder):
"""
Coder for Table Function Row.
"""

def __init__(self, table_function_row_coder):
self._table_function_row_coder = table_function_row_coder

def _create_impl(self):
return self._table_function_row_coder.get_impl()

def get_impl(self):
return BeamCoderImpl(self._create_impl())

def to_type_hint(self):
return typehints.List

@Coder.register_urn(coders.FLINK_TABLE_FUNCTION_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
return BeamTableFunctionRowCoder(
coders.TableFunctionRowCoder.from_schema_proto(schema_proto))

def __repr__(self):
return 'TableFunctionRowCoder[%s]' % repr(self._table_function_row_coder)

def __eq__(self, other):
return (self.__class__ == other.__class__
and self._table_function_row_coder == other._table_function_row_coder)

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self._table_function_row_coder)


class BeamFlattenRowCoder(FastCoder):
"""
Coder for Row. The decoded result will be flattened as a list of column values of a row instead
of a row object.
"""

def __init__(self, flatten_coder):
self._flatten_coder = flatten_coder

def _create_impl(self):
return self._flatten_coder.get_impl()

def get_impl(self):
return BeamCoderImpl(self._create_impl())

def to_type_hint(self):
return typehints.List

@Coder.register_urn(coders.FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN,
flink_fn_execution_pb2.Schema)
def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
return BeamFlattenRowCoder(coders.FlattenRowCoder.from_schema_proto(schema_proto))

def __repr__(self):
return 'BeamFlattenRowCoder[%s]' % repr(self._flatten_coder)

def __eq__(self, other):
return (self.__class__ == other.__class__
and self._flatten_coder == other._flatten_coder)

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self._flatten_coder)


class ArrowCoder(FastCoder):
"""
Coder for Arrow.
"""

def __init__(self, schema, row_type, timezone):
self._schema = schema
self._row_type = row_type
self._timezone = timezone

def _create_impl(self):
return beam_slow_coder_impl.ArrowCoderImpl(self._schema, self._row_type, self._timezone)

def to_type_hint(self):
import pandas as pd
return pd.Series

@Coder.register_urn(coders.FLINK_SCALAR_FUNCTION_SCHEMA_ARROW_CODER_URN,
flink_fn_execution_pb2.Schema)
def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):

def _to_arrow_schema(row_type):
return pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
for n, t in zip(row_type.field_names(), row_type.field_types())])

def _to_data_type(field_type):
if field_type.type_name == flink_fn_execution_pb2.Schema.TINYINT:
return TinyIntType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.SMALLINT:
return SmallIntType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.INT:
return IntType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.BIGINT:
return BigIntType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.BOOLEAN:
return BooleanType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.FLOAT:
return FloatType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.DOUBLE:
return DoubleType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.VARCHAR:
return VarCharType(0x7fffffff, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.VARBINARY:
return VarBinaryType(0x7fffffff, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.DECIMAL:
return DecimalType(field_type.decimal_info.precision,
field_type.decimal_info.scale,
field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.DATE:
return DateType(field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.TIME:
return TimeType(field_type.time_info.precision, field_type.nullable)
elif field_type.type_name == \
flink_fn_execution_pb2.Schema.LOCAL_ZONED_TIMESTAMP:
return LocalZonedTimestampType(field_type.local_zoned_timestamp_info.precision,
field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.TIMESTAMP:
return TimestampType(field_type.timestamp_info.precision, field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.ARRAY:
return ArrayType(_to_data_type(field_type.collection_element_type),
field_type.nullable)
elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
return RowType(
[RowField(f.name, _to_data_type(f.type), f.description)
for f in field_type.row_schema.fields], field_type.nullable)
else:
raise ValueError("field_type %s is not supported." % field_type)

def _to_row_type(row_schema):
return RowType([RowField(f.name, _to_data_type(f.type)) for f in row_schema.fields])

timezone = pytz.timezone(os.environ['table.exec.timezone'])
row_type = _to_row_type(schema_proto)
return ArrowCoder(_to_arrow_schema(row_type), row_type, timezone)

def __repr__(self):
return 'ArrowCoder[%s]' % self._schema
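
As a worked example of the nested _to_arrow_schema helper above: a two-column Flink row type maps to a pyarrow schema field by field (the row type is illustrative; the names come from pyflink.table.types, as used in this file):

import pyarrow as pa
from pyflink.table.types import IntType, RowField, RowType, VarCharType, to_arrow_type

row_type = RowType([RowField("id", IntType()),
                    RowField("name", VarCharType(0x7fffffff))])
schema = pa.schema([pa.field(n, to_arrow_type(t), t._nullable)
                    for n, t in zip(row_type.field_names(),
                                    row_type.field_types())])
# -> id: int32, name: string (both nullable)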
@@ -15,27 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import os
import sys

# force to register the operations to SDK Harness
-from apache_beam.options.pipeline_options import PipelineOptions

try:
import pyflink.fn_execution.fast_operations
except ImportError:
import pyflink.fn_execution.operations

# force to register the coders to SDK Harness
-import pyflink.fn_execution.coders  # noqa # pylint: disable=unused-import
+import pyflink.fn_execution.beam.beam_coders  # noqa # pylint: disable=unused-import

import apache_beam.runners.worker.sdk_worker_main

-if 'PIPELINE_OPTIONS' in os.environ:
-    pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
-        os.environ['PIPELINE_OPTIONS'])
-else:
-    pipeline_options = PipelineOptions.from_dictionary({})

if __name__ == '__main__':
apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
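
For context, the boot hunk earlier in this diff launches this module in a subprocess; a minimal reproduction of that handoff (environment forwarding elided):

import sys
from subprocess import call

# Mirrors the call in the boot module above, minus the env argument.
exit_code = call([sys.executable, "-m",
                  "pyflink.fn_execution.beam.beam_sdk_worker_main"],
                 stdout=sys.stdout, stderr=sys.stderr)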