[FLINK-21297][table] Support 'LOAD/UNLOAD MODULE' syntax
Support the 'LOAD/UNLOAD MODULE' syntax in the SQL parser, TableEnvironment, and SQL CLI.

This closes apache#14944
LadyForest committed Feb 22, 2021
1 parent 7f4e707 commit e2e1121
Showing 23 changed files with 812 additions and 28 deletions.
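As a quick illustration of what the new syntax enables (not part of the diff below), the statements can be issued through TableEnvironment.executeSql; the SQL CLI routes them through the same parser and operations. A minimal sketch, assuming the Blink planner and a module factory for the referenced module — 'hive' and the 'hive-version' option are placeholders — are on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.Arrays;

public class LoadModuleExample {
public static void main(String[] args) {
// Only the Blink planner supports LOAD/UNLOAD MODULE (see the CLI tests below).
TableEnvironment tEnv =
TableEnvironment.create(
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build());

// Load a module by name, optionally passing properties via WITH.
// The matching module factory must be on the classpath for this to succeed.
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '2.3.4')");
System.out.println(Arrays.toString(tEnv.listModules())); // [core, hive]

// Unload it again; unloading a module that is not loaded fails.
tEnv.executeSql("UNLOAD MODULE hive");
System.out.println(Arrays.toString(tEnv.listModules())); // [core]
}
}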
@@ -383,6 +383,18 @@ private void callCommand(SqlCommandCall cmdCall) {
case DROP_CATALOG:
callDdl(cmdCall.operands[0], CliStrings.MESSAGE_CATALOG_REMOVED);
break;
case LOAD_MODULE:
callDdl(
cmdCall.operands[0],
CliStrings.MESSAGE_LOAD_MODULE_SUCCEEDED,
CliStrings.MESSAGE_LOAD_MODULE_FAILED);
break;
case UNLOAD_MODULE:
callDdl(
cmdCall.operands[0],
CliStrings.MESSAGE_UNLOAD_MODULE_SUCCEEDED,
CliStrings.MESSAGE_UNLOAD_MODULE_FAILED);
break;
default:
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
@@ -104,6 +104,15 @@ private CliStrings() {
formatCommand(
SqlCommand.USE,
"Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
.append(
formatCommand(
SqlCommand.LOAD_MODULE,
"Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = "
+ "'<value1>' [, '<key2>' = '<value2>', ...])];'"))
.append(
formatCommand(
SqlCommand.UNLOAD_MODULE,
"Unload a module. Syntax: 'UNLOAD MODULE <name>;'"))
.style(AttributedStyle.DEFAULT.underline())
.append("\nHint")
.style(AttributedStyle.DEFAULT)
@@ -220,6 +229,14 @@ private CliStrings() {

public static final String MESSAGE_CATALOG_REMOVED = "Catalog has been removed.";

public static final String MESSAGE_LOAD_MODULE_SUCCEEDED = "Load module succeeded!";

public static final String MESSAGE_UNLOAD_MODULE_SUCCEEDED = "Unload module succeeded!";

public static final String MESSAGE_LOAD_MODULE_FAILED = "Load module failed!";

public static final String MESSAGE_UNLOAD_MODULE_FAILED = "Unload module failed!";

// --------------------------------------------------------------------------------------------

public static final String RESULT_TITLE = "SQL Query Result";
@@ -23,6 +23,7 @@
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -32,6 +33,7 @@
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowPartitionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
@@ -168,6 +170,10 @@ private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
cmd = SqlCommand.ALTER_FUNCTION;
} else if (operation instanceof ExplainOperation) {
cmd = SqlCommand.EXPLAIN;
} else if (operation instanceof LoadModuleOperation) {
cmd = SqlCommand.LOAD_MODULE;
} else if (operation instanceof UnloadModuleOperation) {
cmd = SqlCommand.UNLOAD_MODULE;
} else if (operation instanceof DescribeTableOperation) {
cmd = SqlCommand.DESCRIBE;
operands =
@@ -252,6 +258,10 @@ enum SqlCommand {
// FLINK-17396
SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS),

LOAD_MODULE,

UNLOAD_MODULE,

SHOW_PARTITIONS,

USE_CATALOG,
@@ -282,19 +282,28 @@ public void testCommands() throws Exception {
// show current database
TestItem.validSql(
"show current database", SqlCommand.SHOW_CURRENT_DATABASE),
// show tables
TestItem.validSql("SHOW TABLES;", SqlCommand.SHOW_TABLES),
TestItem.validSql(" SHOW TABLES ;", SqlCommand.SHOW_TABLES),
// show functions
TestItem.validSql("SHOW FUNCTIONS;", SqlCommand.SHOW_FUNCTIONS),
TestItem.validSql(" SHOW FUNCTIONS ", SqlCommand.SHOW_FUNCTIONS),
// show modules
TestItem.validSql("SHOW MODULES", SqlCommand.SHOW_MODULES)
.cannotParseComment(),
TestItem.validSql(" SHOW MODULES ", SqlCommand.SHOW_MODULES)
.cannotParseComment(),
// load module with module name as identifier
TestItem.validSql(
"LOAD MODULE dummy", SqlCommand.LOAD_MODULE, "LOAD MODULE dummy"),
// load module with module name as reserved keyword
TestItem.validSql(
"LOAD MODULE `MODULE`",
SqlCommand.LOAD_MODULE,
"LOAD MODULE `MODULE`"),
// load module with module name as literal
TestItem.invalidSql(
"LOAD MODULE 'dummy'",
SqlExecutionException.class,
"Encountered \"\\'dummy\\'\""),
TestItem.validSql(
"LOAD MODULE dummy WITH ('dummy-version' = '1')",
SqlCommand.LOAD_MODULE,
"LOAD MODULE dummy WITH ('dummy-version' = '1')"),
// unload module
TestItem.validSql(
"UNLOAD MODULE dummy",
SqlCommand.UNLOAD_MODULE,
"UNLOAD MODULE dummy"),
// Test create function.
TestItem.invalidSql(
"CREATE FUNCTION ",
@@ -97,7 +97,7 @@
public class ExecutionContextTest {

private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
private static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
public static final String MODULES_ENVIRONMENT_FILE = "test-sql-client-modules.yaml";
public static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
private static final String CONFIGURATION_ENVIRONMENT_FILE =
@@ -413,19 +413,23 @@ private Map<String, String> createDefaultReplaceVars() {
return replaceVars;
}

static Map<String, String> createModuleReplaceVars() {
Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_PLANNER", "blink");
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "changelog");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
return replaceVars;
}

private <T> ExecutionContext<T> createDefaultExecutionContext() throws Exception {
final Map<String, String> replaceVars = createDefaultReplaceVars();
return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
}

private <T> ExecutionContext<T> createModuleExecutionContext() throws Exception {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_PLANNER", "old");
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "changelog");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars);
return createExecutionContext(MODULES_ENVIRONMENT_FILE, createModuleReplaceVars());
}

private <T> ExecutionContext<T> createCatalogExecutionContext() throws Exception {
@@ -87,7 +87,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.CATALOGS_ENVIRONMENT_FILE;
import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.MODULES_ENVIRONMENT_FILE;
import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.createModuleReplaceVars;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
@@ -1618,6 +1621,86 @@ public void testCreateFunctionWithHiveCatalog() throws Exception {
executor.closeSession(sessionId);
}

@Test
public void testLoadModuleWithModuleConfEnabled() throws Exception {
// only blink planner supports LOAD MODULE syntax
Assume.assumeTrue(planner.equals("blink"));
final Executor executor =
createModifiedExecutor(
MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
final SessionContext session = new SessionContext("test-session", new Environment());
String sessionId = executor.openSession(session);
assertEquals("test-session", sessionId);

assertThrows(
"Could not execute statement: load module core",
SqlExecutionException.class,
() -> executor.executeSql(sessionId, "load module core"));

executor.executeSql(sessionId, "load module hive");
assertEquals(
executor.listModules(sessionId),
Arrays.asList("core", "mymodule", "myhive", "myhive2", "hive"));
}

@Test
public void testUnloadModuleWithModuleConfEnabled() throws Exception {
// only blink planner supports UNLOAD MODULE syntax
Assume.assumeTrue(planner.equals("blink"));
final Executor executor =
createModifiedExecutor(
MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
final SessionContext session = new SessionContext("test-session", new Environment());
String sessionId = executor.openSession(session);
assertEquals("test-session", sessionId);

executor.executeSql(sessionId, "unload module mymodule");
assertEquals(executor.listModules(sessionId), Arrays.asList("core", "myhive", "myhive2"));

exception.expect(SqlExecutionException.class);
exception.expectMessage("Could not execute statement: unload module mymodule");
executor.executeSql(sessionId, "unload module mymodule");
}

@Test
public void testHiveBuiltInFunctionWithHiveModuleEnabled() throws Exception {
// only blink planner supports LOAD MODULE syntax
Assume.assumeTrue(planner.equals("blink"));

final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_PLANNER", planner);
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
replaceVars.put("$VAR_RESULT_MODE", "table");

final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
String sessionId = executor.openSession(session);
assertEquals("test-session", sessionId);

// cannot use hive built-in function without loading hive module
assertThrows(
"Could not execute statement: select substring_index('www.apache.org', '.', 2) from TableNumber1",
SqlExecutionException.class,
() ->
executor.executeSql(
sessionId,
"select substring_index('www.apache.org', '.', 2) from TableNumber1"));

executor.executeSql(sessionId, "load module hive");
assertEquals(executor.listModules(sessionId), Arrays.asList("core", "hive"));

assertShowResult(
executor.executeSql(
sessionId,
"select substring_index('www.apache.org', '.', 2) from TableNumber1"),
hasItems("www.apache"));
}

private void executeStreamQueryTable(
Map<String, String> replaceVars, String query, List<String> expectedResults)
throws Exception {
@@ -78,6 +78,7 @@
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
"org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
"org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
"org.apache.flink.sql.parser.dql.SqlShowDatabases"
@@ -86,6 +87,7 @@
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowPartitions"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -125,6 +127,7 @@
"ITEMS"
"KEYS"
"LINES"
"LOAD"
"LOCATION"
"NORELY"
"NOVALIDATE"
@@ -145,6 +148,7 @@
"TABLES"
"TBLPROPERTIES"
"TERMINATED"
"UNLOAD"
"USE"
"VALIDATE"
]
@@ -272,6 +276,7 @@
"LENGTH"
"LEVEL"
"LIBRARY"
"LOAD"
"LOCATOR"
"M"
"MAP"
@@ -443,6 +448,7 @@
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
"UNLOAD"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
@@ -510,6 +516,7 @@
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
"SqlLoadModule()"
"SqlShowCatalogs()"
"SqlShowCurrentCatalogOrDatabase()"
"SqlDescribeCatalog()"
@@ -524,6 +531,7 @@
"SqlAlterTable()"
"SqlAlterView()"
"SqlShowPartitions()"
"SqlUnloadModule()"
]

# List of methods for parsing custom literals.
@@ -1510,3 +1510,44 @@ SqlShowPartitions SqlShowPartitions() :
[ <PARTITION> { partitionSpec = new SqlNodeList(getPos()); PartitionSpecCommaList(new SqlNodeList(getPos()), partitionSpec); } ]
{ return new SqlShowPartitions(pos, tableIdentifier, partitionSpec); }
}

/**
* Parses a load module statement.
* LOAD MODULE module_name [WITH (property_name=property_value, ...)];
*/
SqlLoadModule SqlLoadModule() :
{
SqlParserPos startPos;
SqlIdentifier moduleName;
SqlNodeList propertyList = SqlNodeList.EMPTY;
}
{
<LOAD> <MODULE> { startPos = getPos(); }
moduleName = SimpleIdentifier()
[
<WITH>
propertyList = TableProperties()
]
{
return new SqlLoadModule(startPos.plus(getPos()),
moduleName,
propertyList);
}
}

/**
* Parses an unload module statement.
* UNLOAD MODULE module_name;
*/
SqlUnloadModule SqlUnloadModule() :
{
SqlParserPos startPos;
SqlIdentifier moduleName;
}
{
<UNLOAD> <MODULE> { startPos = getPos(); }
moduleName = SimpleIdentifier()
{
return new SqlUnloadModule(startPos.plus(getPos()), moduleName);
}
}
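For reference (not part of this commit), a minimal sketch of driving these two productions through the generated parser. It assumes the flink-sql-parser artifact on the classpath and a Calcite version providing SqlParser.config(); the planner itself configures additional options (lexical policy, conformance) that are omitted here.

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class LoadModuleParseSketch {
public static void main(String[] args) throws Exception {
SqlParser.Config config =
SqlParser.config().withParserFactory(FlinkSqlParserImpl.FACTORY);

// Both statements are handled by the SqlLoadModule()/SqlUnloadModule()
// productions added above and come back as the corresponding SqlNode types.
SqlNode load =
SqlParser.create("LOAD MODULE hive WITH ('hive-version' = '2.3.4')", config)
.parseStmt();
SqlNode unload = SqlParser.create("UNLOAD MODULE hive", config).parseStmt();

System.out.println(load.getClass().getSimpleName()); // SqlLoadModule
System.out.println(unload.getClass().getSimpleName()); // SqlUnloadModule
}
}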
@@ -441,4 +441,17 @@ public void testShowPartitions() {
sql("show partitions tbl").ok("SHOW PARTITIONS `TBL`");
sql("show partitions tbl partition (p=1)").ok("SHOW PARTITIONS `TBL` PARTITION (`P` = 1)");
}

@Test
public void testLoadModule() {
sql("load module hive").ok("LOAD MODULE `HIVE`");

sql("load module hive with ('hive-version' = '3.1.2')")
.ok("LOAD MODULE `HIVE` WITH (\n 'hive-version' = '3.1.2'\n)");
}

@Test
public void testUnloadModule() {
sql("unload module hive").ok("UNLOAD MODULE `HIVE`");
}
}