Skip to content

Commit

Permalink
[FLINK-31494][table-planner] Introduce SqlNodeConverter for SqlToOper…
Browse files Browse the repository at this point in the history
…ationConverter
  • Loading branch information
wuchong committed Mar 17, 2023
1 parent fa0dd35 commit ba03d4d
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations;

import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
import org.apache.flink.table.planner.utils.Expander;

import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;

/** An implementation of {@link SqlNodeConverter.ConvertContext}. */
public class SqlNodeConvertContext implements SqlNodeConverter.ConvertContext {

private final FlinkPlannerImpl flinkPlanner;
private final CatalogManager catalogManager;

public SqlNodeConvertContext(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager) {
this.flinkPlanner = flinkPlanner;
this.catalogManager = catalogManager;
}

@Override
public SqlValidator getSqlValidator() {
return flinkPlanner.getOrCreateSqlValidator();
}

@Override
public CatalogManager getCatalogManager() {
return catalogManager;
}

@Override
public RelRoot toRelRoot(SqlNode sqlNode) {
return flinkPlanner.rel(sqlNode);
}

@Override
public String toQuotedSqlString(SqlNode sqlNode) {
SqlParser.Config parserConfig = flinkPlanner.config().getParserConfig();
SqlDialect dialect =
new CalciteSqlDialect(
SqlDialect.EMPTY_CONTEXT
.withQuotedCasing(parserConfig.unquotedCasing())
.withConformance(parserConfig.conformance())
.withUnquotedCasing(parserConfig.unquotedCasing())
.withIdentifierQuoteString(parserConfig.quoting().string));
return sqlNode.toSqlString(dialect).getSql();
}

@Override
public String expandSqlIdentifiers(String originalSql) {
return Expander.create(flinkPlanner)
.expanded(originalSql)
.substitute(this::toQuotedSqlString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverters;
import org.apache.flink.table.planner.utils.Expander;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
Expand Down Expand Up @@ -286,11 +287,18 @@ public static Optional<Operation> convert(
private static Optional<Operation> convertValidatedSqlNode(
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
beforeConversion();

// delegate conversion to the registered converters first
SqlNodeConvertContext context = new SqlNodeConvertContext(flinkPlanner, catalogManager);
Optional<Operation> operation = SqlNodeConverters.convertSqlNode(validated, context);
if (operation.isPresent()) {
return operation;
}

// TODO: all the below conversion logic should be migrated to SqlNodeConverters
SqlNodeToOperationConversion converter =
new SqlNodeToOperationConversion(flinkPlanner, catalogManager);
if (validated instanceof SqlCreateCatalog) {
return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
} else if (validated instanceof SqlDropCatalog) {
if (validated instanceof SqlDropCatalog) {
return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
} else if (validated instanceof SqlLoadModule) {
return Optional.of(converter.convertLoadModule((SqlLoadModule) validated));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;

import java.util.HashMap;
import java.util.Map;

/** A converter for {@link SqlCreateCatalog}. */
public class SqlCreateCatalogConverter implements SqlNodeConverter<SqlCreateCatalog> {

@Override
public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext context) {
// set with properties
Map<String, String> properties = new HashMap<>();
node.getPropertyList()
.getList()
.forEach(
p ->
properties.put(
((SqlTableOption) p).getKeyString(),
((SqlTableOption) p).getValueString()));

return new CreateCatalogOperation(node.catalogName(), properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.utils.Expander;

import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidator;

/** A converter to convert {@link SqlNode} instance into {@link Operation}. */
public interface SqlNodeConverter<S extends SqlNode> {

/**
* Convert the given validated {@link SqlNode} into an {@link Operation}.
*
* @param node a validated {@link SqlNode}.
* @param context the utilities and context information to convert
*/
Operation convertSqlNode(S node, ConvertContext context);

/** Context of {@link SqlNodeConverter}. */
interface ConvertContext {

/** Returns the {@link SqlValidator} in the convert context. */
SqlValidator getSqlValidator();

/** Returns the {@link CatalogManager} in the convert context. */
CatalogManager getCatalogManager();

/** Converts the given validated {@link SqlNode} into a {@link RelRoot}. */
RelRoot toRelRoot(SqlNode sqlNode);

/** Convert the given {@param sqlNode} into a quoted SQL string. */
String toQuotedSqlString(SqlNode sqlNode);

/**
* Expands identifiers in a given SQL string.
*
* @see Expander
*/
String expandSqlIdentifiers(String sql);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;

import org.apache.calcite.sql.SqlNode;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Registry of SqlNode converters. */
public class SqlNodeConverters {

private static final Map<Class<?>, SqlNodeConverter<?>> CONVERTERS = new HashMap<>();

static {
// register all the converters here
register(new SqlCreateCatalogConverter());
}

/**
* Convert the given validated SqlNode into Operation if there is a registered converter for the
* node.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static Optional<Operation> convertSqlNode(
SqlNode validatedSqlNode, ConvertContext context) {
SqlNodeConverter converter = CONVERTERS.get(validatedSqlNode.getClass());
if (converter != null) {
return Optional.of(converter.convertSqlNode(validatedSqlNode, context));
} else {
return Optional.empty();
}
}

private static void register(SqlNodeConverter<?> converter) {
// extract the parameter type of the converter class
TypeInformation<?> typeInfo =
TypeExtractor.createTypeInfo(
converter, SqlNodeConverter.class, converter.getClass(), 0);
CONVERTERS.put(typeInfo.getTypeClass(), converter);
}
}

0 comments on commit ba03d4d

Please sign in to comment.