Skip to content

Commit

Permalink
[FLINK-21664][sql-parser] Support to parse STATEMENT SET syntax
Browse files Browse the repository at this point in the history
This closes apache#15282
  • Loading branch information
SteNicholas committed Mar 28, 2021
1 parent e19d383 commit b19538d
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 1 deletion.
4 changes: 4 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
"org.apache.flink.sql.parser.ddl.SqlWatermark"
"org.apache.flink.sql.parser.dml.RichSqlInsert"
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dml.SqlBeginStatementSet"
"org.apache.flink.sql.parser.dml.SqlEndStatementSet"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
"org.apache.flink.sql.parser.dql.SqlLoadModule"
Expand Down Expand Up @@ -461,6 +463,8 @@
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
"SqlBeginStatementSet()"
"SqlEndStatementSet()"
"SqlLoadModule()"
"SqlShowCatalogs()"
"SqlShowCurrentCatalogOrDatabase()"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1541,3 +1541,31 @@ SqlShowModules SqlShowModules() :
return new SqlShowModules(startPos.plus(getPos()), requireFull);
}
}

/**
* Parse a start statement set statement.
* BEGIN STATEMENT SET;
*/
SqlBeginStatementSet SqlBeginStatementSet() :
{
}
{
<BEGIN> <STATEMENT> <SET>
{
return new SqlBeginStatementSet(getPos());
}
}

/**
* Parse a end statement set statement.
* END;
*/
SqlEndStatementSet SqlEndStatementSet() :
{
}
{
<END>
{
return new SqlEndStatementSet(getPos());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dml;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** BEGIN STATEMENT SET. */
public class SqlBeginStatementSet extends SqlCall {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("BEGIN STATEMENT SET", SqlKind.OTHER);

public SqlBeginStatementSet(SqlParserPos pos) {
super(pos);
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.emptyList();
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("BEGIN STATEMENT SET");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dml;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** END. */
public class SqlEndStatementSet extends SqlCall {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("END", SqlKind.OTHER);

public SqlEndStatementSet(SqlParserPos pos) {
super(pos);
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.emptyList();
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("END");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,16 @@ public void testShowModules() {
sql("show full modules").ok("SHOW FULL MODULES");
}

@Test
public void testBeginStatementSet() {
sql("begin statement set").ok("BEGIN STATEMENT SET");
}

@Test
public void testEnd() {
sql("end").ok("END");
}

public static BaseMatcher<SqlNode> validated(String validatedSql) {
return new TypeSafeDiagnosingMatcher<SqlNode>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

/** Operation to describe a BEGIN STATEMENT SET statement. */
public class BeginStatementSetOperation implements Operation {

@Override
public String asSummaryString() {
return "BEGIN STATEMENT SET";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

/** Operation to describe a End statement. */
public class EndStatementSetOperation implements Operation {

@Override
public String asSummaryString() {
return "END";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.flink.sql.parser.ddl.SqlUseModules;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
import org.apache.flink.sql.parser.dml.SqlBeginStatementSet;
import org.apache.flink.sql.parser.dml.SqlEndStatementSet;
import org.apache.flink.sql.parser.dql.SqlLoadModule;
import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
Expand Down Expand Up @@ -83,8 +85,10 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
Expand Down Expand Up @@ -264,6 +268,11 @@ public static Optional<Operation> convert(
return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
} else if (validated instanceof RichSqlInsert) {
return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
} else if (validated instanceof SqlBeginStatementSet) {
return Optional.of(
converter.convertBeginStatementSet((SqlBeginStatementSet) validated));
} else if (validated instanceof SqlEndStatementSet) {
return Optional.of(converter.convertEndStatementSet((SqlEndStatementSet) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
Expand Down Expand Up @@ -595,6 +604,16 @@ private Operation convertSqlInsert(RichSqlInsert insert) {
dynamicOptions);
}

/** Convert BEGIN STATEMENT SET statement. */
private Operation convertBeginStatementSet(SqlBeginStatementSet sqlBeginStatementSet) {
return new BeginStatementSetOperation();
}

/** Convert END statement. */
private Operation convertEndStatementSet(SqlEndStatementSet sqlEndStatementSet) {
return new EndStatementSetOperation();
}

/** Convert use catalog statement. */
private Operation convertUseCatalog(SqlUseCatalog useCatalog) {
return new UseCatalogOperation(useCatalog.catalogName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.sql.parser.dml.{SqlBeginStatementSet, SqlEndStatementSet}
import org.apache.flink.sql.parser.dql._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
Expand Down Expand Up @@ -135,7 +136,9 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowPartitions]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]
|| sqlNode.isInstanceOf[SqlUnloadModule]
|| sqlNode.isInstanceOf[SqlUseModules]) {
|| sqlNode.isInstanceOf[SqlUseModules]
|| sqlNode.isInstanceOf[SqlBeginStatementSet]
|| sqlNode.isInstanceOf[SqlEndStatementSet]) {
return sqlNode
}
sqlNode match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
Expand Down Expand Up @@ -1383,6 +1385,28 @@ public void testCreateViewWithDynamicTableOptions() {
assertThat(operation, instanceOf(CreateViewOperation.class));
}

@Test
public void testBeginStatementSet() {
final String sql = "BEGIN STATEMENT SET";
Operation operation = parse(sql, SqlDialect.DEFAULT);
assert operation instanceof BeginStatementSetOperation;
final BeginStatementSetOperation beginStatementSetOperation =
(BeginStatementSetOperation) operation;

assertEquals("BEGIN STATEMENT SET", beginStatementSetOperation.asSummaryString());
}

@Test
public void testEnd() {
final String sql = "END";
Operation operation = parse(sql, SqlDialect.DEFAULT);
assert operation instanceof EndStatementSetOperation;
final EndStatementSetOperation endStatementSetOperation =
(EndStatementSetOperation) operation;

assertEquals("END", endStatementSetOperation.asSummaryString());
}

// ~ Tool Methods ----------------------------------------------------------

private static TestItem createTestItem(Object... args) {
Expand Down

0 comments on commit b19538d

Please sign in to comment.