From a6571bb61f41a65b47ec250231a32e14b1390069 Mon Sep 17 00:00:00 2001
From: Rui Li
Date: Fri, 16 Aug 2019 15:46:00 +0800
Subject: [PATCH] [FLINK-13747][hive] Remove some TODOs in Hive connector

This PR is to fix some obsolete TODOs.

This closes #9460.
---
 .../connectors/hive/FlinkStandaloneHiveRunner.java    |  1 -
 .../connectors/hive/TableEnvHiveConnectorTest.java    | 10 ++++++++--
 .../planner/runtime/utils/BatchTableEnvUtil.scala     |  7 ++++---
 .../flink/table/planner/runtime/utils/TableUtil.scala |  8 ++++++--
 .../flink/table/planner/utils/TableTestBase.scala     |  3 +--
 5 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
index 35dc89298cebc..343a299057091 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -355,7 +355,6 @@ private static Future<Void> startHMS(HiveServerContext context, int port) throws
 		args.add(System.getProperty("java.class.path"));
 
 		// set sys properties
-		// TODO: generate hive-site.xml at runtime?
 		args.add(hiveCmdLineConfig(METASTOREWAREHOUSE.varname, outsideConf.getVar(METASTOREWAREHOUSE)));
 		args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
 		args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 0845eaebd089d..bf6ec2f4d8c28 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.types.Row;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -39,6 +42,8 @@
 import java.util.HashSet;
 import java.util.List;
 
+import scala.collection.JavaConverters;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -81,8 +86,9 @@ public void testDefaultPartitionName() throws Exception {
 		FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
 		assertTrue(fs.exists(defaultPartPath));
 
-		// TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed
-		assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part"));
+		TableImpl flinkTable = (TableImpl) tableEnv.sqlQuery("select * from db1.part order by x");
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect(flinkTable)).asJava();
+		assertEquals(Arrays.toString(new String[]{"1,1", "2,null"}), rows.toString());
 
 		hiveShell.execute("drop database db1 cascade");
 	}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
index fca9c1d458c7b..7305f5396ab23 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
@@ -33,7 +33,6 @@ import org.apache.flink.table.planner.sinks.CollectTableSink
 import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.util.AbstractID
-
 import _root_.java.util.{UUID, ArrayList => JArrayList}
 import _root_.scala.collection.JavaConversions._
 
@@ -46,7 +45,9 @@ object BatchTableEnvUtil {
       tEnv: TableEnvironment,
       table: Table,
       sink: CollectTableSink[T],
-      jobName: Option[String]): Seq[T] = {
+      jobName: Option[String],
+      builtInCatalogName: String,
+      builtInDBName: String): Seq[T] = {
     val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
       .asInstanceOf[TypeInformation[T]]
       .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
@@ -55,7 +56,7 @@ object BatchTableEnvUtil {
     sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
     val sinkName = UUID.randomUUID().toString
     tEnv.registerTableSink(sinkName, sink)
-    tEnv.insertInto(table, sinkName)
+    tEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName)
     val res = tEnv.execute("test")
 
     val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
index 1202e224e5af3..6b98d2cf8b0bd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.runtime.utils
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.EnvironmentSettings
 import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
@@ -52,7 +53,9 @@
       new CollectTableSink(_ => t.asInstanceOf[TypeInformation[T]]), Option(jobName))
 
   def collectSink[T](
-      table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None): Seq[T] = {
+      table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None,
+      builtInCatalogName: String = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+      builtInDBName: String = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE): Seq[T] = {
     // get schema information of table
     val relNode = TableTestUtil.toRelNode(table)
     val rowType = relNode.getRowType
@@ -71,7 +74,8 @@
 
     val configuredSink = sink.configure(
       fieldNames, fieldTypes.map(TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo))
     BatchTableEnvUtil.collect(table.getTableEnvironment,
-      table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName)
+      table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName,
+      builtInCatalogName, builtInDBName)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index eb8aa5719e2fa..0201d772b668a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -953,8 +953,7 @@ class TestingTableEnvironment private(
   }
 
   override def insertInto(table: Table, path: String, pathContinued: String*): Unit = {
-    val fullPath = List(path)
-    fullPath.addAll(pathContinued)
+    val fullPath = List(path) ++ pathContinued.toList
     val modifyOperations = List(new CatalogSinkModifyOperation(fullPath, table.getQueryOperation))
     if (isEagerOperationTranslation) {
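
--
Reviewer notes (illustrative sketches only, not part of the patch):

The new test asserts the result through Flink's read path instead of through
hiveShell. A minimal Scala sketch of that pattern, assuming a TableEnvironment
named `tableEnv` with the Hive catalog already registered, as in
TableEnvHiveConnectorTest (the variable names here are illustrative):

    import org.apache.flink.table.api.internal.TableImpl
    import org.apache.flink.table.planner.runtime.utils.TableUtil

    import scala.collection.JavaConverters._

    // Run the query through Flink's planner rather than through Hive.
    val flinkTable = tableEnv
      .sqlQuery("select * from db1.part order by x")
      .asInstanceOf[TableImpl]
    // TableUtil.collect executes the job via a CollectTableSink and returns
    // the rows as a Scala Seq; asJava converts it for Java-side assertions.
    val rows = TableUtil.collect(flinkTable).asJava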
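Threading builtInCatalogName/builtInDBName through to BatchTableEnvUtil.collect
keeps collectSink source-compatible, because the new parameters default to the
EnvironmentSettings constants. Both call shapes, assuming `table` and `sink`
are in scope and with made-up catalog/database names in the second call:

    // Existing callers are unchanged; the EnvironmentSettings defaults
    // for the built-in catalog and database are used.
    TableUtil.collectSink(table, sink)

    // Tests whose sink lives under a different built-in catalog can
    // now say so explicitly.
    TableUtil.collectSink(table, sink, None, "my_catalog", "my_db")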
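The TableTestBase change also removes a latent bug, not just a TODO: List(path)
builds an immutable Scala List, so the old addAll call could never extend it.
Concatenation builds the sink path in a single expression:

    val path = "cat"                   // e.g. a catalog name
    val pathContinued = Seq("db", "t") // the varargs tail
    val fullPath = List(path) ++ pathContinued.toList
    // fullPath == List("cat", "db", "t")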