[FLINK-29219][table] Fix CREATE TABLE AS statement blocks SQL client's execution

This closes apache#20869
lsyldliu committed Sep 27, 2022
1 parent 0f8909c commit 44009ef
Showing 16 changed files with 316 additions and 28 deletions.
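For context: this change concerns CREATE TABLE AS SELECT (CTAS) statements submitted through the SQL client, which previously blocked the client's execution instead of being submitted as an asynchronous job the way a regular INSERT is. A minimal sketch of such a statement follows (table name, connector options, and source rows are illustrative placeholders, not taken from this patch):

CREATE TABLE sink_table
WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/ctas-output',
  'format' = 'json'
)
AS SELECT user_name, COUNT(order_id) AS order_cnt
FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
GROUP BY user_name;

After this change, the SQL client routes such a statement through the same path as INSERT (callInsert in the diff below), which also allows it inside BEGIN STATEMENT SET ... END;.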
@@ -60,6 +60,7 @@
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
@@ -80,7 +81,6 @@
import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.sql.codegen;

import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;

import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/** End-to-end tests for CREATE TABLE AS SELECT syntax. */
public class CreateTableAsITCase extends SqlITCaseBase {

    public CreateTableAsITCase(String executionMode) {
        super(executionMode);
    }

    @Test
    public void testCreateTableAs() throws Exception {
        runAndCheckSQL(
                "create_table_as_e2e.sql",
                generateReplaceVars(),
                2,
                Arrays.asList(
                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
    }

    @Test
    public void testCreateTableAsInStatementSet() throws Exception {
        runAndCheckSQL(
                "create_table_as_statementset_e2e.sql",
                generateReplaceVars(),
                2,
                Arrays.asList(
                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
    }

    @Override
    protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
            throws Exception {
        clusterController.submitSQLJob(
                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                        .addJar(SQL_TOOL_BOX_JAR)
                        .build(),
                Duration.ofMinutes(2L));
    }
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TEMPORARY FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction';

SET execution.runtime-mode = $MODE;
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.size = 5;
SET table.exec.mini-batch.allow-latency = 2s;

CREATE TABLE JsonTable
WITH (
'connector' = 'filesystem',
'path' = '$RESULT',
'sink.rolling-policy.rollover-interval' = '2s',
'sink.rolling-policy.check-interval' = '2s',
'format' = 'debezium-json'
)
AS SELECT user_name, count_agg(order_id) AS order_cnt
FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
GROUP BY user_name;
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TEMPORARY FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction';

SET execution.runtime-mode = $MODE;
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.size = 5;
SET table.exec.mini-batch.allow-latency = 2s;

BEGIN STATEMENT SET;

CREATE TABLE JsonTable
WITH (
'connector' = 'filesystem',
'path' = '$RESULT',
'sink.rolling-policy.rollover-interval' = '2s',
'sink.rolling-policy.check-interval' = '2s',
'format' = 'debezium-json'
)
AS SELECT user_name, count_agg(order_id) AS order_cnt
FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
GROUP BY user_name;

END;
@@ -28,6 +28,7 @@
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
@@ -410,7 +411,8 @@ private void validate(Operation operation, ExecutionMode executionMode) {
// check the current operation is allowed in STATEMENT SET.
if (isStatementSetMode) {
if (!(operation instanceof SinkModifyOperation
|| operation instanceof EndStatementSetOperation)) {
|| operation instanceof EndStatementSetOperation
|| operation instanceof CreateTableASOperation)) {
// It's up to invoker of the executeStatement to determine whether to continue
// execution
throw new SqlExecutionException(MESSAGE_STATEMENT_SET_SQL_EXECUTION_ERROR);
@@ -463,6 +465,9 @@ private void callOperation(Operation operation, ExecutionMode mode) {
} else if (operation instanceof ShowCreateViewOperation) {
// SHOW CREATE VIEW
callShowCreateView((ShowCreateViewOperation) operation);
} else if (operation instanceof CreateTableASOperation) {
// CTAS
callInsert((CreateTableASOperation) operation);
} else {
// fallback to default implementation
executeOperation(operation);
@@ -557,7 +562,7 @@ private void callSelect(QueryOperation operation) {
}
}

private void callInsert(SinkModifyOperation operation) {
private void callInsert(ModifyOperation operation) {
if (isStatementSetMode) {
statementSetOperations.add(operation);
printInfo(CliStrings.MESSAGE_ADD_STATEMENT_TO_STATEMENT_SET);
21 changes: 14 additions & 7 deletions flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -74,14 +74,21 @@ CREATE TABLE hive_table (
[INFO] Execute statement succeed.
!info

# test "ctas" only supported in Hive Dialect
# test "ctas" in Hive Dialect
CREATE TABLE foo as select 1;
+-------------------------+
| hivecatalog.default.foo |
+-------------------------+
| -1 |
+-------------------------+
1 row in set
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID:

!info

SELECT * from foo;
+----+-------------+
| op | _o__c0 |
+----+-------------+
| +I | 1 |
+----+-------------+
Received a total of 1 row
!ok

# test add jar
@@ -103,6 +103,9 @@ public void before(@TempDir Path temporaryFolder) throws Exception {
replaceVars.put(
"$VAR_BATCH_PATH",
Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
replaceVars.put(
"$VAR_BATCH_CTAS_PATH",
Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
}

@ParameterizedTest
53 changes: 53 additions & 0 deletions flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -180,3 +180,56 @@ SELECT * FROM BatchTable;
+----+-------------+
7 rows in set
!ok

# test only to verify the test job id.
SET '$internal.pipeline.job-id' = '84c7408a08c284d5736e50d3f5a648be';
!output
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
!ok

CREATE TABLE CtasTable
WITH (
'connector' = 'filesystem',
'path' = '$VAR_BATCH_CTAS_PATH',
'format' = 'csv'
)
AS SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')) T(id, str);
!output
+----------------------------------+
| job id |
+----------------------------------+
| 84c7408a08c284d5736e50d3f5a648be |
+----------------------------------+
1 row in set
!ok

RESET '$internal.pipeline.job-id';
!output
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
!ok

SELECT * FROM CtasTable;
!output
+----+-------------+
| id | str |
+----+-------------+
| 1 | Hello World |
| 2 | Hi |
| 2 | Hi |
| 3 | Hello |
| 3 | World |
| 4 | ADD |
| 5 | LINE |
+----+-------------+
7 rows in set
!ok
@@ -90,6 +90,7 @@
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
@@ -139,7 +140,6 @@
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
@@ -779,7 +779,9 @@ public CompiledPlan loadPlan(PlanReference planReference) {
public CompiledPlan compilePlanSql(String stmt) {
List<Operation> operations = getParser().parse(stmt);

if (operations.size() != 1 || !(operations.get(0) instanceof ModifyOperation)) {
if (operations.size() != 1
|| !(operations.get(0) instanceof ModifyOperation)
|| operations.get(0) instanceof CreateTableASOperation) {
throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG);
}

@@ -839,8 +841,20 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {

@Override
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
List<Transformation<?>> transformations = translate(operations);
List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
List<ModifyOperation> mapOperations = new ArrayList<>();
for (ModifyOperation modify : operations) {
// execute CREATE TABLE first for CTAS statements
if (modify instanceof CreateTableASOperation) {
CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
executeInternal(ctasOperation.getCreateTableOperation());
mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
} else {
mapOperations.add(modify);
}
}

List<Transformation<?>> transformations = translate(mapOperations);
List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);
TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);
if (tableConfig.get(TABLE_DML_SYNC)) {
try {
@@ -1397,10 +1411,6 @@ public TableResultInternal executeInternal(Operation operation) {
}
} else if (operation instanceof QueryOperation) {
return executeQueryOperation((QueryOperation) operation);
} else if (operation instanceof CreateTableASOperation) {
CreateTableASOperation createTableASOperation = (CreateTableASOperation) operation;
executeInternal(createTableASOperation.getCreateTableOperation());
return executeInternal(createTableASOperation.toSinkModifyOperation(catalogManager));
} else if (operation instanceof ExecutePlanOperation) {
ExecutePlanOperation executePlanOperation = (ExecutePlanOperation) operation;
return (TableResultInternal)