Skip to content

Commit

Permalink
[FLINK-21836][table] Support parsing special commands by introducing …
Browse files Browse the repository at this point in the history
…ExtendedParser

This closes apache#15265
  • Loading branch information
fsk119 authored and wuchong committed Mar 25, 2021
1 parent 08ef809 commit c7c95e5
Show file tree
Hide file tree
Showing 46 changed files with 1,398 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.delegation.ParserImpl;
Expand All @@ -39,6 +38,7 @@
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParseUtils;
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
import org.apache.flink.util.FileUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.command.ClearOperation;
import org.apache.flink.table.operations.command.HelpOperation;
import org.apache.flink.table.operations.command.QuitOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.delegation.hive.HiveParser;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
Expand Down Expand Up @@ -66,11 +71,13 @@

import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/** Test Hive syntax when Hive dialect is used. */
Expand Down Expand Up @@ -120,6 +127,20 @@ public void testPluggableParser() {
parser.getClass().getName(), tableEnvInternal.getParser().getClass().getName());
}

@Test
public void testParseCommand() {
TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
Parser parser = tableEnvInternal.getParser();

// hive dialect should use HiveParser
assertTrue(parser instanceof HiveParser);
assertThat(parser.parse("HELP").get(0), instanceOf(HelpOperation.class));
assertThat(parser.parse("clear").get(0), instanceOf(ClearOperation.class));
assertThat(parser.parse("SET").get(0), instanceOf(SetOperation.class));
assertThat(parser.parse("ResET").get(0), instanceOf(ResetOperation.class));
assertThat(parser.parse("Exit").get(0), instanceOf(QuitOperation.class));
}

@Test
public void testCreateDatabase() throws Exception {
tableEnv.executeSql("create database db1 comment 'db1 comment'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,26 @@ public ResolvedExpression parseSqlExpression(
return context.wrapClassLoader(
() -> parser.parseSqlExpression(sqlExpression, inputSchema));
}

@Override
public String[] getCompletionHints(String statement, int position) {
return context.wrapClassLoader(
() -> parser.getCompletionHints(statement, position));
}
};
}

@Override
public List<String> completeStatement(String sessionId, String statement, int position) {
final ExecutionContext context = getExecutionContext(sessionId);
final TableEnvironment tableEnv = context.getTableEnvironment();
final TableEnvironmentInternal tableEnv =
(TableEnvironmentInternal) context.getTableEnvironment();

try {
return context.wrapClassLoader(
() -> Arrays.asList(tableEnv.getCompletionHints(statement, position)));
() ->
Arrays.asList(
tableEnv.getParser().getCompletionHints(statement, position)));
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public String explainInternal(List<Operation> operations, ExplainDetail... extra

@Override
public String[] getCompletionHints(String statement, int position) {
return planner.getCompletionHints(statement, position);
return planner.getParser().getCompletionHints(statement, position);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,14 @@ public interface Parser {
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
*/
ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);

/**
* Returns completion hints for the given statement at the given cursor position. The completion
* happens case insensitively.
*
* @param statement Partial or slightly incorrect SQL statement
* @param position cursor position
* @return completion hints that fit at the current cursor position
*/
String[] getCompletionHints(String statement, int position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ public interface Planner {
*/
String explain(List<Operation> operations, ExplainDetail... extraDetails);

/**
* Returns completion hints for the given statement at the given cursor position. The completion
* happens case insensitively.
*
* @param statement Partial or slightly incorrect SQL statement
* @param position cursor position
* @return completion hints that fit at the current cursor position
*/
String[] getCompletionHints(String statement, int position);

/**
* Get the json plan of the given {@link ModifyOperation}s.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.table.operations.Operation;

/** Operation to represent CLEAR command. */
public class ClearOperation implements Operation {
@Override
public String asSummaryString() {
return "CLEAR";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.table.operations.Operation;

/** Operation that represents HELP command. */
public class HelpOperation implements Operation {

@Override
public String asSummaryString() {
return "HELP";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.table.operations.Operation;

/** Operation that represent QUIT command. */
public class QuitOperation implements Operation {

@Override
public String asSummaryString() {
return "QUIT";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.table.operations.Operation;

/** Operation that represent RESET command. */
public class ResetOperation implements Operation {
@Override
public String asSummaryString() {
return "RESET";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.table.operations.Operation;

/** Operation to represent SET command. */
public class SetOperation implements Operation {

private final String[] operands;

public SetOperation(String[] operands) {
this.operands = operands;
}

public String[] getOperands() {
return operands;
}

@Override
public String asSummaryString() {
if (operands.length == 0) {
return "SET";
} else {
return String.format("SET %s=%s", operands[0], operands[1]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
return null;
}

@Override
public String[] getCompletionHints(String statement, int position) {
throw new UnsupportedOperationException("Unsupported operation.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public String explain(List<Operation> operations, ExplainDetail... extraDetails)
return null;
}

@Override
public String[] getCompletionHints(String statement, int position) {
return new String[0];
}

@Override
public String getJsonPlan(List<ModifyOperation> modifyOperations) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.calcite;

import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;

import org.apache.calcite.config.CalciteConnectionConfigImpl;
Expand Down
Loading

0 comments on commit c7c95e5

Please sign in to comment.