Skip to content

Commit

Permalink
[FLINK-17717][sql-parser] Throws for DDL create temporary system func…
Browse files Browse the repository at this point in the history
… with composite identifier


This closes apache#12352
  • Loading branch information
danny0405 committed Jun 5, 2020
1 parent 564b652 commit f5ba41f
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ public void testCommands() throws Exception {
TestItem.validSql("CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
SqlCommand.CREATE_FUNCTION,
"CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"),
TestItem.validSql("CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
TestItem.validSql("CREATE TEMPORARY SYSTEM FUNCTION func1 as 'class_name' LANGUAGE JAVA",
SqlCommand.CREATE_FUNCTION,
"CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"),
"CREATE TEMPORARY SYSTEM FUNCTION func1 as 'class_name' LANGUAGE JAVA"),
// drop function xx
TestItem.invalidSql("DROP FUNCTION "),
TestItem.invalidSql("DROP FUNCTIONS "),
Expand Down
100 changes: 55 additions & 45 deletions flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,36 @@
// limitations under the License.
-->

/**
* Parses a "IF EXISTS" option, default is false.
*/
boolean IfExistsOpt() :
{
}
{
(
LOOKAHEAD(2)
<IF> <EXISTS> { return true; }
|
{ return false; }
)
}

/**
* Parses a "IF NOT EXISTS" option, default is false.
*/
boolean IfNotExistsOpt() :
{
}
{
(
LOOKAHEAD(3)
<IF> <NOT> <EXISTS> { return true; }
|
{ return false; }
)
}

/**
* Parse a "Show Catalogs" metadata query command.
*/
Expand Down Expand Up @@ -87,11 +117,7 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
{
<CATALOG>

(
<IF> <EXISTS> { ifExists = true; }
|
{ ifExists = false; }
)
ifExists = IfExistsOpt()

catalogName = CompoundIdentifier()

Expand Down Expand Up @@ -142,10 +168,9 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
}
{
<DATABASE> { startPos = getPos(); }
[
LOOKAHEAD(3)
<IF> <NOT> <EXISTS> { ifNotExists = true; }
]

ifNotExists = IfNotExistsOpt()

databaseName = CompoundIdentifier()
[ <COMMENT> <QUOTED_STRING>
{
Expand Down Expand Up @@ -193,17 +218,13 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
{
<DATABASE>

(
<IF> <EXISTS> { ifExists = true; }
|
{ ifExists = false; }
)
ifExists = IfExistsOpt()

databaseName = CompoundIdentifier()
[
<RESTRICT> { cascade = false; }
|
<CASCADE> { cascade = true; }
<RESTRICT> { cascade = false; }
|
<CASCADE> { cascade = true; }
]

{
Expand Down Expand Up @@ -236,18 +257,16 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
boolean isSystemFunction = false;
}
{
[
<SYSTEM> {isSystemFunction = true;}
]

<FUNCTION>

[
LOOKAHEAD(3)
<IF> <NOT> <EXISTS> { ifNotExists = true; }
]

functionIdentifier = CompoundIdentifier()
(
<SYSTEM> <FUNCTION>
ifNotExists = IfNotExistsOpt()
functionIdentifier = SimpleIdentifier()
{ isSystemFunction = true; }
|
<FUNCTION>
ifNotExists = IfNotExistsOpt()
functionIdentifier = CompoundIdentifier()
)

<AS> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
Expand Down Expand Up @@ -281,7 +300,7 @@ SqlDrop SqlDropFunction(Span s, boolean replace, boolean isTemporary) :

<FUNCTION>

[ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
ifExists = IfExistsOpt()

functionIdentifier = CompoundIdentifier()

Expand Down Expand Up @@ -309,7 +328,7 @@ SqlAlterFunction SqlAlterFunction() :

<FUNCTION> { startPos = getPos(); }

[ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
ifExists = IfExistsOpt()

functionIdentifier = CompoundIdentifier()

Expand Down Expand Up @@ -813,11 +832,7 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
{
<TABLE>

(
<IF> <EXISTS> { ifExists = true; }
|
{ ifExists = false; }
)
ifExists = IfExistsOpt()

tableName = CompoundIdentifier()

Expand Down Expand Up @@ -936,10 +951,8 @@ SqlCreate SqlCreateView(Span s, boolean replace, boolean isTemporary) : {
{
<VIEW>

[
LOOKAHEAD(3)
<IF> <NOT> <EXISTS> { ifNotExists = true; }
]
ifNotExists = IfNotExistsOpt()

viewName = CompoundIdentifier()
[
fieldList = ParenthesizedSimpleIdentifierList()
Expand All @@ -964,11 +977,8 @@ SqlDrop SqlDropView(Span s, boolean replace, boolean isTemporary) :
{
<VIEW>

(
<IF> <EXISTS> { ifExists = true; }
|
{ ifExists = false; }
)
ifExists = IfExistsOpt()

viewName = CompoundIdentifier()
{
return new SqlDropView(s.pos(), viewName, ifExists, isTemporary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,6 @@ public void testCreateFunction() {
sql("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
.ok("CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

sql("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
.ok("CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

sql("create temporary function db1.function1 as 'org.apache.fink.function.function1'")
.ok("CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

Expand All @@ -964,6 +961,14 @@ public void testCreateFunction() {

sql("create temporary system function function1 as 'org.apache.fink.function.function1' language scala")
.ok("CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");

// Temporary system function always belongs to the system and current session.
sql("create temporary system function catalog1^.^db1.function1 as 'org.apache.fink.function.function1'")
.fails("(?s).*Encountered \".\" at.*");

// TODO: FLINK-17957: Forbidden syntax "CREATE SYSTEM FUNCTION" for sql parser
sql("create system function function1 as 'org.apache.fink.function.function1'")
.ok("CREATE SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,18 +1302,10 @@ private TableResult createCatalogFunction(
String exMsg = getDDLOpExecuteErrorMsg(createCatalogFunctionOperation.asSummaryString());
try {
if (createCatalogFunctionOperation.isTemporary()) {
boolean exist = functionCatalog.hasTemporaryCatalogFunction(
createCatalogFunctionOperation.getFunctionIdentifier());
if (!exist) {
functionCatalog.registerTemporaryCatalogFunction(
functionCatalog.registerTemporaryCatalogFunction(
UnresolvedIdentifier.of(createCatalogFunctionOperation.getFunctionIdentifier().toList()),
createCatalogFunctionOperation.getCatalogFunction(),
false);
} else if (!createCatalogFunctionOperation.isIgnoreIfExists()) {
throw new ValidationException(
String.format("Temporary catalog function %s is already defined",
createCatalogFunctionOperation.getFunctionIdentifier().asSerializableString()));
}
createCatalogFunctionOperation.isIgnoreIfExists());
} else {
Catalog catalog = getCatalogOrThrowException(
createCatalogFunctionOperation.getFunctionIdentifier().getCatalogName());
Expand Down Expand Up @@ -1385,18 +1377,11 @@ private TableResult dropCatalogFunction(DropCatalogFunctionOperation dropCatalog
private TableResult createSystemFunction(CreateTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
boolean exist = functionCatalog.hasTemporarySystemFunction(operation.getFunctionName());
if (!exist) {
functionCatalog.registerTemporarySystemFunction(
operation.getFunctionName(),
operation.getFunctionClass(),
operation.getFunctionLanguage(),
false);
} else if (!operation.isIgnoreIfExists()) {
throw new ValidationException(
String.format("Temporary system function %s is already defined",
operation.getFunctionName()));
}
functionCatalog.registerTemporarySystemFunction(
operation.getFunctionName(),
operation.getFunctionClass(),
operation.getFunctionLanguage(),
operation.isIgnoreIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,8 @@ public void testCreateTemporaryCatalogFunction() {
tEnv().executeSql(ddl1);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
assertEquals(e.getMessage(),
"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
" is already defined");
assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.",
e.getMessage());
}

tEnv().executeSql(ddl3);
Expand All @@ -164,21 +163,21 @@ public void testCreateTemporaryCatalogFunction() {
tEnv().executeSql(ddl3);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
assertEquals(e.getMessage(),
"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
" doesn't exist");
assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4`" +
" doesn't exist",
e.getMessage());
}
}

@Test
public void testCreateTemporarySystemFunction() {
String ddl1 = "create temporary system function default_catalog.default_database.f5" +
String ddl1 = "create temporary system function f5" +
" as '" + TEST_FUNCTION + "'";

String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" +
String ddl2 = "create temporary system function if not exists f5" +
" as '" + TEST_FUNCTION + "'";

String ddl3 = "drop temporary system function default_catalog.default_database.f5";
String ddl3 = "drop temporary system function f5";

tEnv().executeSql(ddl1);
tEnv().executeSql(ddl2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ class TableEnvironmentTest {
.functionExists(ObjectPath.fromString("default_database.f1")))

val tableResult4 = tableEnv.executeSql(
s"CREATE TEMPORARY SYSTEM FUNCTION default_database.f2 AS '$funcName'")
s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
assertTrue(tableEnv.listUserDefinedFunctions().contains("f2"))

val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION default_database.f2")
val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION f2")
assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ public void testCreateTemporaryCatalogFunction() {
tableEnv.sqlUpdate(ddl1);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
assertEquals(e.getMessage(),
"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
" is already defined");
assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.",
e.getMessage());
}

tableEnv.sqlUpdate(ddl3);
Expand All @@ -154,22 +153,22 @@ public void testCreateTemporaryCatalogFunction() {
tableEnv.sqlUpdate(ddl3);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
assertEquals(e.getMessage(),
"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
" doesn't exist");
assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4`" +
" doesn't exist",
e.getMessage());
}
}

@Test
public void testCreateTemporarySystemFunction() {
TableEnvironment tableEnv = getTableEnvironment();
String ddl1 = "create temporary system function default_catalog.default_database.f5" +
String ddl1 = "create temporary system function f5" +
" as '" + TEST_FUNCTION + "'";

String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" +
" as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'";
String ddl2 = "create temporary system function if not exists f5" +
" as 'org.apache.flink.table.runtime.stream.sql.FunctionITCase$TestUDF'";

String ddl3 = "drop temporary system function default_catalog.default_database.f5";
String ddl3 = "drop temporary system function f5";

tableEnv.sqlUpdate(ddl1);
tableEnv.sqlUpdate(ddl2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ class BatchTableEnvironmentTest extends TableTestBase {
.functionExists(ObjectPath.fromString("default_database.f1")))

val tableResult4 = util.tableEnv.executeSql(
s"CREATE TEMPORARY SYSTEM FUNCTION default_database.f2 AS '$funcName'")
s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
assertTrue(util.tableEnv.listUserDefinedFunctions().contains("f2"))

val tableResult5 = util.tableEnv.executeSql(
"DROP TEMPORARY SYSTEM FUNCTION default_database.f2")
"DROP TEMPORARY SYSTEM FUNCTION f2")
assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
assertFalse(util.tableEnv.listUserDefinedFunctions().contains("f2"))
}
Expand Down

0 comments on commit f5ba41f

Please sign in to comment.