[FLINK-21531][table][hive] Introduce pluggable Parser
This closes apache#15050
lirui-apache committed Mar 7, 2021
1 parent 14267e6 commit e964e40
Showing 10 changed files with 312 additions and 46 deletions.
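The change in a nutshell: instead of caching a single Parser, the table environment now obtains its Parser through a dialect-keyed ParserFactory, so changing table.sql-dialect swaps the parser implementation at runtime. A minimal sketch of the resulting behavior, mirroring the test added below (it assumes the flink-connector-hive dependencies are on the classpath and, for the Hive dialect, a HiveCatalog set as the current catalog):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Parser;

public class DialectSwitchSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hive dialect: the pluggable factory yields a HiveParser.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        Parser hiveParser = ((TableEnvironmentInternal) tableEnv).getParser();

        // Default dialect: a different Parser implementation is created.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        Parser defaultParser = ((TableEnvironmentInternal) tableEnv).getParser();

        System.out.println(hiveParser.getClass() + " / " + defaultParser.getClass());
    }
}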
org/apache/flink/table/planner/delegation/hive/HiveParser.java (new file)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.CalciteParser;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;

import org.apache.calcite.tools.FrameworkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** A Parser that uses Hive's planner to parse a statement. */
public class HiveParser extends ParserImpl {

    private static final Logger LOG = LoggerFactory.getLogger(HiveParser.class);

    // Hive-specific planning facilities. They are initialized here but not used
    // yet in this commit; they are groundwork for Hive-native parsing.
    private final PlannerContext plannerContext;
    private final FlinkCalciteCatalogReader catalogReader;
    private final FrameworkConfig frameworkConfig;

    HiveParser(
            CatalogManager catalogManager,
            Supplier<FlinkPlannerImpl> validatorSupplier,
            Supplier<CalciteParser> calciteParserSupplier,
            Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator,
            PlannerContext plannerContext) {
        super(
                catalogManager,
                validatorSupplier,
                calciteParserSupplier,
                sqlExprToRexConverterCreator);
        this.plannerContext = plannerContext;
        this.catalogReader =
                plannerContext.createCatalogReader(
                        false,
                        catalogManager.getCurrentCatalog(),
                        catalogManager.getCurrentDatabase());
        this.frameworkConfig = plannerContext.createFrameworkConfig();
    }

    @Override
    public List<Operation> parse(String statement) {
        // For now, delegate to the default implementation in ParserImpl.
        return super.parse(statement);
    }
}
org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java (new file)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.delegation.hive;

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
import org.apache.flink.table.planner.delegation.ParserFactory;
import org.apache.flink.table.planner.delegation.PlannerContext;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** A Parser factory that creates {@link HiveParser}. */
public class HiveParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new HiveParser(
                catalogManager,
                () ->
                        plannerContext.createFlinkPlanner(
                                catalogManager.getCurrentCatalog(),
                                catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema ->
                        sqlExprToRexConverterFactory.create(
                                plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                plannerContext);
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
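
The factory contract above is what makes the parser pluggable: requiredContext() pins a factory to one value of table.sql-dialect, and the planner selects whichever registered factory matches the current dialect. The default dialect is expected to be served the same way by a counterpart factory among the files not shown in this view; a sketch of its shape (illustrative only, imports as in HiveParserFactory):

/** Hypothetical counterpart factory for the default dialect; not part of this commit as shown. */
public class DefaultDialectParserFactorySketch implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        // A real factory would build the planner's standard ParserImpl here,
        // analogous to HiveParserFactory#create above.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public Map<String, String> optionalContext() {
        return new DescriptorProperties().asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(),
                SqlDialect.DEFAULT.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}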
META-INF/services factory registration (flink-connector-hive)
@@ -15,3 +15,4 @@

 org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
 org.apache.flink.table.module.hive.HiveModuleFactory
+org.apache.flink.table.planner.delegation.hive.HiveParserFactory
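
This service entry ties the pieces together: factories are discovered at runtime through Java's ServiceLoader mechanism and matched against the current dialect property. A rough sketch of the lookup, using ComponentFactoryService (the ServiceLoader-backed utility the Blink planner uses for pluggable components; the exact call site is not visible in this diff, so treat the wiring as an assumption):

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.planner.delegation.ParserFactory;
import org.apache.flink.table.planner.delegation.PlannerContext;

import java.util.Collections;
import java.util.Map;

public final class ParserDiscoverySketch {

    /** Resolves the ParserFactory whose requiredContext() matches the dialect. */
    static Parser discoverParser(
            SqlDialect dialect, CatalogManager catalogManager, PlannerContext plannerContext) {
        Map<String, String> properties =
                Collections.singletonMap(
                        TableConfigOptions.TABLE_SQL_DIALECT.key(), dialect.name().toLowerCase());
        return ComponentFactoryService.find(ParserFactory.class, properties)
                .create(catalogManager, plannerContext);
    }
}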
Hive dialect test (flink-connector-hive)
@@ -24,12 +24,15 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.planner.delegation.hive.HiveParser;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FileUtils;
@@ -65,6 +68,8 @@
 import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /** Test Hive syntax when Hive dialect is used. */
@@ -99,6 +104,21 @@ public void tearDown() {
         }
     }
 
+    @Test
+    public void testPluggableParser() {
+        TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
+        Parser parser = tableEnvInternal.getParser();
+        // hive dialect should use HiveParser
+        assertTrue(parser instanceof HiveParser);
+        // execute some sql and verify the parser instance is reused
+        tableEnvInternal.executeSql("show databases");
+        assertSame(parser, tableEnvInternal.getParser());
+        // switching dialect will result in a new parser
+        tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        assertNotEquals(
+                parser.getClass().getName(), tableEnvInternal.getParser().getClass().getName());
+    }
+
     @Test
     public void testCreateDatabase() throws Exception {
         tableEnv.executeSql("create database db1 comment 'db1 comment'");
org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -174,7 +174,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     protected final Executor execEnv;
     protected final FunctionCatalog functionCatalog;
     protected final Planner planner;
-    protected final Parser parser;
     private final boolean isStreamingMode;
     private final ClassLoader userClassLoader;
     private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
@@ -195,7 +194,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
                 @Override
                 public void createTemporaryTable(String path, CatalogBaseTable table) {
-                    UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+                    UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
                     ObjectIdentifier objectIdentifier =
                             catalogManager.qualifyIdentifier(unresolvedIdentifier);
                     catalogManager.createTemporaryTable(table, objectIdentifier, false);
@@ -221,18 +220,17 @@ protected TableEnvironmentImpl(
 
         this.functionCatalog = functionCatalog;
         this.planner = planner;
-        this.parser = planner.getParser();
         this.isStreamingMode = isStreamingMode;
         this.userClassLoader = userClassLoader;
         this.operationTreeBuilder =
                 OperationTreeBuilder.create(
                         tableConfig,
-                        functionCatalog.asLookup(parser::parseIdentifier),
+                        functionCatalog.asLookup(getParser()::parseIdentifier),
                         catalogManager.getDataTypeFactory(),
                         path -> {
                             try {
                                 UnresolvedIdentifier unresolvedIdentifier =
-                                        parser.parseIdentifier(path);
+                                        getParser().parseIdentifier(path);
                                 Optional<CatalogQueryOperation> catalogQueryOperation =
                                         scanInternal(unresolvedIdentifier);
                                 return catalogQueryOperation.map(
@@ -250,7 +248,7 @@ protected TableEnvironmentImpl(
                         },
                         (sqlExpression, inputSchema) -> {
                             try {
-                                return parser.parseSqlExpression(sqlExpression, inputSchema);
+                                return getParser().parseSqlExpression(sqlExpression, inputSchema);
                             } catch (Throwable t) {
                                 throw new ValidationException(
                                         String.format("Invalid SQL expression: %s", sqlExpression),
@@ -420,14 +418,14 @@ public void createFunction(
             String path,
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists) {
-        final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         functionCatalog.registerCatalogFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
     @Override
     public boolean dropFunction(String path) {
-        final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         return functionCatalog.dropCatalogFunction(unresolvedIdentifier, true);
     }
 
@@ -441,14 +439,14 @@ public void createTemporaryFunction(
 
     @Override
     public void createTemporaryFunction(String path, UserDefinedFunction functionInstance) {
-        final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         functionCatalog.registerTemporaryCatalogFunction(
                 unresolvedIdentifier, functionInstance, false);
     }
 
     @Override
     public boolean dropTemporaryFunction(String path) {
-        final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
@@ -460,7 +458,7 @@ public void registerTable(String name, Table table) {
 
     @Override
     public void createTemporaryView(String path, Table view) {
-        UnresolvedIdentifier identifier = parser.parseIdentifier(path);
+        UnresolvedIdentifier identifier = getParser().parseIdentifier(path);
         createTemporaryView(identifier, view);
     }
 
@@ -492,7 +490,7 @@ public Table scan(String... tablePath) {
 
     @Override
     public Table from(String path) {
-        UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         return scanInternal(unresolvedIdentifier)
                 .map(this::createTable)
                 .orElseThrow(
@@ -504,7 +502,7 @@ public Table from(String path) {
 
     @Override
     public void insertInto(String targetPath, Table table) {
-        UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(targetPath);
+        UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(targetPath);
         insertIntoInternal(unresolvedIdentifier, table);
     }
 
@@ -587,7 +585,7 @@ public String[] listTemporaryViews() {
 
     @Override
     public boolean dropTemporaryTable(String path) {
-        UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
         try {
             catalogManager.dropTemporaryTable(identifier, false);
@@ -599,7 +597,7 @@ public boolean dropTemporaryTable(String path) {
 
     @Override
     public boolean dropTemporaryView(String path) {
-        UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
+        UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
         ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
         try {
             catalogManager.dropTemporaryView(identifier, false);
@@ -641,7 +639,7 @@ public String explain(boolean extended) {
 
     @Override
     public String explainSql(String statement, ExplainDetail... extraDetails) {
-        List<Operation> operations = parser.parse(statement);
+        List<Operation> operations = getParser().parse(statement);
 
         if (operations.size() != 1) {
             throw new TableException(
@@ -663,7 +661,7 @@ public String[] getCompletionHints(String statement, int position) {
 
     @Override
     public Table sqlQuery(String query) {
-        List<Operation> operations = parser.parse(query);
+        List<Operation> operations = getParser().parse(query);
 
         if (operations.size() != 1) {
             throw new ValidationException(
@@ -683,7 +681,7 @@ public Table sqlQuery(String query) {
 
     @Override
     public TableResult executeSql(String statement) {
-        List<Operation> operations = parser.parse(statement);
+        List<Operation> operations = getParser().parse(statement);
 
         if (operations.size() != 1) {
             throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
@@ -761,7 +759,7 @@ public TableResult executeInternal(QueryOperation operation) {
 
     @Override
     public void sqlUpdate(String stmt) {
-        List<Operation> operations = parser.parse(stmt);
+        List<Operation> operations = getParser().parse(stmt);
 
         if (operations.size() != 1) {
             throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
@@ -1362,7 +1360,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
 
     @Override
     public Parser getParser() {
-        return parser;
+        return getPlanner().getParser();
     }
 
     @Override
@@ -1635,12 +1633,12 @@ protected TableImpl createTable(QueryOperation tableOperation) {
                 this,
                 tableOperation,
                 operationTreeBuilder,
-                functionCatalog.asLookup(parser::parseIdentifier));
+                functionCatalog.asLookup(getParser()::parseIdentifier));
     }
 
     @Override
     public String getJsonPlan(String stmt) {
-        List<Operation> operations = parser.parse(stmt);
+        List<Operation> operations = getParser().parse(stmt);
         if (operations.size() != 1) {
             throw new TableException(
                     "Unsupported SQL query! getJsonPlan() only accepts a single INSERT statement.");
(The remaining 5 of the 10 changed files did not load in this view.)
