Skip to content

Commit

Permalink
[FLINK-12719][python] Add the Python catalog API
Browse files Browse the repository at this point in the history
This closes apache#8623
  • Loading branch information
dianfu authored and sunjincheng121 committed Jun 14, 2019
1 parent e3612c5 commit e087c1e
Show file tree
Hide file tree
Showing 12 changed files with 2,451 additions and 25 deletions.
15 changes: 14 additions & 1 deletion flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,18 @@ PYTHON_JAR_PATH=`echo "$FLINK_HOME"/opt/flink-python*java-binding.jar`
FLINK_TEST_CLASSPATH=""
if [[ -n "$FLINK_TESTING" ]]; then
bin=`dirname "$0"`
FLINK_SOURCE_ROOT_DIR=`cd "$bin/../../"; pwd -P`
CURRENT_DIR=`pwd -P`
cd "$bin/../../"
FLINK_SOURCE_ROOT_DIR=`pwd -P`

# Download avro, as it is needed by the unit tests
AVRO_VERSION=`mvn help:evaluate -Dexpression=avro.version | grep --invert-match -E '\[|Download*'`
if [[ ! -f ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-avro/target/avro-"$AVRO_VERSION".jar ]]; then
mvn org.apache.maven.plugins:maven-dependency-plugin:2.10:copy -Dartifact=org.apache.avro:avro:"$AVRO_VERSION":jar -DoutputDirectory=$FLINK_SOURCE_ROOT_DIR/flink-formats/flink-avro/target > /dev/null 2>&1
if [[ ! -f ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-avro/target/avro-"$AVRO_VERSION".jar ]]; then
echo "Download avro-$AVRO_VERSION.jar failed."
fi
fi

FIND_EXPRESSION=""
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-csv/target/flink-csv*.jar"
Expand All @@ -76,6 +87,8 @@ if [[ -n "$FLINK_TESTING" ]]; then
fi
done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d \( -name 'flink-*-tests.jar'${FIND_EXPRESSION} \) -print0 | sort -z)
set +f

cd $CURRENT_DIR
fi

ARGS_COUNT=${#ARGS[@]}
Expand Down
19 changes: 0 additions & 19 deletions flink-python/dev/lint-python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -411,23 +411,6 @@ function check_stage() {
echo "All the checks are finished, the detailed information can be found in: $LOG_FILE"
}

# Ensure the avro jar required by the Python unit tests is present under
# flink-formats/flink-avro/target, downloading it via Maven if missing.
#
# Reads the global CURRENT_DIR (script location) to locate the Flink source
# root two levels up. Always restores the caller's working directory.
# Side effects: may invoke mvn (network access) and write a jar into the
# flink-avro target directory; prints progress/result messages to stdout.
function download_java_dependencies() {
    INITIAL_DIR=$(pwd)
    cd "$CURRENT_DIR/../../"
    # Ask Maven for the avro.version property; filter out the "[INFO]"-style
    # log lines and "Downloading/Downloaded" progress lines so only the bare
    # version string remains.
    AVRO_VERSION=$(mvn help:evaluate -Dexpression=avro.version | grep --invert-match -E '\[|Download')
    # Hoist the invariant jar path and quote it so the checks and the mvn
    # output directory survive paths containing spaces.
    AVRO_JAR="$(pwd)/flink-formats/flink-avro/target/avro-$AVRO_VERSION.jar"
    if [ ! -f "$AVRO_JAR" ]; then
        echo "Downloading avro-$AVRO_VERSION.jar..."
        mvn org.apache.maven.plugins:maven-dependency-plugin:2.10:copy -Dartifact=org.apache.avro:avro:"$AVRO_VERSION":jar -DoutputDirectory="$(pwd)/flink-formats/flink-avro/target"
        # Re-check rather than trusting mvn's exit status: the copy goal can
        # succeed partially; the jar's presence is what the tests need.
        if [ ! -f "$AVRO_JAR" ]; then
            echo "Download avro-$AVRO_VERSION.jar failed."
        else
            echo "avro-$AVRO_VERSION.jar downloaded."
        fi
    else
        echo "avro-$AVRO_VERSION.jar already exists, no need to download."
    fi
    cd "$INITIAL_DIR"
}

###############################################################All Checks Definitions###############################################################
#########################
Expand Down Expand Up @@ -598,7 +581,5 @@ fi
# install environment
install_environment

download_java_dependencies

# exec all selected checks
check_stage
5 changes: 4 additions & 1 deletion flink-python/pyflink/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
from pyflink.find_flink_home import _find_flink_home
from pyflink.util.exceptions import install_exception_handler

_gateway = None
_lock = RLock()
Expand All @@ -49,6 +50,7 @@ def get_gateway():

# import the flink view
import_flink_view(_gateway)
install_exception_handler()
return _gateway


Expand Down Expand Up @@ -115,9 +117,10 @@ def import_flink_view(gateway):
java_import(gateway.jvm, "org.apache.flink.table.api.*")
java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
java_import(gateway.jvm, "org.apache.flink.table.catalog.*")
java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.types.*")
java_import(gateway.jvm, "org.apache.flink.table.types.logical.*")
java_import(gateway.jvm, "org.apache.flink.table.util.python.*")
Expand Down
24 changes: 23 additions & 1 deletion flink-python/pyflink/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
- :class:`pyflink.table.TableSink`
Specifies how to emit a table to an external system or location.
"""
from pyflink.table.catalog import (Catalog, CatalogDatabase, CatalogBaseTable, CatalogPartition,
CatalogFunction, ObjectPath, CatalogPartitionSpec,
CatalogTableStatistics, CatalogColumnStatistics, HiveCatalog,
HiveCatalogDatabase, HiveCatalogFunction, HiveCatalogPartition,
HiveCatalogTable, HiveCatalogView)
from pyflink.table.table import Table
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
Expand Down Expand Up @@ -70,5 +75,22 @@
'Row',
'Kafka',
'Elasticsearch',
'TableSchema'
'TableSchema',

# Catalog APIs
'Catalog',
'CatalogDatabase',
'CatalogBaseTable',
'CatalogPartition',
'CatalogFunction',
'ObjectPath',
'CatalogPartitionSpec',
'CatalogTableStatistics',
'CatalogColumnStatistics',
'HiveCatalog',
'HiveCatalogDatabase',
'HiveCatalogFunction',
'HiveCatalogPartition',
'HiveCatalogTable',
'HiveCatalogView'
]
Loading

0 comments on commit e087c1e

Please sign in to comment.