diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index e2e036f0445c5..536f7bcfefc0d 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.SqlExprToRexConverter; import org.apache.flink.table.planner.delegation.ParserImpl; @@ -39,6 +38,7 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParseUtils; import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; +import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader; import org.apache.flink.util.FileUtils; diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index b32e2705bb0ad..9268e4c845d66 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -32,6 +32,11 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.command.ClearOperation; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.QuitOperation; +import org.apache.flink.table.operations.command.ResetOperation; +import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.planner.delegation.hive.HiveParser; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; @@ -66,11 +71,13 @@ import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG; import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** Test Hive syntax when Hive dialect is used. */ @@ -120,6 +127,20 @@ public void testPluggableParser() { parser.getClass().getName(), tableEnvInternal.getParser().getClass().getName()); } + @Test + public void testParseCommand() { + TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv; + Parser parser = tableEnvInternal.getParser(); + + // hive dialect should use HiveParser + assertTrue(parser instanceof HiveParser); + assertThat(parser.parse("HELP").get(0), instanceOf(HelpOperation.class)); + assertThat(parser.parse("clear").get(0), instanceOf(ClearOperation.class)); + assertThat(parser.parse("SET").get(0), instanceOf(SetOperation.class)); + assertThat(parser.parse("ResET").get(0), instanceOf(ResetOperation.class)); + assertThat(parser.parse("Exit").get(0), instanceOf(QuitOperation.class)); + } + @Test public void testCreateDatabase() throws Exception { tableEnv.executeSql("create database db1 comment 'db1 comment'"); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 426e44fc4f4a4..5e4d4d450ea5a 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -186,17 +186,26 @@ public ResolvedExpression parseSqlExpression( return context.wrapClassLoader( () -> parser.parseSqlExpression(sqlExpression, inputSchema)); } + + @Override + public String[] getCompletionHints(String statement, int position) { + return context.wrapClassLoader( + () -> parser.getCompletionHints(statement, position)); + } }; } @Override public List completeStatement(String sessionId, String statement, int position) { final ExecutionContext context = getExecutionContext(sessionId); - final TableEnvironment tableEnv = context.getTableEnvironment(); + final TableEnvironmentInternal tableEnv = + (TableEnvironmentInternal) context.getTableEnvironment(); try { return context.wrapClassLoader( - () -> Arrays.asList(tableEnv.getCompletionHints(statement, position))); + () -> + Arrays.asList( + tableEnv.getParser().getCompletionHints(statement, position))); } catch (Throwable t) { // catch everything such that the query does not crash the executor if (LOG.isDebugEnabled()) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index cc7b369bc22af..193eac3bfb155 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -677,7 +677,7 @@ public String explainInternal(List operations, ExplainDetail... extra @Override public String[] getCompletionHints(String statement, int position) { - return planner.getCompletionHints(statement, position); + return planner.getParser().getCompletionHints(statement, position); } @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java index 67cc30e39b7bd..20f07e11bd940 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java @@ -63,4 +63,14 @@ public interface Parser { * @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression */ ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema); + + /** + * Returns completion hints for the given statement at the given cursor position. The completion + * happens case insensitively. + * + * @param statement Partial or slightly incorrect SQL statement + * @param position cursor position + * @return completion hints that fit at the current cursor position + */ + String[] getCompletionHints(String statement, int position); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java index 115edb9226db6..5dfd45be2acef 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java @@ -86,16 +86,6 @@ public interface Planner { */ String explain(List operations, ExplainDetail... extraDetails); - /** - * Returns completion hints for the given statement at the given cursor position. The completion - * happens case insensitively. - * - * @param statement Partial or slightly incorrect SQL statement - * @param position cursor position - * @return completion hints that fit at the current cursor position - */ - String[] getCompletionHints(String statement, int position); - /** * Get the json plan of the given {@link ModifyOperation}s. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ClearOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ClearOperation.java new file mode 100644 index 0000000000000..f7a855e85a1b9 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ClearOperation.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.table.operations.Operation; + +/** Operation to represent CLEAR command. */ +public class ClearOperation implements Operation { + @Override + public String asSummaryString() { + return "CLEAR"; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/HelpOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/HelpOperation.java new file mode 100644 index 0000000000000..3e6a14645094f --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/HelpOperation.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.table.operations.Operation; + +/** Operation that represents HELP command. */ +public class HelpOperation implements Operation { + + @Override + public String asSummaryString() { + return "HELP"; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/QuitOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/QuitOperation.java new file mode 100644 index 0000000000000..250de6db6f0a2 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/QuitOperation.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.table.operations.Operation; + +/** Operation that represent QUIT command. */ +public class QuitOperation implements Operation { + + @Override + public String asSummaryString() { + return "QUIT"; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java new file mode 100644 index 0000000000000..8a33b57483b9b --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ResetOperation.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.table.operations.Operation; + +/** Operation that represent RESET command. */ +public class ResetOperation implements Operation { + @Override + public String asSummaryString() { + return "RESET"; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/SetOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/SetOperation.java new file mode 100644 index 0000000000000..f8d1bc1f899b6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/SetOperation.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.table.operations.Operation; + +/** Operation to represent SET command. */ +public class SetOperation implements Operation { + + private final String[] operands; + + public SetOperation(String[] operands) { + this.operands = operands; + } + + public String[] getOperands() { + return operands; + } + + @Override + public String asSummaryString() { + if (operands.length == 0) { + return "SET"; + } else { + return String.format("SET %s=%s", operands[0], operands[1]); + } + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java index 4cfe4f30f943b..c56ee9ccb500e 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java @@ -42,4 +42,9 @@ public UnresolvedIdentifier parseIdentifier(String identifier) { public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) { return null; } + + @Override + public String[] getCompletionHints(String statement, int position) { + throw new UnsupportedOperationException("Unsupported operation."); + } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java index 409eec9b687ac..cf56552a852bc 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java @@ -45,11 +45,6 @@ public String explain(List operations, ExplainDetail... extraDetails) return null; } - @Override - public String[] getCompletionHints(String statement, int position) { - return new String[0]; - } - @Override public String getJsonPlan(List modifyOperations) { return null; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java index e55f43b7fa18a..c0fa221dde0b6 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/SqlExprToRexConverterImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.calcite; +import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader; import org.apache.calcite.config.CalciteConnectionConfigImpl; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java index 0e3501c19ef9c..e9136a5f5f156 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java @@ -25,23 +25,30 @@ import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.SqlExprToRexConverter; import org.apache.flink.table.planner.expressions.RexNodeExpression; import org.apache.flink.table.planner.operations.SqlToOperationConverter; +import org.apache.flink.table.planner.parse.CalciteParser; +import org.apache.flink.table.planner.parse.ExtendedParser; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.advise.SqlAdvisor; +import org.apache.calcite.sql.advise.SqlAdvisorValidator; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; /** Implementation of {@link Parser} that uses Calcite. */ public class ParserImpl implements Parser { @@ -54,6 +61,7 @@ public class ParserImpl implements Parser { private final Supplier validatorSupplier; private final Supplier calciteParserSupplier; private final Function sqlExprToRexConverterCreator; + private static final ExtendedParser EXTENDED_PARSER = ExtendedParser.INSTANCE; public ParserImpl( CatalogManager catalogManager, @@ -66,10 +74,24 @@ public ParserImpl( this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator; } + /** + * When parsing statement, it first uses {@link ExtendedParser} to parse statements. If {@link + * ExtendedParser} fails to parse statement, it uses the {@link CalciteParser} to parse + * statements. + * + * @param statement input statement. + * @return parsed operations. + */ @Override public List parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); + + Optional command = EXTENDED_PARSER.parse(statement); + if (command.isPresent()) { + return Collections.singletonList(command.get()); + } + // parse the sql query SqlNode parsed = parser.parse(statement); @@ -101,6 +123,27 @@ public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema i sqlExpressionExpanded); } + public String[] getCompletionHints(String statement, int cursor) { + List candidates = + new ArrayList<>( + Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, cursor))); + + // use sql advisor + SqlAdvisorValidator validator = validatorSupplier.get().getSqlAdvisorValidator(); + SqlAdvisor advisor = + new SqlAdvisor(validator, validatorSupplier.get().config().getParserConfig()); + String[] replaced = new String[1]; + + List sqlHints = + advisor.getCompletionHints(statement, cursor, replaced).stream() + .map(item -> item.toIdentifier().toString()) + .collect(Collectors.toList()); + + candidates.addAll(sqlHints); + + return candidates.toArray(new String[0]); + } + public CatalogManager getCatalogManager() { return catalogManager; } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index 2b4bce781aaa6..5fea4d5805907 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.planner.calcite.CalciteConfig; import org.apache.flink.table.planner.calcite.CalciteConfig$; -import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; @@ -43,6 +42,7 @@ import org.apache.flink.table.planner.codegen.ExpressionReducer; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.hint.FlinkHintStrategies; +import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader; import org.apache.flink.table.planner.plan.cost.FlinkCostFactory; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/AbstractRegexParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/AbstractRegexParseStrategy.java new file mode 100644 index 0000000000000..aed4e9bee706b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/AbstractRegexParseStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link Operation} by regex. */ +public abstract class AbstractRegexParseStrategy implements ExtendedParseStrategy { + + protected static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL; + + protected Pattern pattern; + + protected AbstractRegexParseStrategy(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public boolean match(String statement) { + return pattern.matcher(statement.trim()).matches(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/CalciteParser.java similarity index 99% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/CalciteParser.java index 9c98ffa16aeb6..7ec4e04cd4d4e 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/CalciteParser.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.calcite; +package org.apache.flink.table.planner.parse; import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ClearOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ClearOperationParseStrategy.java new file mode 100644 index 0000000000000..5e4028f68eb37 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ClearOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.ClearOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link ClearOperation}. */ +public class ClearOperationParseStrategy extends AbstractRegexParseStrategy { + + static final ClearOperationParseStrategy INSTANCE = new ClearOperationParseStrategy(); + + private ClearOperationParseStrategy() { + super(Pattern.compile("CLEAR", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new ClearOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"CLEAR"}; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ExtendedParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ExtendedParseStrategy.java new file mode 100644 index 0000000000000..d4c45ca6d163f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ExtendedParseStrategy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; + +/** + * Strategy to parse statement to {@link Operation}. parsing some special command which can't + * supported by {@link CalciteParser}, e.g. {@code SET key=value} may contain special characters in + * key and value. + */ +public interface ExtendedParseStrategy { + + /** Determine whether the input statement is satisfied the strategy. */ + boolean match(String statement); + + /** Convert the input statement to the {@link Operation}. */ + Operation convert(String statement); + + /** Return hints for the given statement. */ + String[] getHints(); +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ExtendedParser.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ExtendedParser.java new file mode 100644 index 0000000000000..7880580106d06 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ExtendedParser.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * {@link ExtendedParser} is used for parsing some special command which can't supported by {@link + * CalciteParser}, e.g. {@code SET key=value} may contain special characters in key and value. It's + * also a good idea to move some parsing strategy here to avoid introducing new reserved keywords. + */ +public class ExtendedParser { + + public static final ExtendedParser INSTANCE = new ExtendedParser(); + + private static final List PARSE_STRATEGIES = + Arrays.asList( + ClearOperationParseStrategy.INSTANCE, + HelpOperationParseStrategy.INSTANCE, + QuitOperationParseStrategy.INSTANCE, + ResetOperationParseStrategy.INSTANCE, + SetOperationParseStrategy.INSTANCE); + + /** + * Parse the input statement to the {@link Operation}. + * + * @param statement the command to evaluate + * @return parsed operation that represents the command + */ + public Optional parse(String statement) { + for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) { + if (strategy.match(statement)) { + return Optional.of(strategy.convert(statement)); + } + } + return Optional.empty(); + } + + /** + * Returns completion hints for the given statement at the given cursor position. The completion + * happens case insensitively. + * + * @param statement Partial or slightly incorrect SQL statement + * @param cursor cursor position + * @return completion hints that fit at the current cursor position + */ + public String[] getCompletionHints(String statement, int cursor) { + String normalizedStatement = statement.trim().toUpperCase(); + List hints = new ArrayList<>(); + for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) { + for (String hint : strategy.getHints()) { + if (hint.startsWith(normalizedStatement) && cursor < hint.length()) { + hints.add(getCompletionHint(normalizedStatement, hint)); + } + } + } + + return hints.toArray(new String[0]); + } + + private String getCompletionHint(String statement, String commandHint) { + if (statement.length() == 0) { + return commandHint; + } + int cursorPos = statement.length() - 1; + int returnStartPos; + if (Character.isWhitespace(commandHint.charAt(cursorPos + 1))) { + returnStartPos = Math.min(commandHint.length() - 1, cursorPos + 2); + } else { + // 'add ja' should return 'jar' + returnStartPos = cursorPos; + while (returnStartPos > 0 + && !Character.isWhitespace(commandHint.charAt(returnStartPos - 1))) { + returnStartPos--; + } + } + + return commandHint.substring(returnStartPos); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/HelpOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/HelpOperationParseStrategy.java new file mode 100644 index 0000000000000..a5458ac832eb2 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/HelpOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.HelpOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link HelpOperation}. */ +public class HelpOperationParseStrategy extends AbstractRegexParseStrategy { + + static final HelpOperationParseStrategy INSTANCE = new HelpOperationParseStrategy(); + + private HelpOperationParseStrategy() { + super(Pattern.compile("HELP", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new HelpOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"HELP"}; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/QuitOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/QuitOperationParseStrategy.java new file mode 100644 index 0000000000000..38d006ba61e56 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/QuitOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.QuitOperation; + +import java.util.regex.Pattern; + +/** Operation to parse statement to {@link QuitOperation}. */ +public class QuitOperationParseStrategy extends AbstractRegexParseStrategy { + + static final QuitOperationParseStrategy INSTANCE = new QuitOperationParseStrategy(); + + private QuitOperationParseStrategy() { + super(Pattern.compile("EXIT|QUIT", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new QuitOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"EXIT", "QUIT"}; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java new file mode 100644 index 0000000000000..8a7e7519e1059 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.ResetOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link ResetOperation}. */ +public class ResetOperationParseStrategy extends AbstractRegexParseStrategy { + + static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy(); + + private ResetOperationParseStrategy() { + super(Pattern.compile("RESET", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new ResetOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"RESET"}; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java new file mode 100644 index 0000000000000..8e5a17fd391f7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.parse; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.SetOperation; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link SetOperation}. */ +public class SetOperationParseStrategy extends AbstractRegexParseStrategy { + + static final SetOperationParseStrategy INSTANCE = new SetOperationParseStrategy(); + + protected SetOperationParseStrategy() { + super(Pattern.compile("SET(\\s+(\\S+)\\s*=(.+))?", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + Matcher matcher = pattern.matcher(statement.trim()); + final List operands = new ArrayList<>(); + if (matcher.find()) { + for (int i = 0; i < matcher.groupCount(); i++) { + if (matcher.group(i + 1) != null) { + operands.add(matcher.group(i + 1)); + } + } + } + + // only capture SET + if (operands.isEmpty()) { + return new SetOperation(new String[0]); + } else if (operands.size() == 3) { + return new SetOperation( + operands.subList(1, 3).stream().map(String::trim).toArray(String[]::new)); + } else { + // impossible + throw new TableException( + String.format( + "Failed to convert the statement to SET operation: %s.", statement)); + } + } + + @Override + public String[] getHints() { + return new String[] {"SET"}; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 0570314aeae83..b22b73f2ff9f2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ExtendedSqlNode import org.apache.flink.sql.parser.dql._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader + import com.google.common.collect.ImmutableList import org.apache.calcite.config.NullCollation import org.apache.calcite.plan._ @@ -36,11 +37,12 @@ import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable} import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter} import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} import org.apache.flink.sql.parser.ddl.SqlUseModules +import org.apache.flink.table.planner.parse.CalciteParser import java.lang.{Boolean => JBoolean} import java.util import java.util.function.{Function => JFunction} -import scala.collection.JavaConversions._ + import scala.collection.JavaConverters._ /** @@ -62,18 +64,13 @@ class FlinkPlannerImpl( var validator: FlinkCalciteSqlValidator = _ - def getCompletionHints(sql: String, cursor: Int): Array[String] = { - val advisorValidator = new SqlAdvisorValidator( + def getSqlAdvisorValidator(): SqlAdvisorValidator = { + new SqlAdvisorValidator( operatorTable, catalogReaderSupplier.apply(true), // ignore cases for lenient completion typeFactory, SqlValidator.Config.DEFAULT .withSqlConformance(config.getParserConfig.conformance())) - val advisor = new SqlAdvisor(advisorValidator, config.getParserConfig) - val replaced = Array[String](null) - val hints = advisor.getCompletionHints(sql, cursor, replaced) - .map(item => item.toIdentifier.toString) - hints.toArray } /** diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 6a5717c5fa51b..1b69b2891177e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -173,11 +173,6 @@ abstract class PlannerBase( } } - override def getCompletionHints(statement: String, position: Int): Array[String] = { - val planner = createFlinkPlanner - planner.getCompletionHints(statement, position) - } - /** * Converts a relational tree of [[ModifyOperation]] into a Calcite relational expression. */ diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java new file mode 100644 index 0000000000000..1b85723eccd43 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.delegation; + +import org.apache.flink.table.api.SqlParserException; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; +import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; +import org.apache.flink.table.utils.CatalogManagerMocks; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.fail; + +/** Test for {@link ParserImpl}. */ +public class ParserImplTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + private final boolean isStreamingMode = false; + private final TableConfig tableConfig = new TableConfig(); + private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default"); + private final CatalogManager catalogManager = + CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build(); + private final ModuleManager moduleManager = new ModuleManager(); + private final FunctionCatalog functionCatalog = + new FunctionCatalog(tableConfig, catalogManager, moduleManager); + private final PlannerContext plannerContext = + new PlannerContext( + tableConfig, + functionCatalog, + catalogManager, + asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), + new ArrayList<>()); + + private final Supplier plannerSupplier = + () -> + plannerContext.createFlinkPlanner( + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase()); + + private final Parser parser = + new ParserImpl( + catalogManager, + plannerSupplier, + () -> plannerSupplier.get().parser(), + t -> + plannerContext.createSqlExprToRexConverter( + plannerContext.getTypeFactory().buildRelNodeRowType(t))); + + private List testLegalStatements; + private List testIllegalStatements; + + @Before + public void setup() { + testLegalStatements = + Arrays.asList( + TestSpec.forStatement("ClEaR").expectedSummary("CLEAR"), + TestSpec.forStatement("hElP").expectedSummary("HELP"), + TestSpec.forStatement("qUIT").expectedSummary("QUIT"), + TestSpec.forStatement("ExIT").expectedSummary("EXIT"), + TestSpec.forStatement("REsEt").expectedSummary("RESET"), + TestSpec.forStatement(" SEt ").expectedSummary("SET"), + TestSpec.forStatement("SET execution.runtime-type=batch") + .expectedSummary("SET execution.runtime-type=batch"), + TestSpec.forStatement("SET pipeline.jars = /path/to/test-_-jar.jar") + .expectedSummary("SET pipeline.jars=/path/to/test-_-jar.jar"), + TestSpec.forStatement("USE test.db1").expectedSummary("USE test.db1"), + TestSpec.forStatement("SHOW tables").expectedSummary("SHOW TABLES"), + TestSpec.forStatement("SET pipeline.name = ' '") + .expectedSummary("SET pipeline.name = ' '")); + + testIllegalStatements = + Collections.singletonList(TestSpec.forStatement("SET execution.runtime-type=")); + } + + @Test + public void testParseLegalStatements() { + for (TestSpec spec : testLegalStatements) { + Operation op = parser.parse(spec.statement).get(0); + Assert.assertEquals(op.asSummaryString(), op.asSummaryString()); + } + } + + @Test + public void testParseIllegalStatements() { + thrown.expect(SqlParserException.class); + for (TestSpec spec : testIllegalStatements) { + parser.parse(spec.statement); + fail("Should fail."); + } + } + + @Test + public void testCompletionTest() { + verifySqlCompletion("QU", 1, new String[] {"QUIT"}); + verifySqlCompletion("SE", 1, new String[] {"SET"}); + verifySqlCompletion("", 0, new String[] {"CLEAR", "HELP", "EXIT", "QUIT", "RESET", "SET"}); + verifySqlCompletion("SELECT a fram b", 10, new String[] {"FETCH", "FROM"}); + } + + // ~ Tool Methods ---------------------------------------------------------- + + private static class TestSpec { + + private String statement; + private String expectedSummary; + + private TestSpec(String statement) { + this.statement = statement; + } + + static TestSpec forStatement(String statement) { + return new TestSpec(statement); + } + + TestSpec expectedSummary(String expectedSummary) { + this.expectedSummary = expectedSummary; + return this; + } + } + + private void verifySqlCompletion(String statement, int position, String[] expectedHints) { + String[] hints = parser.getCompletionHints(statement, position); + assertArrayEquals(expectedHints, hints); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index cc961fd3cf9ca..b0cc45b3ce88d 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -64,7 +64,6 @@ import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.CreateViewOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; -import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; import org.apache.flink.table.planner.delegation.ParserImpl; @@ -72,6 +71,7 @@ import org.apache.flink.table.planner.expressions.utils.Func0$; import org.apache.flink.table.planner.expressions.utils.Func1$; import org.apache.flink.table.planner.expressions.utils.Func8$; +import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.CatalogManagerMocks; diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala index 5cd9e602a6a41..90c5010e8525f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala @@ -27,9 +27,10 @@ import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectId import org.apache.flink.table.data.{GenericRowData, TimestampData} import org.apache.flink.table.delegation.Parser import org.apache.flink.table.module.ModuleManager -import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory} +import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory} import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema import org.apache.flink.table.planner.delegation.{ParserImpl, PlannerContext} +import org.apache.flink.table.planner.parse.CalciteParser import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5 import org.apache.flink.table.runtime.generated.WatermarkGenerator import org.apache.flink.table.types.logical.{IntType, TimestampType} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/AbstractRegexParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/AbstractRegexParseStrategy.java new file mode 100644 index 0000000000000..a0b741c98d0da --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/AbstractRegexParseStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link Operation} by regex. */ +public abstract class AbstractRegexParseStrategy implements ExtendedParseStrategy { + + protected static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL; + + protected Pattern pattern; + + protected AbstractRegexParseStrategy(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public boolean match(String statement) { + return pattern.matcher(statement.trim()).matches(); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/calcite/CalciteParser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/CalciteParser.java similarity index 98% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/calcite/CalciteParser.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/CalciteParser.java index 93c80a9730e8d..b3344d90cc952 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/calcite/CalciteParser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/CalciteParser.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.calcite; +package org.apache.flink.table.parse; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; import org.apache.flink.table.api.SqlParserException; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ClearOperationParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ClearOperationParseStrategy.java new file mode 100644 index 0000000000000..46d488c49bdfb --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ClearOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.ClearOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link ClearOperation}. */ +public class ClearOperationParseStrategy extends AbstractRegexParseStrategy { + + static final ClearOperationParseStrategy INSTANCE = new ClearOperationParseStrategy(); + + private ClearOperationParseStrategy() { + super(Pattern.compile("CLEAR", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new ClearOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"CLEAR"}; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParseStrategy.java new file mode 100644 index 0000000000000..ab989dc8a2473 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParseStrategy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; + +/** + * Strategy to parse statement to {@link Operation}. parsing some special command which can't + * supported by {@link CalciteParser}, e.g. {@code SET key=value} may contain special characters in + * key and value. + */ +public interface ExtendedParseStrategy { + + /** Determine whether the input statement is satisfied the strategy. */ + boolean match(String statement); + + /** Convert the input statement to the {@link Operation}. */ + Operation convert(String statement); + + /** Return hints for the given statement. */ + String[] getHints(); +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java new file mode 100644 index 0000000000000..a1c98f1e4a4fd --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * {@link ExtendedParser} is used for parsing some special command which can't supported by {@link + * CalciteParser}, e.g. {@code SET key=value} contains special characters in key and value + * identifier. It's also good to move some parsring here to avoid introducing new reserved keywords. + */ +public class ExtendedParser { + + public static final ExtendedParser INSTANCE = new ExtendedParser(); + + private static final List PARSE_STRATEGIES = + Arrays.asList( + ClearOperationParseStrategy.INSTANCE, + HelpOperationParseStrategy.INSTANCE, + QuitOperationParseStrategy.INSTANCE, + ResetOperationParseStrategy.INSTANCE, + SetOperationParseStrategy.INSTANCE); + + /** + * Parse the input statement to the {@link Operation}. + * + * @param statement the command to evaluate + * @return parsed operation that represents the command + */ + public Optional parse(String statement) { + for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) { + if (strategy.match(statement)) { + return Optional.of(strategy.convert(statement)); + } + } + return Optional.empty(); + } + + /** + * Returns completion hints for the given statement at the given cursor position. The completion + * happens case insensitively. + * + * @param statement Partial or slightly incorrect SQL statement + * @param cursor cursor position + * @return completion hints that fit at the current cursor position + */ + public String[] getCompletionHints(String statement, int cursor) { + String normalizedStatement = statement.trim().toUpperCase(); + List hints = new ArrayList<>(); + for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) { + for (String hint : strategy.getHints()) { + if (hint.startsWith(normalizedStatement) && cursor < hint.length()) { + hints.add(getCompletionHint(normalizedStatement, hint)); + } + } + } + return hints.toArray(new String[0]); + } + + private String getCompletionHint(String statement, String commandHint) { + if (statement.length() == 0) { + return commandHint; + } + int cursorPos = statement.length() - 1; + int returnStartPos; + if (Character.isWhitespace(commandHint.charAt(cursorPos + 1))) { + returnStartPos = Math.min(commandHint.length() - 1, cursorPos + 2); + } else { + // 'add ja' should return 'jar' + returnStartPos = cursorPos; + while (returnStartPos > 0 + && !Character.isWhitespace(commandHint.charAt(returnStartPos - 1))) { + returnStartPos--; + } + } + + return commandHint.substring(returnStartPos); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/HelpOperationParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/HelpOperationParseStrategy.java new file mode 100644 index 0000000000000..bd13bab04000d --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/HelpOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.HelpOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link HelpOperation}. */ +public class HelpOperationParseStrategy extends AbstractRegexParseStrategy { + + static final HelpOperationParseStrategy INSTANCE = new HelpOperationParseStrategy(); + + private HelpOperationParseStrategy() { + super(Pattern.compile("HELP", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new HelpOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"HELP"}; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/QuitOperationParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/QuitOperationParseStrategy.java new file mode 100644 index 0000000000000..4ed5a952af897 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/QuitOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.QuitOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link QuitOperation}. */ +public class QuitOperationParseStrategy extends AbstractRegexParseStrategy { + + static final QuitOperationParseStrategy INSTANCE = new QuitOperationParseStrategy(); + + private QuitOperationParseStrategy() { + super(Pattern.compile("EXIT|QUIT", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new QuitOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"EXIT", "QUIT"}; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java new file mode 100644 index 0000000000000..3602a4b23066c --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ResetOperationParseStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.ResetOperation; + +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link ResetOperation}. */ +public class ResetOperationParseStrategy extends AbstractRegexParseStrategy { + + static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy(); + + private ResetOperationParseStrategy() { + super(Pattern.compile("RESET", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + return new ResetOperation(); + } + + @Override + public String[] getHints() { + return new String[] {"RESET"}; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/SetOperationParseStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/SetOperationParseStrategy.java new file mode 100644 index 0000000000000..97ff3f1bfd7b9 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/SetOperationParseStrategy.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.parse; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.SetOperation; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Strategy to parse statement to {@link SetOperation}. */ +public class SetOperationParseStrategy extends AbstractRegexParseStrategy { + + static final SetOperationParseStrategy INSTANCE = new SetOperationParseStrategy(); + + protected SetOperationParseStrategy() { + super(Pattern.compile("SET(\\s+(\\S+)\\s*=(.+))?", DEFAULT_PATTERN_FLAGS)); + } + + @Override + public Operation convert(String statement) { + Matcher matcher = pattern.matcher(statement.trim()); + final List operands = new ArrayList<>(); + if (matcher.find()) { + for (int i = 0; i < matcher.groupCount(); i++) { + if (matcher.group(i + 1) != null) { + operands.add(matcher.group(i + 1)); + } + } + } + + // only capture SET + if (operands.isEmpty()) { + return new SetOperation(new String[0]); + } else if (operands.size() == 3) { + return new SetOperation( + operands.subList(1, 3).stream().map(String::trim).toArray(String[]::new)); + } else { + // impossible + throw new TableException( + String.format( + "Failed to convert the statement to SET operation: %s.", statement)); + } + } + + @Override + public String[] getHints() { + return new String[] {"SET"}; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java index 89299df710642..2c2c4ef4c486f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java @@ -20,21 +20,28 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.calcite.CalciteParser; import org.apache.flink.table.calcite.FlinkPlannerImpl; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.parse.CalciteParser; +import org.apache.flink.table.parse.ExtendedParser; import org.apache.flink.table.sqlexec.SqlToOperationConverter; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.advise.SqlAdvisor; +import org.apache.calcite.sql.advise.SqlAdvisorValidator; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; +import java.util.stream.Collectors; /** Implementation of {@link Parser} that uses Calcite. */ public class ParserImpl implements Parser { @@ -46,6 +53,7 @@ public class ParserImpl implements Parser { // multiple statements parsing private final Supplier validatorSupplier; private final Supplier calciteParserSupplier; + private static final ExtendedParser EXTENDED_PARSER = ExtendedParser.INSTANCE; public ParserImpl( CatalogManager catalogManager, @@ -56,11 +64,25 @@ public ParserImpl( this.calciteParserSupplier = calciteParserSupplier; } + /** + * When parsing statement, it first uses {@link ExtendedParser} to parse statements. If {@link + * ExtendedParser} fails to parse statement, it uses the {@link CalciteParser} to parse + * statements. + * + * @param statement input statement. + * @return parsed operations. + */ @Override public List parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); // parse the sql query + + Optional command = EXTENDED_PARSER.parse(statement); + if (command.isPresent()) { + return Collections.singletonList(command.get()); + } + SqlNode parsed = parser.parse(statement); Operation operation = @@ -87,4 +109,25 @@ public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema i throw new UnsupportedOperationException( "Computed columns is only supported by the Blink planner."); } + + @Override + public String[] getCompletionHints(String statement, int position) { + List candidates = + new ArrayList<>( + Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, position))); + + SqlAdvisorValidator validator = validatorSupplier.get().getSqlAdvisorValidator(); + SqlAdvisor advisor = + new SqlAdvisor(validator, validatorSupplier.get().config().getParserConfig()); + String[] replaced = new String[1]; + + List sqlHints = + advisor.getCompletionHints(statement, position, replaced).stream() + .map(item -> item.toIdentifier().toString()) + .collect(Collectors.toList()); + + candidates.addAll(sqlHints); + + return candidates.toArray(new String[0]); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java index a25c1681a8c1a..a8dfac9e2d8b8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java @@ -25,7 +25,6 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.calcite.CalciteConfig; -import org.apache.flink.table.calcite.CalciteParser; import org.apache.flink.table.calcite.FlinkPlannerImpl; import org.apache.flink.table.calcite.FlinkRelBuilder; import org.apache.flink.table.calcite.FlinkRelBuilderFactory; @@ -39,6 +38,7 @@ import org.apache.flink.table.codegen.ExpressionReducer; import org.apache.flink.table.expressions.ExpressionBridge; import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.parse.CalciteParser; import org.apache.flink.table.plan.cost.DataSetCostFactory; import org.apache.flink.table.util.JavaScalaConversionUtil; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index eaae6cf2be54e..75ef8ee5a6b80 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.java.operators.DataSink import org.apache.flink.core.execution.JobClient import org.apache.flink.table.api._ import org.apache.flink.table.api.internal.TableResultImpl.PrintStyle -import org.apache.flink.table.calcite.{CalciteParser, FlinkPlannerImpl} +import org.apache.flink.table.calcite.FlinkPlannerImpl import org.apache.flink.table.catalog._ import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _} import org.apache.flink.table.delegation.Parser @@ -39,6 +39,7 @@ import org.apache.flink.table.operations.ddl._ import org.apache.flink.table.operations.utils.OperationTreeBuilder import org.apache.flink.table.operations.{CatalogQueryOperation, ShowFunctionsOperation, TableSourceQueryOperation, _} import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope +import org.apache.flink.table.parse.CalciteParser import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder} import org.apache.flink.table.sinks.{BatchSelectTableSink, BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils} import org.apache.flink.table.sources.TableSource @@ -53,8 +54,7 @@ import org.apache.calcite.tools.FrameworkConfig import _root_.java.lang.{Iterable => JIterable, Long => JLong} import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier} -import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap} -import java.util +import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap, ArrayList => JArrayList} import _root_.scala.collection.JavaConversions._ import _root_.scala.collection.JavaConverters._ @@ -543,8 +543,7 @@ abstract class TableEnvImpl( override def listFunctions(): Array[String] = functionCatalog.getFunctions override def getCompletionHints(statement: String, position: Int): Array[String] = { - val planner = getFlinkPlanner - planner.getCompletionHints(statement, position) + parser.getCompletionHints(statement, position) } override def sqlQuery(query: String): Table = { @@ -590,7 +589,7 @@ abstract class TableEnvImpl( val jobName = "insert-into_" + String.join(",", sinkIdentifierNames) try { val jobClient = execute(dataSinks, jobName) - val columns = new util.ArrayList[Column]() + val columns = new JArrayList[Column]() val affectedRowCounts = new Array[JLong](operations.size()) operations.indices.foreach { idx => // use sink identifier name as field name diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index 60d5c93fc40a4..6b6b6dc5ab447 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ExtendedSqlNode import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.catalog.CatalogReader +import org.apache.flink.table.parse.CalciteParser import org.apache.calcite.plan.RelOptTable.ViewExpander import org.apache.calcite.plan._ @@ -61,18 +62,13 @@ class FlinkPlannerImpl( var validator: FlinkCalciteSqlValidator = _ - def getCompletionHints(sql: String, cursor: Int): Array[String] = { - val advisorValidator = new SqlAdvisorValidator( + def getSqlAdvisorValidator(): SqlAdvisorValidator = { + new SqlAdvisorValidator( operatorTable, catalogReaderSupplier.apply(true), // ignore cases for lenient completion typeFactory, SqlValidator.Config.DEFAULT - .withSqlConformance(config.getParserConfig.conformance())) - val advisor = new SqlAdvisor(advisorValidator, config.getParserConfig) - val replaced = Array[String](null) - val hints = advisor.getCompletionHints(sql, cursor, replaced) - .map(item => item.toIdentifier.toString) - hints.toArray + .withSqlConformance(config.getParserConfig.conformance())) } /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index d39fc30630ba0..6164e62b0f80f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -35,6 +35,7 @@ import org.apache.flink.table.expressions.{ExpressionBridge, PlannerExpression, import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ +import org.apache.flink.table.parse.CalciteParser import org.apache.flink.table.plan.StreamOptimizer import org.apache.flink.table.plan.nodes.LogicalSink import org.apache.flink.table.plan.nodes.datastream.DataStreamRel @@ -185,14 +186,6 @@ class StreamPlanner( } } - override def getCompletionHints( - statement: String, - position: Int) - : Array[String] = { - val planner = getFlinkPlanner - planner.getCompletionHints(statement, position) - } - private def translateToRel(modifyOperation: ModifyOperation): (RelNode, Boolean) = { modifyOperation match { case s: UnregisteredSinkModifyOperation[_] => diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index 587bda3f0d70a..47a2a892bb44b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -22,12 +22,12 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.SqlParserException; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.calcite.CalciteParser; import org.apache.flink.table.calcite.FlinkPlannerImpl; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -54,12 +54,18 @@ import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope; import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.command.ClearOperation; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.QuitOperation; +import org.apache.flink.table.operations.command.ResetOperation; +import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.DropDatabaseOperation; +import org.apache.flink.table.parse.CalciteParser; import org.apache.flink.table.planner.ParserImpl; import org.apache.flink.table.planner.PlanningConfigurationBuilder; import org.apache.flink.table.types.DataType; @@ -69,6 +75,7 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlSelect; +import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -110,6 +117,11 @@ public class SqlToOperationConverterTest { asRootSchema( new CatalogManagerCalciteSchema(catalogManager, tableConfig, false)), new ExpressionBridge<>(PlannerExpressionConverter.INSTANCE())); + private final Parser parser = + new ParserImpl( + catalogManager, + () -> getPlannerBySqlDialect(SqlDialect.DEFAULT), + () -> getParserBySqlDialect(SqlDialect.DEFAULT)); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -671,6 +683,39 @@ public void testAlterTable() throws Exception { assertEquals(properties, alterTableOptionsOperation.getCatalogTable().getOptions()); } + @Test + public void testClearCommand() { + assertSimpleCommand("ClEaR", instanceOf(ClearOperation.class)); + } + + @Test + public void testHelpCommand() { + assertSimpleCommand("hELp", instanceOf(HelpOperation.class)); + } + + @Test + public void testQuitCommand() { + assertSimpleCommand("qUIt", instanceOf(QuitOperation.class)); + assertSimpleCommand("Exit", instanceOf(QuitOperation.class)); + } + + @Test + public void testResetCommand() { + assertSimpleCommand("REsEt", instanceOf(ResetOperation.class)); + } + + @Test + public void testSetOperation() { + assertSetCommand(" SEt "); + assertSetCommand("SET execution.runtime-type= batch", "execution.runtime-type", "batch"); + assertSetCommand( + "SET pipeline.jars = /path/to/test-_-jar.jar", + "pipeline.jars", + "/path/to/test-_-jar.jar"); + + assertFailedSetCommand("SET execution.runtime-type="); + } + // ~ Tool Methods ---------------------------------------------------------- private static TestItem createTestItem(Object... args) { @@ -695,6 +740,23 @@ private void assertShowFunctions( assertEquals(expectedSummary, showFunctionsOperation.asSummaryString()); } + private void assertSimpleCommand(String statement, Matcher matcher) { + Operation operation = parser.parse(statement).get(0); + assertThat(operation, matcher); + } + + private void assertSetCommand(String statement, String... operands) { + SetOperation operation = (SetOperation) parser.parse(statement).get(0); + + assertArrayEquals(operands, operation.getOperands()); + } + + private void assertFailedSetCommand(String statement) { + thrown.expect(SqlParserException.class); + + parser.parse(statement); + } + private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) { tableConfig.setSqlDialect(sqlDialect); return planningConfigurationBuilder.createCalciteParser(); @@ -707,18 +769,13 @@ private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) { } private ExpressionResolverBuilder getExpressionResolver() { - final Parser parser = - new ParserImpl( - catalogManager, - () -> getPlannerBySqlDialect(SqlDialect.DEFAULT), - () -> getParserBySqlDialect(SqlDialect.DEFAULT)); return ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser); } private Operation parse(String sql, SqlDialect sqlDialect) { FlinkPlannerImpl planner = getPlannerBySqlDialect(sqlDialect); - final CalciteParser parser = getParserBySqlDialect(sqlDialect); - SqlNode node = parser.parse(sql); + final CalciteParser calciteParser = getParserBySqlDialect(sqlDialect); + SqlNode node = calciteParser.parse(sql); return SqlToOperationConverter.convert(planner, catalogManager, node).get(); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala index a6dd8caab9bd4..e2347c1cffb89 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala @@ -40,10 +40,11 @@ import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment import org.apache.flink.table.api.TableConfig import org.apache.flink.table.api.internal.TableEnvImpl import org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl -import org.apache.flink.table.calcite.{CalciteParser, FlinkRelBuilder} +import org.apache.flink.table.calcite.FlinkRelBuilder import org.apache.flink.table.codegen.{Compiler, FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.expressions.{Expression, ExpressionParser} import org.apache.flink.table.functions.ScalarFunction +import org.apache.flink.table.parse.CalciteParser import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetScan} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.utils.LegacyRowResource