[FLINK-13747][hive] Remove some TODOs in Hive connector
This PR fixes some obsolete TODOs.

This closes apache#9460.
lirui-apache authored and bowenli86 committed Aug 16, 2019
1 parent 3d3e6c1 commit a6571bb
Showing 5 changed files with 19 additions and 10 deletions.
Changed file 1 of 5:
@@ -355,7 +355,6 @@ private static Future<Void> startHMS(HiveServerContext context, int port) throws
args.add(System.getProperty("java.class.path"));

// set sys properties
// TODO: generate hive-site.xml at runtime?
args.add(hiveCmdLineConfig(METASTOREWAREHOUSE.varname, outsideConf.getVar(METASTOREWAREHOUSE)));
args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
Changed file 2 of 5:
@@ -19,11 +19,14 @@
package org.apache.flink.connectors.hive;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.planner.runtime.utils.TableUtil;
import org.apache.flink.types.Row;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -39,6 +42,8 @@
import java.util.HashSet;
import java.util.List;

import scala.collection.JavaConverters;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -81,8 +86,9 @@ public void testDefaultPartitionName() throws Exception {
FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
assertTrue(fs.exists(defaultPartPath));

// TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed
assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part"));
TableImpl flinkTable = (TableImpl) tableEnv.sqlQuery("select * from db1.part order by x");
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect(flinkTable)).asJava();
assertEquals(Arrays.toString(new String[]{"1,1", "2,null"}), rows.toString());

hiveShell.execute("drop database db1 cascade");
}
Changed file 3 of 5:
@@ -33,7 +33,6 @@ import org.apache.flink.table.planner.sinks.CollectTableSink
import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import org.apache.flink.util.AbstractID

import _root_.java.util.{UUID, ArrayList => JArrayList}

import _root_.scala.collection.JavaConversions._
@@ -46,7 +45,9 @@ object BatchTableEnvUtil {
tEnv: TableEnvironment,
table: Table,
sink: CollectTableSink[T],
jobName: Option[String]): Seq[T] = {
jobName: Option[String],
builtInCatalogName: String,
builtInDBName: String): Seq[T] = {
val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
.createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
@@ -55,7 +56,7 @@
sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
val sinkName = UUID.randomUUID().toString
tEnv.registerTableSink(sinkName, sink)
tEnv.insertInto(table, sinkName)
tEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName)

val res = tEnv.execute("test")
val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
Changed file 4 of 5:
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.runtime.utils

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.internal.TableImpl
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
@@ -52,7 +53,9 @@ object TableUtil {
new CollectTableSink(_ => t.asInstanceOf[TypeInformation[T]]), Option(jobName))

def collectSink[T](
table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None): Seq[T] = {
table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None,
builtInCatalogName: String = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
builtInDBName: String = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE): Seq[T] = {
// get schema information of table
val relNode = TableTestUtil.toRelNode(table)
val rowType = relNode.getRowType
@@ -71,7 +74,8 @@
val configuredSink = sink.configure(
fieldNames, fieldTypes.map(TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo))
BatchTableEnvUtil.collect(table.getTableEnvironment,
table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName)
table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName,
builtInCatalogName, builtInDBName)
}

}
Changed file 5 of 5:
@@ -953,8 +953,7 @@ class TestingTableEnvironment private(
}

override def insertInto(table: Table, path: String, pathContinued: String*): Unit = {
val fullPath = List(path)
fullPath.addAll(pathContinued)
val fullPath = List(path) ++ pathContinued.toList

val modifyOperations = List(new CatalogSinkModifyOperation(fullPath, table.getQueryOperation))
if (isEagerOperationTranslation) {
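Side note on the last hunk: a minimal, self-contained Scala sketch of the immutable-concatenation pattern the fixed insertInto uses. Scala's immutable List has no addAll, so the full sink path is built by concatenating a new list. The object name and sample arguments below are hypothetical and not part of this commit.

object InsertPathSketch {
  // Build the fully qualified sink path the same way the fixed insertInto does:
  // start from the first path element and append the rest by concatenation.
  def fullPath(path: String, pathContinued: String*): List[String] =
    List(path) ++ pathContinued.toList

  def main(args: Array[String]): Unit = {
    // Hypothetical catalog/database/table names, for illustration only.
    println(fullPath("default_catalog", "default_database", "dst"))
    // prints: List(default_catalog, default_database, dst)
  }
}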
