
Commit

[FLINK-17255][python] Support HBase connector descriptor in PyFlink. (a8cee7b)
shuiqiangchen committed May 6, 2020
1 parent 40b742f commit a8cee7b
Showing 4 changed files with 216 additions and 1 deletion.
21 changes: 21 additions & 0 deletions docs/dev/table/connect.md
@@ -1199,6 +1199,27 @@ CREATE TABLE MyUserTable (
)
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
.connect(
    HBase()
    .version('1.4.3')                     # required: currently only '1.4.3' is supported
    .table_name('hbase_table_name')       # required: HBase table name
    .zookeeper_quorum('localhost:2181')   # required: HBase Zookeeper quorum configuration
    .zookeeper_node_parent('/test')       # optional: the root dir in Zookeeper for the HBase cluster.
                                          # The default value is '/hbase'
    .write_buffer_flush_max_size('10mb')  # optional: writing option, maximum size in memory of buffered
                                          # rows for each writing request. This can improve performance
                                          # for writing data to an HBase database. The default value is '2mb'
    .write_buffer_flush_max_rows(1000)    # optional: writing option, maximum number of rows to buffer
                                          # for each writing request. This can improve performance for
                                          # writing data to an HBase database. No default value, i.e. by
                                          # default flushing does not depend on the number of buffered rows
    .write_buffer_flush_interval('2s')    # optional: writing option, the interval at which buffered write
                                          # requests are flushed, in milliseconds. Default value is '0s',
                                          # which means no asynchronous flush thread will be scheduled
)
{% endhighlight %}
</div>
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
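For reference, a minimal end-to-end sketch of how the descriptor above would typically be registered from PyFlink (not part of this commit; the schema, field names, and table path are hypothetical):

{% highlight python %}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import HBase, Schema

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Hypothetical schema: the row key maps to an atomic field and each HBase
# column family maps to a ROW type whose nested fields are the qualifiers.
t_env.connect(
    HBase()
    .version('1.4.3')
    .table_name('hbase_table_name')
    .zookeeper_quorum('localhost:2181')) \
    .with_schema(
        Schema()
        .field('rowkey', DataTypes.STRING())
        .field('family1', DataTypes.ROW([DataTypes.FIELD('q1', DataTypes.INT())]))) \
    .create_temporary_table('MyHBaseTable')
{% endhighlight %}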
1 change: 1 addition & 0 deletions flink-python/bin/pyflink-gateway-server.sh
@@ -80,6 +80,7 @@ if [[ -n "$FLINK_TESTING" ]]; then
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar"
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-ml-parent/flink-ml-api/target/flink-ml-api*.jar"
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-ml-parent/flink-ml-lib/target/flink-ml-lib*.jar"
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-hbase/target/flink*.jar"

# disable the wildcard expansion for the moment.
set -f
96 changes: 96 additions & 0 deletions flink-python/pyflink/table/descriptors.py
@@ -29,6 +29,7 @@
    'FileSystem',
    'Kafka',
    'Elasticsearch',
    'HBase',
    'Csv',
    'Avro',
    'Json',
@@ -1196,6 +1197,101 @@ def connection_path_prefix(self, path_prefix):
        return self


class HBase(ConnectorDescriptor):
    """
    Connector descriptor for Apache HBase.
    """

    def __init__(self):
        gateway = get_gateway()
        self._j_hbase = gateway.jvm.HBase()
        super(HBase, self).__init__(self._j_hbase)

    def version(self, version):
        """
        Sets the Apache HBase version to be used. Required.

        :param version: HBase version. E.g., "1.4.3".
        :return: This object.
        """
        if not isinstance(version, str):
            version = str(version)
        self._j_hbase = self._j_hbase.version(version)
        return self

    def table_name(self, table_name):
        """
        Sets the HBase table name. Required.

        :param table_name: Name of the HBase table. E.g., "testNamespace:testTable",
                           "testDefaultTable".
        :return: This object.
        """
        self._j_hbase = self._j_hbase.tableName(table_name)
        return self

    def zookeeper_quorum(self, zookeeper_quorum):
        """
        Sets the ZooKeeper quorum address used to connect to the HBase cluster. Required.

        :param zookeeper_quorum: ZooKeeper quorum address. E.g.,
                                 "localhost:2181,localhost:2182,localhost:2183".
        :return: This object.
        """
        self._j_hbase = self._j_hbase.zookeeperQuorum(zookeeper_quorum)
        return self

    def zookeeper_node_parent(self, zookeeper_node_parent):
        """
        Sets the ZooKeeper node parent path of the HBase cluster. Optional.
        Defaults to "/hbase".

        :param zookeeper_node_parent: ZooKeeper node path of the HBase cluster. E.g.,
                                      "/hbase/example-root-znode".
        :return: This object.
        """
        self._j_hbase = self._j_hbase.zookeeperNodeParent(zookeeper_node_parent)
        return self

    def write_buffer_flush_max_size(self, max_size):
        """
        Sets the threshold for flushing buffered write requests based on the memory byte
        size of the rows currently buffered. Optional. Defaults to "2mb".

        :param max_size: the maximum size. E.g., "10mb".
        :return: This object.
        """
        if not isinstance(max_size, str):
            max_size = str(max_size)
        self._j_hbase = self._j_hbase.writeBufferFlushMaxSize(max_size)
        return self

    def write_buffer_flush_max_rows(self, write_buffer_flush_max_rows):
        """
        Sets the threshold for flushing buffered write requests based on the number of rows
        currently buffered. Optional. Defaults to not set, i.e. flushing does not depend on
        the number of buffered rows.

        :param write_buffer_flush_max_rows: number of buffered rows at which a flush is
                                            triggered.
        :return: This object.
        """
        self._j_hbase = self._j_hbase.writeBufferFlushMaxRows(write_buffer_flush_max_rows)
        return self

    def write_buffer_flush_interval(self, interval):
        """
        Sets the interval at which buffered write requests are flushed, in milliseconds.
        Optional. Defaults to not set, i.e. no flushing based on a time interval.

        :param interval: flush interval. The string should be in the format
                         "{length value}{time unit label}". E.g., "123ms", "1 s"; if no
                         time unit label is specified, it is interpreted as milliseconds.
        :return: This object.
        """
        if not isinstance(interval, str):
            interval = str(interval)
        self._j_hbase = self._j_hbase.writeBufferFlushInterval(interval)
        return self


class CustomConnectorDescriptor(ConnectorDescriptor):
    """
    Describes a custom connector to another system.
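Since every setter returns self, the descriptor behaves as a fluent builder. A quick sketch (the table name and option values here are made up) of inspecting the generated connector properties, mirroring what the tests below assert:

{% highlight python %}
from pyflink.table.descriptors import HBase

props = (HBase()
         .version('1.4.3')
         .table_name('ns:orders')              # hypothetical "namespace:table" name
         .zookeeper_quorum('localhost:2181')
         .write_buffer_flush_max_rows(1000)
         .to_properties())

# to_properties() returns all values as strings, so the flush threshold set
# as an int comes back as '1000' (see the tests below).
assert props['connector.type'] == 'hbase'
assert props['connector.write.buffer-flush.max-rows'] == '1000'
{% endhighlight %}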
99 changes: 98 additions & 1 deletion flink-python/pyflink/table/tests/test_descriptor.py
@@ -19,7 +19,7 @@

from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
                                       Elasticsearch, Csv, Avro, Json, CustomConnectorDescriptor,
                                       CustomFormatDescriptor)
                                       CustomFormatDescriptor, HBase)
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -360,6 +360,103 @@ def test_custom_connector(self):
        self.assertEqual(expected, properties)


class HBaseDescriptorTests(PyFlinkTestCase):

    def test_version(self):
        hbase = HBase().version("1.4.3")

        properties = hbase.to_properties()
        expected = {'connector.version': '1.4.3',
                    'connector.type': 'hbase',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

        hbase = HBase().version(1.1)
        properties = hbase.to_properties()
        expected = {'connector.version': '1.1',
                    'connector.type': 'hbase',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

    def test_table_name(self):
        hbase = HBase().table_name('tableName1')

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.table-name': 'tableName1',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

    def test_zookeeper_quorum(self):
        hbase = HBase().zookeeper_quorum("localhost:2181,localhost:2182")

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.zookeeper.quorum': 'localhost:2181,localhost:2182',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

    def test_zookeeper_node_parent(self):
        hbase = HBase().zookeeper_node_parent('/hbase/example-root-znode')

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.zookeeper.znode.parent': '/hbase/example-root-znode',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

    def test_write_buffer_flush_max_size(self):
        hbase = HBase().write_buffer_flush_max_size('1000')

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.write.buffer-flush.max-size': '1000 bytes',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

        hbase = HBase().write_buffer_flush_max_size(1000)
        properties = hbase.to_properties()
        self.assertEqual(expected, properties)

        hbase = HBase().write_buffer_flush_max_size('10mb')
        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.write.buffer-flush.max-size': '10 mb',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

    def test_write_buffer_flush_max_rows(self):
        hbase = HBase().write_buffer_flush_max_rows(10)

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.write.buffer-flush.max-rows': '10',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

    def test_write_buffer_flush_interval(self):
        hbase = HBase().write_buffer_flush_interval('123')

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.write.buffer-flush.interval': '123',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)

        hbase = HBase().write_buffer_flush_interval(123)

        properties = hbase.to_properties()
        self.assertEqual(expected, properties)

        hbase = HBase().write_buffer_flush_interval('123ms')

        properties = hbase.to_properties()
        expected = {'connector.type': 'hbase',
                    'connector.write.buffer-flush.interval': '123ms',
                    'connector.property-version': '1'}
        self.assertEqual(expected, properties)


class OldCsvDescriptorTests(PyFlinkTestCase):

    def test_field_delimiter(self):
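A hedged note on running these tests: assuming a PyFlink development environment where the Java gateway is available (with the flink-hbase jar added to the test classpath by the pyflink-gateway-server.sh change above), the new test case can be run on its own, for example:

{% highlight python %}
import unittest

# Import path follows this commit's file location:
# flink-python/pyflink/table/tests/test_descriptor.py
from pyflink.table.tests.test_descriptor import HBaseDescriptorTests

# Load and run only the HBase descriptor tests added in this commit.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(HBaseDescriptorTests)
unittest.TextTestRunner(verbosity=2).run(suite)
{% endhighlight %}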
