Skip to content

Commit

Permalink
[FLINK-17715][sql-client] Align function DDL support with TableEnviro…
Browse files Browse the repository at this point in the history
…nment in SQL client

This closes apache#12171
  • Loading branch information
danny0405 authored May 18, 2020
1 parent 50232aa commit 20c28ac
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ private void callCommand(SqlCommandCall cmdCall) {
case DROP_VIEW:
callDropView(cmdCall);
break;
case CREATE_FUNCTION:
callCreateFunction(cmdCall);
break;
case DROP_FUNCTION:
callDropFunction(cmdCall);
break;
case ALTER_FUNCTION:
callAlterFunction(cmdCall);
break;
case SOURCE:
callSource(cmdCall);
break;
Expand Down Expand Up @@ -578,7 +587,6 @@ private void callCreateTable(SqlCommandCall cmdCall) {
printInfo(CliStrings.MESSAGE_TABLE_CREATED);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
}

Expand Down Expand Up @@ -631,6 +639,33 @@ private void callDropView(SqlCommandCall cmdCall) {
}
}

private void callCreateFunction(SqlCommandCall cmdCall) {
try {
executor.executeSql(sessionId, cmdCall.operands[0]);
printInfo(CliStrings.MESSAGE_FUNCTION_CREATED);
} catch (SqlExecutionException e) {
printExecutionException(e);
}
}

private void callDropFunction(SqlCommandCall cmdCall) {
try {
executor.executeSql(sessionId, cmdCall.operands[0]);
printInfo(CliStrings.MESSAGE_FUNCTION_REMOVED);
} catch (SqlExecutionException e) {
printExecutionException(e);
}
}

private void callAlterFunction(SqlCommandCall cmdCall) {
try {
executor.executeSql(sessionId, cmdCall.operands[0]);
printInfo(CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED);
} catch (SqlExecutionException e) {
printExecutionException(CliStrings.MESSAGE_ALTER_FUNCTION_FAILED, e);
}
}

private void callSource(SqlCommandCall cmdCall) {
final String pathString = cmdCall.operands[0];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ private CliStrings() {

public static final String MESSAGE_VIEW_REMOVED = "View has been removed.";

public static final String MESSAGE_FUNCTION_CREATED = "Function has been created.";

public static final String MESSAGE_FUNCTION_REMOVED = "Function has been removed.";

public static final String MESSAGE_ALTER_FUNCTION_SUCCEEDED = "Alter function succeeded!";

public static final String MESSAGE_ALTER_FUNCTION_FAILED = "Alter function failed!";

public static final String MESSAGE_DATABASE_CREATED = "Database has been created.";

public static final String MESSAGE_DATABASE_REMOVED = "Database has been removed.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ enum SqlCommand {

DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND),

ALTER_TABLE(
"(ALTER\\s+TABLE\\s+.*)",
SINGLE_OPERAND),

CREATE_VIEW(
"CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
(operands) -> {
Expand All @@ -152,6 +156,14 @@ enum SqlCommand {
return Optional.of(new String[]{operands[0], operands[1]});
}),

DROP_VIEW("DROP\\s+VIEW\\s+(.*)", SINGLE_OPERAND),

CREATE_FUNCTION("(CREATE.+FUNCTION\\s+(\\S+)\\s+AS.+)", SINGLE_OPERAND),

DROP_FUNCTION("(DROP.+FUNCTION\\s+.+)", SINGLE_OPERAND),

ALTER_FUNCTION("(ALTER.+FUNCTION\\s+.+)", SINGLE_OPERAND),

CREATE_DATABASE(
"(CREATE\\s+DATABASE\\s+.*)",
SINGLE_OPERAND),
Expand All @@ -160,18 +172,10 @@ enum SqlCommand {
"(DROP\\s+DATABASE\\s+.*)",
SINGLE_OPERAND),

DROP_VIEW(
"DROP\\s+VIEW\\s+(.*)",
SINGLE_OPERAND),

ALTER_DATABASE(
"(ALTER\\s+DATABASE\\s+.*)",
SINGLE_OPERAND),

ALTER_TABLE(
"(ALTER\\s+TABLE\\s+.*)",
SINGLE_OPERAND),

SET(
"SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
(operands) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.client.gateway;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -133,6 +134,11 @@ public interface Executor {
*/
List<String> listUserDefinedFunctions(String sessionId) throws SqlExecutionException;

/**
* Executes a SQL statement.
*/
TableResult executeSql(String sessionId, String statement) throws SqlExecutionException;

/**
* Lists all functions known to the executor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
Expand Down Expand Up @@ -388,6 +389,17 @@ public List<String> listUserDefinedFunctions(String sessionId) throws SqlExecuti
return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listUserDefinedFunctions()));
}

@Override
public TableResult executeSql(String sessionId, String statement) throws SqlExecutionException {
final ExecutionContext<?> context = getExecutionContext(sessionId);
final TableEnvironment tEnv = context.getTableEnvironment();
try {
return context.wrapClassLoader(() -> tEnv.executeSql(statement));
} catch (Exception e) {
throw new SqlExecutionException("Could not execute statement: " + statement, e);
}
}

@Override
public List<String> listFunctions(String sessionId) throws SqlExecutionException {
final ExecutionContext<?> context = getExecutionContext(sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.cli.utils.TerminalUtils;
import org.apache.flink.table.client.config.Environment;
Expand Down Expand Up @@ -338,6 +339,11 @@ public List<String> listUserDefinedFunctions(String sessionId) throws SqlExecuti
return null;
}

@Override
public TableResult executeSql(String sessionId, String statement) throws SqlExecutionException {
return null;
}

@Override
public List<String> listFunctions(String sessionId) throws SqlExecutionException {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.table.client.cli;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.client.cli.utils.TerminalUtils;
Expand Down Expand Up @@ -199,6 +200,11 @@ public List<String> listUserDefinedFunctions(String sessionId) throws SqlExecuti
return null;
}

@Override
public TableResult executeSql(String sessionId, String statement) throws SqlExecutionException {
return null;
}

@Override
public List<String> listFunctions(String sessionId) throws SqlExecutionException {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,36 @@ public void testCommands() {
testValidSqlCommand("DROP TABLE IF EXISTS t1", new SqlCommandCall(SqlCommand.DROP_TABLE, new String[]{"DROP TABLE IF EXISTS t1"}));
testValidSqlCommand("DROP TABLE IF EXISTS catalog1.db1.t1", new SqlCommandCall(SqlCommand.DROP_TABLE, new String[]{"DROP TABLE IF EXISTS catalog1.db1.t1"}));
testValidSqlCommand("DROP TABLE IF EXISTS db1.t1", new SqlCommandCall(SqlCommand.DROP_TABLE, new String[]{"DROP TABLE IF EXISTS db1.t1"}));
// Test create function.
testInvalidSqlCommand("CREATE FUNCTION");
testInvalidSqlCommand("CREATE FUNCTIONS");
testInvalidSqlCommand("CREATE FUNCTIONS");
testValidSqlCommand("CREATE FUNCTION catalog1.db1.func1 as 'class_name'",
new SqlCommandCall(SqlCommand.CREATE_FUNCTION, new String[] {"CREATE FUNCTION catalog1.db1.func1 as 'class_name'"}));
testValidSqlCommand("CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
new SqlCommandCall(SqlCommand.CREATE_FUNCTION, new String[] {"CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"}));
testValidSqlCommand("CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
new SqlCommandCall(SqlCommand.CREATE_FUNCTION, new String[] {"CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"}));
// Test drop function.
testInvalidSqlCommand("DROP FUNCTION");
testInvalidSqlCommand("DROP FUNCTIONS");
testInvalidSqlCommand("DROP FUNCTIONS");
testValidSqlCommand("DROP FUNCTION catalog1.db1.func1",
new SqlCommandCall(SqlCommand.DROP_FUNCTION, new String[] {"DROP FUNCTION catalog1.db1.func1"}));
testValidSqlCommand("DROP TEMPORARY FUNCTION catalog1.db1.func1",
new SqlCommandCall(SqlCommand.DROP_FUNCTION, new String[] {"DROP TEMPORARY FUNCTION catalog1.db1.func1"}));
testValidSqlCommand("DROP TEMPORARY SYSTEM FUNCTION IF EXISTS catalog1.db1.func1",
new SqlCommandCall(SqlCommand.DROP_FUNCTION, new String[] {"DROP TEMPORARY SYSTEM FUNCTION IF EXISTS catalog1.db1.func1"}));
// Test alter function.
testInvalidSqlCommand("ALTER FUNCTION");
testInvalidSqlCommand("ALTER FUNCTIONS");
testInvalidSqlCommand("ALTER FUNCTIONS");
testValidSqlCommand("ALTER FUNCTION catalog1.db1.func1 as 'a.b.c.func2'",
new SqlCommandCall(SqlCommand.ALTER_FUNCTION, new String[] {"ALTER FUNCTION catalog1.db1.func1 as 'a.b.c.func2'"}));
testValidSqlCommand("ALTER TEMPORARY FUNCTION IF EXISTS catalog1.db1.func1 as 'a.b.c.func2'",
new SqlCommandCall(SqlCommand.ALTER_FUNCTION, new String[] {"ALTER TEMPORARY FUNCTION IF EXISTS catalog1.db1.func1 as 'a.b.c.func2'"}));
testValidSqlCommand("ALTER TEMPORARY SYSTEM FUNCTION IF EXISTS catalog1.db1.func1 as 'a.b.c.func2'",
new SqlCommandCall(SqlCommand.ALTER_FUNCTION, new String[] {"ALTER TEMPORARY SYSTEM FUNCTION IF EXISTS catalog1.db1.func1 as 'a.b.c.func2'"}));
}

private void testInvalidSqlCommand(String stmt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.table.client.cli;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.Executor;
Expand Down Expand Up @@ -167,6 +168,11 @@ public List<String> listUserDefinedFunctions(String sessionId) throws SqlExecuti
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public TableResult executeSql(String sessionId, String statement) throws SqlExecutionException {
return null;
}

@Override
public List<String> listFunctions(String sessionId) throws SqlExecutionException {
throw new UnsupportedOperationException("Not implemented.");
Expand Down
Loading

0 comments on commit 20c28ac

Please sign in to comment.