Skip to content

Commit

Permalink
[FLINK-17106][table-planner] Support create and drop view in both pla…
Browse files Browse the repository at this point in the history
…nners

This closes apache#11727
  • Loading branch information
docete authored and KurtYoung committed Apr 22, 2020
1 parent 3ce6c09 commit 322d589
Show file tree
Hide file tree
Showing 17 changed files with 1,512 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,8 @@ void PartitionSpecCommaList(SqlNodeList list) :
* Parses a create view or temporary view statement.
* CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] view_name [ (field1, field2 ...) ]
* AS select_statement
* We only support [IF NOT EXISTS] semantic in Flink although the parser supports [OR REPLACE] grammar.
* See: FLINK-17067
*/
SqlCreate SqlCreateView(Span s, boolean replace, boolean isTemporary) : {
SqlIdentifier viewName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
if (isTemporary()) {
writer.keyword("TEMPORARY");
}
if (getReplace()) {
writer.keyword("OR REPLACE");
}
writer.keyword("VIEW");
if (isIfNotExists()) {
writer.keyword("IF NOT EXISTS");
Expand Down Expand Up @@ -135,4 +132,8 @@ public boolean isTemporary() {
public boolean isIfNotExists() {
return ifNotExists;
}

public String[] fullViewName() {
return viewName.names.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public SqlIdentifier getViewName() {
return viewName;
}

public boolean getIfExists() {
return this.ifExists;
}

public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DROP");
if (isTemporary) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,19 @@ public void testCreateView() {
sql(sql).ok(expected);
}

@Test
public void testCreateViewWithInvalidFieldList() {
final String expected = "(?s).*Encountered \"\\)\" at line 1, column 15.\n" +
"Was expecting one of:\n" +
".*\n" +
".*\n" +
".*\n" +
".*\n" +
".*";
sql("CREATE VIEW V(^)^ AS SELECT * FROM TBL")
.fails(expected);
}

@Test
public void testCreateViewWithComment() {
final String sql = "create view v COMMENT 'this is a view' as select col1 from tbl";
Expand Down Expand Up @@ -748,6 +761,16 @@ public void testCreateTemporaryView() {
sql(sql).ok(expected);
}

@Test
public void testCreateTemporaryViewIfNotExists() {
final String sql = "create temporary view if not exists v as select col1 from tbl";
final String expected = "CREATE TEMPORARY VIEW IF NOT EXISTS `V`\n" +
"AS\n" +
"SELECT `COL1`\n" +
"FROM `TBL`";
sql(sql).ok(expected);
}

@Test
public void testCreateViewIfNotExists() {
final String sql = "create view if not exists v as select col1 from tbl";
Expand All @@ -760,6 +783,13 @@ public void testCreateViewIfNotExists() {

@Test
public void testDropView() {
final String sql = "DROP VIEW IF EXISTS view_name";
final String expected = "DROP VIEW IF EXISTS `VIEW_NAME`";
sql(sql).ok(expected);
}

@Test
public void testDropTemporaryView() {
final String sql = "DROP TEMPORARY VIEW IF EXISTS view_name";
final String expected = "DROP TEMPORARY VIEW IF EXISTS `VIEW_NAME`";
sql(sql).ok(expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
Expand Down Expand Up @@ -136,12 +138,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
"CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, " +
"DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG.";
"DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, CREATE VIEW, DROP VIEW.";
private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type " +
"CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
"CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " +
"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS.";
"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW.";

/**
* Provides necessary methods for {@link ConnectTableDescriptor}.
Expand Down Expand Up @@ -626,6 +628,8 @@ public void sqlUpdate(String stmt) {
} else if (operation instanceof CreateTableOperation ||
operation instanceof DropTableOperation ||
operation instanceof AlterTableOperation ||
operation instanceof CreateViewOperation ||
operation instanceof DropViewOperation ||
operation instanceof CreateDatabaseOperation ||
operation instanceof DropDatabaseOperation ||
operation instanceof AlterDatabaseOperation ||
Expand Down Expand Up @@ -681,6 +685,35 @@ private TableResult executeOperation(Operation operation) {
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
if (createViewOperation.isTemporary()) {
catalogManager.createTemporaryTable(
createViewOperation.getCatalogView(),
createViewOperation.getViewIdentifier(),
createViewOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createViewOperation.getCatalogView(),
createViewOperation.getViewIdentifier(),
createViewOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropViewOperation) {
DropViewOperation dropViewOperation = (DropViewOperation) operation;
if (dropViewOperation.isTemporary()) {
boolean dropped = catalogManager.dropTemporaryView(dropViewOperation.getViewIdentifier());
if (!dropped && !dropViewOperation.isIfExists()) {
throw new ValidationException(String.format(
"Temporary views with identifier '%s' doesn't exist",
dropViewOperation.getViewIdentifier().asSummaryString()));
}
} else {
catalogManager.dropTable(
dropViewOperation.getViewIdentifier(),
dropViewOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof CreateDatabaseOperation) {
CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ public void createTemporaryTable(
temporaryTables.compute(objectIdentifier, (k, v) -> {
if (v != null) {
if (!ignoreIfExists) {
throw new ValidationException(String.format("Temporary table %s already exists", objectIdentifier));
throw new ValidationException(String.format("Temporary table '%s' already exists", objectIdentifier));
}
return v;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
* Operation to describe a CREATE VIEW statement.
*/
public class CreateViewOperation implements CreateOperation {
private final ObjectIdentifier viewIdentifier;
private CatalogView catalogView;
private boolean ignoreIfExists;
private boolean isTemporary;

public CreateViewOperation(
ObjectIdentifier viewIdentifier,
CatalogView catalogView,
boolean ignoreIfExists,
boolean isTemporary) {
this.viewIdentifier = viewIdentifier;
this.catalogView = catalogView;
this.ignoreIfExists = ignoreIfExists;
this.isTemporary = isTemporary;
}

public CatalogView getCatalogView() {
return catalogView;
}

public ObjectIdentifier getViewIdentifier() {
return viewIdentifier;
}

public boolean isIgnoreIfExists() {
return ignoreIfExists;
}

public boolean isTemporary() {
return isTemporary;
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("originalQuery", catalogView.getOriginalQuery());
params.put("expandedQuery", catalogView.getExpandedQuery());
params.put("identifier", viewIdentifier);
params.put("ignoreIfExists", ignoreIfExists);
params.put("isTemporary", isTemporary);
return OperationUtils.formatWithChildren(
"CREATE VIEW",
params,
Collections.emptyList(),
Operation::asSummaryString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
* Operation to describe a DROP VIEW statement.
*/
public class DropViewOperation implements DropOperation {

private final ObjectIdentifier viewIdentifier;
private final boolean ifExists;
private final boolean isTemporary;

public DropViewOperation(ObjectIdentifier viewIdentifier, boolean ifExists, boolean isTemporary) {
this.viewIdentifier = viewIdentifier;
this.ifExists = ifExists;
this.isTemporary = isTemporary;
}

public ObjectIdentifier getViewIdentifier() {
return this.viewIdentifier;
}

public boolean isIfExists() {
return this.ifExists;
}

public boolean isTemporary() {
return this.isTemporary;
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("identifier", viewIdentifier);
params.put("ifExists", ifExists);
params.put("isTemporary", isTemporary);

return OperationUtils.formatWithChildren(
"DROP VIEW",
params,
Collections.emptyList(),
Operation::asSummaryString);
}
}
Loading

0 comments on commit 322d589

Please sign in to comment.