Skip to content

Commit

Permalink
[FLINK-12326][python] Add basic test framework for python table api.
Browse files Browse the repository at this point in the history
This closes apache#8347
  • Loading branch information
dianfu authored and sunjincheng121 committed May 8, 2019
1 parent 94c08c5 commit 84eec21
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 88 deletions.
17 changes: 16 additions & 1 deletion flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,19 @@ log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH} ${DRIVER} ${ARGS[@]}

# Collect the Flink test jars only when FLINK_TESTING is set, so that a normal
# run keeps the classpath limited to FLINK_CLASSPATH and the table jar.
FLINK_TEST_CLASSPATH=""
if [[ -n "$FLINK_TESTING" ]]; then
    bin=$(dirname "$0")
    FLINK_SOURCE_ROOT_DIR=$(cd "$bin/../../"; pwd -P)

    # NUL-delimited find/sort/read keeps jar paths with unusual characters intact.
    while read -d '' -r testJarFile; do
        # ${var:+...} emits the ":" separator only once the list is non-empty.
        FLINK_TEST_CLASSPATH="${FLINK_TEST_CLASSPATH:+$FLINK_TEST_CLASSPATH:}$testJarFile"
    done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d -name 'flink-*-tests.jar' -print0 | sort -z)
fi

# Append the test jars only when some were found: an unconditional trailing ":"
# would leave an empty classpath entry, which the JVM treats as the current
# working directory.
# $JAVA_RUN and $JVM_ARGS stay unquoted so they can expand to several words;
# the classpath and "${ARGS[@]}" are quoted so they are passed through unsplit.
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp "${FLINK_CLASSPATH}:${TABLE_JAR_PATH}${FLINK_TEST_CLASSPATH:+:$FLINK_TEST_CLASSPATH}" ${DRIVER} "${ARGS[@]}"
19 changes: 16 additions & 3 deletions flink-python/pyflink/find_flink_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from __future__ import print_function

import logging
import os
import sys

Expand All @@ -37,10 +38,22 @@ def _find_flink_home():
return build_target
except Exception:
pass
print("Could not find valid FLINK_HOME(Flink distribution directory) "
"in current environment.", file=sys.stderr)
logging.error("Could not find valid FLINK_HOME(Flink distribution directory) "
"in current environment.")
sys.exit(-1)


def _find_flink_source_root():
    """
    Find the flink source root directory.

    The root is taken to be two directory levels above the directory that
    contains this file.

    :return: The absolute path of the flink source root directory.
    :raises SystemExit: With exit code -1 if the path cannot be resolved; the
        failure and its traceback are logged first.
    """
    try:
        current_dir = os.path.dirname(os.path.abspath(__file__))
        # os.pardir keeps the path construction independent of the separator.
        return os.path.abspath(os.path.join(current_dir, os.pardir, os.pardir))
    except Exception:
        # Keep the original message but attach the traceback instead of
        # silently discarding the cause.
        logging.exception(
            "Could not find valid flink source root directory in current environment.")
        sys.exit(-1)


# When executed as a script, print whatever _find_flink_home() returns.
if __name__ == "__main__":
    print(_find_flink_home())
70 changes: 70 additions & 0 deletions flink-python/pyflink/table/tests/test_calc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# ###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os

from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase


class TableTests(PyFlinkStreamTableTestCase):
    """
    Tests for basic table operations.

    Relies on ``self.tempdir``, ``self.t_env`` and ``self.assert_equals``,
    which are presumably provided by PyFlinkStreamTableTestCase (not visible
    here).
    """

    def test_select(self):
        """
        Filter and project a CSV-backed table and verify the rows written to
        the test append sink.
        """
        # Pass the components separately so os.path.join builds the path.
        source_path = os.path.join(self.tempdir, 'streaming.csv')
        # The context manager closes the file; no explicit close() is needed.
        with open(source_path, 'w') as f:
            f.write('1,hi,hello\n' + '2,hi,hello\n')

        field_names = ["a", "b", "c"]
        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]

        t_env = self.t_env

        # register Orders table in table environment
        t_env.register_table_source(
            "Orders",
            CsvTableSource(source_path, field_names, field_types))

        t_env.register_table_sink(
            "Results",
            field_names, field_types, source_sink_utils.TestAppendSink())

        t_env.scan("Orders") \
            .where("a > 0") \
            .select("a + 1, b, c") \
            .insert_into("Results")

        t_env.execute()

        # Each input row passes the filter and has its first column incremented.
        actual = source_sink_utils.results()
        expected = ['2,hi,hello', '3,hi,hello']
        self.assert_equals(actual, expected)


if __name__ == '__main__':
    import unittest

    # Use the XML report runner when xmlrunner is installed; otherwise leave
    # the runner unset so unittest picks its default.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)
73 changes: 0 additions & 73 deletions flink-python/pyflink/table/tests/test_end_to_end.py

This file was deleted.

17 changes: 17 additions & 0 deletions flink-python/pyflink/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
131 changes: 131 additions & 0 deletions flink-python/pyflink/testing/source_sink_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

import glob
import os
import sys
import unittest
from py4j.java_gateway import java_import

from pyflink.find_flink_home import _find_flink_source_root
from pyflink.java_gateway import get_gateway
from pyflink.table import TableSink

if sys.version_info[0] >= 3:
xrange = range


class TestTableSink(TableSink):
    """
    Common parent of the test table sinks; holds the one-time setup they share.
    """

    # Set to True once the Java test classes have been imported into the
    # gateway's JVM view, so the setup runs at most once per process.
    _inited = False

    def __init__(self, j_table_sink):
        super(TestTableSink, self).__init__(j_table_sink)

    @classmethod
    def _ensure_initialized(cls):
        """
        Import the Java test sink classes into the gateway's JVM view, once.

        :raises unittest.SkipTest: If no 'flink-table-planner*-tests.jar' is
            found under flink-table/flink-table-planner/target of the flink
            source root.
        """
        if not TestTableSink._inited:
            source_root = _find_flink_source_root()
            jar_pattern = os.path.join(
                source_root,
                "flink-table/flink-table-planner/target/"
                "flink-table-planner*-tests.jar")
            matching_jars = glob.glob(jar_pattern)
            if not matching_jars:
                raise unittest.SkipTest(
                    "'flink-table-planner*-tests.jar' is not available. "
                    "Will skip the related tests.")

            gateway = get_gateway()
            java_package = "org.apache.flink.table.runtime.stream.table."
            for class_name in ("TestAppendSink", "TestRetractSink",
                               "TestUpsertSink", "RowCollector"):
                java_import(gateway.jvm, java_package + class_name)

            TestTableSink._inited = True


class TestAppendSink(TestTableSink):
    """
    Test sink wrapping the Java TestAppendSink.
    """

    def __init__(self):
        # The Java class must be imported into the JVM view before it is used.
        TestTableSink._ensure_initialized()

        j_sink = get_gateway().jvm.TestAppendSink()
        super(TestAppendSink, self).__init__(j_sink)


class TestRetractSink(TestTableSink):
    """
    Test sink wrapping the Java TestRetractSink.
    """

    def __init__(self):
        # The Java class must be imported into the JVM view before it is used.
        TestTableSink._ensure_initialized()

        j_sink = get_gateway().jvm.TestRetractSink()
        super(TestRetractSink, self).__init__(j_sink)


class TestUpsertSink(TestTableSink):
    """
    Test sink wrapping the Java TestUpsertSink.
    """

    def __init__(self, keys, is_append_only):
        """
        :param keys: Sequence of key names; copied into a Java String array.
        :param is_append_only: Passed through unchanged to the Java
            TestUpsertSink constructor.
        """
        # The Java class must be imported into the JVM view before it is used.
        TestTableSink._ensure_initialized()

        gateway = get_gateway()
        key_count = len(keys)
        j_keys = gateway.new_array(gateway.jvm.String, key_count)
        for index, key in enumerate(keys):
            j_keys[index] = key

        j_sink = gateway.jvm.TestUpsertSink(j_keys, is_append_only)
        super(TestUpsertSink, self).__init__(j_sink)


def results():
    """
    Fetch the results collected by an append table sink.

    Delegates to :func:`retract_results` and returns its result unchanged.
    """
    collected = retract_results()
    return collected


def retract_results():
    """
    Fetch the results collected by a retract table sink.

    Takes the values returned by the Java RowCollector.getAndClearValues()
    and returns what RowCollector.retractResults() produces for them.
    """
    gateway = get_gateway()
    collected = gateway.jvm.RowCollector.getAndClearValues()
    retracted = gateway.jvm.RowCollector.retractResults(collected)
    return retracted


def upsert_results(keys):
    """
    Fetch the results collected by an upsert table sink.

    :param keys: Sequence of values copied into a Java int array and handed
        to RowCollector.upsertResults() together with the collected values.
    :return: Whatever the Java RowCollector.upsertResults() returns.
    """
    gateway = get_gateway()
    key_count = len(keys)
    j_keys = gateway.new_array(gateway.jvm.int, key_count)
    for index, key in enumerate(keys):
        j_keys[index] = key

    collected = gateway.jvm.RowCollector.getAndClearValues()
    return gateway.jvm.RowCollector.upsertResults(collected, j_keys)
Loading

0 comments on commit 84eec21

Please sign in to comment.