Skip to content

Commit

Permalink
[FLINK-17126][table-planner] Correct the execution behavior of BatchT…
Browse files Browse the repository at this point in the history
…ableEnvironment

This closes apache#11794
  • Loading branch information
godfreyhe authored and wuchong committed Apr 22, 2020
1 parent cbcf2fe commit c565ec3
Show file tree
Hide file tree
Showing 14 changed files with 507 additions and 16 deletions.
2 changes: 1 addition & 1 deletion flink-python/dev/pip_test_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

t.select("a + 1, b, c").insert_into("batch_sink")

b_env.execute()
bt_env.execute("test")

with open(sink_path, 'r') as f:
lines = f.read()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import tempfile
import time

import unittest

from pyflink.common import ExecutionConfig, RestartStrategies
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import DataTypes, BatchTableEnvironment, CsvTableSource, CsvTableSink
Expand Down Expand Up @@ -96,6 +98,7 @@ def test_register_type(self):
self.assertEqual(type_list,
["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])

@unittest.skip("Python API does not support DataSet now. refactor this test later")
def test_get_execution_plan(self):
tmp_dir = tempfile.gettempdir()
source_path = os.path.join(tmp_dir + '/streaming.csv')
Expand Down Expand Up @@ -125,7 +128,7 @@ def test_execute(self):
CsvTableSink(field_names, field_types,
os.path.join('{}/{}.csv'.format(tmp_dir, round(time.time())))))
t_env.insert_into('Results', t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
execution_result = self.env.execute('test_batch_execute')
execution_result = t_env.execute('test_batch_execute')
self.assertIsNotNone(execution_result.get_job_id())
self.assertTrue(execution_result.is_job_execution_result())
self.assertIsNotNone(execution_result.get_job_execution_result().get_job_id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public Pipeline createPipeline(String name) {
StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
return streamTableEnv.getPipeline(name);
} else {
return execEnv.createProgramPlan(name);
BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;
return batchTableEnv.getPipeline(name);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ package org.apache.flink.table.api.internal
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Pipeline
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.utils.PlanGenerator
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.DeploymentOptions
import org.apache.flink.core.execution.DetachedJobExecutionResult
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager}
Expand All @@ -43,12 +48,17 @@ import org.apache.flink.table.sources.{BatchTableSource, InputFormatTableSource,
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, validateInputTypeInfo}
import org.apache.flink.table.util.DummyNoOpOperator
import org.apache.flink.table.utils.TableConnectorUtils
import org.apache.flink.types.Row
import org.apache.flink.util.ExceptionUtils
import org.apache.flink.util.Preconditions.checkNotNull

import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode

import _root_.java.util.{ArrayList => JArrayList, Collections => JCollections}

import _root_.scala.collection.JavaConverters._

/**
Expand All @@ -64,6 +74,8 @@ abstract class BatchTableEnvImpl(
moduleManager: ModuleManager)
extends TableEnvImpl(config, catalogManager, moduleManager) {

private val bufferedSinks = new JArrayList[DataSink[_]]

private[flink] val optimizer = new BatchOptimizer(
() => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
planningConfigurationBuilder
Expand Down Expand Up @@ -123,25 +135,36 @@ abstract class BatchTableEnvImpl(
table: Table,
sink: TableSink[T]): Unit = {

val batchTableEnv = createDummyBatchTableEnv()
sink match {
case batchSink: BatchTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
// translate the Table into a DataSet and provide the type that the TableSink expects.
val result: DataSet[T] = translate(table)(outputType)
// create a dummy NoOpOperator, which holds dummy DummyExecutionEnvironment as context.
// NoOpOperator will be ignored in OperatorTranslation
// when translating DataSet to Operator, while its input can be translated normally.
val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
// Give the DataSet to the TableSink to emit it.
batchSink.consumeDataSet(result)
val dataSink = batchSink.consumeDataSet(dummyOp)
bufferedSinks.add(dataSink)
case boundedSink: OutputFormatTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
// translate the Table into a DataSet and provide the type that the TableSink expects.
val result: DataSet[T] = translate(table)(outputType)
// create a dummy NoOpOperator, which holds DummyExecutionEnvironment as context.
// NoOpOperator will be ignored in OperatorTranslation
// when translating DataSet to Operator, while its input can be translated normally.
val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType)
// use the OutputFormat to consume the DataSet.
val dataSink = result.output(boundedSink.getOutputFormat)
dataSink.name(
val dataSink = dummyOp.output(boundedSink.getOutputFormat)
val dataSinkWithName = dataSink.name(
TableConnectorUtils.generateRuntimeName(
boundedSink.getClass,
boundedSink.getTableSchema.getFieldNames))
bufferedSinks.add(dataSinkWithName)
case _ =>
throw new TableException(
"BatchTableSink or OutputFormatTableSink required to emit batch Table.")
Expand Down Expand Up @@ -215,12 +238,66 @@ abstract class BatchTableEnvImpl(

override def explain(table: Table): String = explain(table: Table, extended = false)

override def execute(jobName: String): JobExecutionResult = execEnv.execute(jobName)
override def execute(jobName: String): JobExecutionResult = {
val plan = createPipelineAndClearBuffer(jobName)

val configuration = execEnv.getConfiguration
checkNotNull(configuration.get(DeploymentOptions.TARGET),
"No execution.target specified in your configuration file.")

val executorFactory = execEnv.getExecutorServiceLoader.getExecutorFactory(configuration)
checkNotNull(executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET))

val jobClientFuture = executorFactory.getExecutor(configuration).execute(plan, configuration)
try {
val jobClient = jobClientFuture.get
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
jobClient.getJobExecutionResult(execEnv.getUserCodeClassLoader).get
} else {
new DetachedJobExecutionResult(jobClient.getJobID)
}
} catch {
case t: Throwable =>
ExceptionUtils.rethrow(t)
// make javac happy, this code path will not be reached
null
}
}

/**
* This method is used for sql client to submit job.
*/
def getPipeline(jobName: String): Pipeline = {
createPipelineAndClearBuffer(jobName)
}

override def explain(extended: Boolean): String = {
throw new TableException("This method is unsupported in old planner.")
}

/**
* Translate the buffered sinks to Plan, and clear the buffer.
*
* <p>The buffer will be clear even if the `translate` fails. In most cases,
* the failure is not retryable (e.g. type mismatch, can't generate physical plan).
* If the buffer is not clear after failure, the following `translate` will also fail.
*/
private def createPipelineAndClearBuffer(jobName: String): Pipeline = {
try {
val generator = new PlanGenerator(
bufferedSinks,
execEnv.getConfig,
execEnv.getParallelism,
JCollections.emptyList(),
jobName)
generator.generate()
} finally {
bufferedSinks.clear()
}
}

protected def asQueryOperation[T](dataSet: DataSet[T], fields: Option[Array[Expression]])
: DataSetQueryOperation[T] = {
val inputType = dataSet.getType
Expand All @@ -240,7 +317,7 @@ abstract class BatchTableEnvImpl(
tableOperation
}

private def checkNoTimeAttributes[T](f: Array[Expression]) = {
private def checkNoTimeAttributes[T](f: Array[Expression]): Unit = {
if (f.exists(f =>
f.accept(new ApiExpressionDefaultVisitor[Boolean] {

Expand Down Expand Up @@ -329,4 +406,7 @@ abstract class BatchTableEnvImpl(

TableSchema.builder().fields(originalNames, fieldTypes).build()
}

protected def createDummyBatchTableEnv(): BatchTableEnvImpl

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.util.DummyExecutionEnvironment

import _root_.scala.collection.JavaConverters._

Expand Down Expand Up @@ -127,4 +128,13 @@ class BatchTableEnvironmentImpl(

registerAggregateFunctionInternal[T, ACC](name, f)
}

override protected def createDummyBatchTableEnv(): BatchTableEnvImpl = {
new BatchTableEnvironmentImpl(
new DummyExecutionEnvironment(execEnv),
config,
catalogManager,
moduleManager
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.util.DummyExecutionEnvironment

import _root_.scala.reflect.ClassTag

Expand Down Expand Up @@ -92,5 +93,14 @@ class BatchTableEnvironmentImpl(
fields: Expression*): Unit = {
createTemporaryView(path, fromDataSet(dataSet, fields: _*))
}

override protected def createDummyBatchTableEnv(): BatchTableEnvImpl = {
new BatchTableEnvironmentImpl(
new ExecutionEnvironment(new DummyExecutionEnvironment(execEnv.getJavaEnv)),
config,
catalogManager,
moduleManager
)
}
}

Loading

0 comments on commit c565ec3

Please sign in to comment.