
Commit

[FLINK-29732][sql-gateway] Support to configureSession in the SqlGatewayService

This closes apache#21133
yuzelin authored and fsk119 committed Dec 8, 2022
1 parent a0a5c71 commit e6215d4
Showing 8 changed files with 412 additions and 22 deletions.
@@ -66,6 +66,20 @@ public interface SqlGatewayService {
*/
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

/**
* Uses the given statement to initialize the Session. Only
* SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR statements are allowed.
*
* <p>This method blocks until the execution finishes.
*
* @param sessionHandle handle to identify the session.
* @param statement the statement used to configure the session.
* @param executionTimeoutMs the execution timeout in milliseconds. Use a non-positive value to
*     disable the timeout mechanism.
*/
void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs)
throws SqlGatewayException;

/**
* Get the current configuration of the {@code Session}.
*
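For context, here is a minimal usage sketch of the configureSession API declared above. It assumes a running SqlGatewayService instance and an already-opened session; the helper class, catalog name, and statements are hypothetical and not part of this commit. Per the Javadoc, a non-positive timeout disables the timeout mechanism.

import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;

public class ConfigureSessionSketch {

    // Hypothetical helper: applies a few initialization statements to an open session.
    // Each call blocks until the statement has been applied to the session.
    static void initializeSession(SqlGatewayService service, SessionHandle sessionHandle)
            throws SqlGatewayException {
        service.configureSession(
                sessionHandle,
                "CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')",
                -1);
        service.configureSession(sessionHandle, "USE CATALOG my_catalog", -1);
        service.configureSession(sessionHandle, "SET 'table.sql-dialect' = 'default'", -1);
    }
}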
@@ -54,6 +54,12 @@ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException
throw new UnsupportedOperationException();
}

@Override
public void configureSession(
SessionHandle sessionHandle, String statement, long executionTimeoutMs)
throws SqlGatewayException {
throw new UnsupportedOperationException();
}

@Override
public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
throws SqlGatewayException {
7 changes: 7 additions & 0 deletions flink-table/flink-sql-gateway/pom.xml
@@ -103,6 +103,13 @@
<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -38,6 +38,7 @@
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.session.Session;
import org.apache.flink.table.gateway.service.session.SessionManager;

@@ -79,6 +80,31 @@ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException
}
}

@Override
public void configureSession(
SessionHandle sessionHandle, String statement, long executionTimeoutMs)
throws SqlGatewayException {
try {
if (executionTimeoutMs > 0) {
// TODO: support the feature in FLINK-27838
throw new UnsupportedOperationException(
"SqlGatewayService doesn't support timeout mechanism now.");
}

OperationManager operationManager = getSession(sessionHandle).getOperationManager();
OperationHandle operationHandle =
operationManager.submitOperation(
handle ->
getSession(sessionHandle)
.createExecutor()
.configureSession(handle, statement));
operationManager.awaitOperationTermination(operationHandle);
} catch (Throwable t) {
LOG.error("Failed to configure session.", t);
throw new SqlGatewayException("Failed to configure session.", t);
}
}

@Override
public Map<String, String> getSessionConfig(SessionHandle sessionHandle)
throws SqlGatewayException {
@@ -311,6 +337,8 @@ public GatewayInfo getGatewayInfo() {
return GatewayInfo.INSTANCE;
}

// --------------------------------------------------------------------------------------------

@VisibleForTesting
public Session getSession(SessionHandle sessionHandle) {
return sessionManager.getSession(sessionHandle);
@@ -44,22 +44,28 @@
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.AlterOperation;
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
import org.apache.flink.util.CollectionUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +90,48 @@ public OperationExecutor(SessionContext context, Configuration executionConfig)
this.executionConfig = executionConfig;
}

public ResultFetcher configureSession(OperationHandle handle, String statement) {
TableEnvironmentInternal tableEnv = getTableEnvironment();
List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
if (parsedOperations.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported SQL statement! Configure session only accepts a single SQL statement.");
}
Operation op = parsedOperations.get(0);

if (!(op instanceof SetOperation)
&& !(op instanceof ResetOperation)
&& !(op instanceof CreateOperation)
&& !(op instanceof DropOperation)
&& !(op instanceof UseOperation)
&& !(op instanceof AlterOperation)
&& !(op instanceof LoadModuleOperation)
&& !(op instanceof UnloadModuleOperation)
&& !(op instanceof AddJarOperation)) {
throw new UnsupportedOperationException(
String.format(
"Unsupported statement for configuring session:%s\n"
+ "The configureSession API only supports to execute statement of type "
+ "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+ "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+ "CREATE CATALOG, DROP CATALOG, "
+ "USE CATALOG, USE [CATALOG.]DATABASE, "
+ "CREATE VIEW, DROP VIEW, "
+ "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
+ "ADD JAR.",
statement));
}

if (op instanceof SetOperation) {
return callSetOperation(tableEnv, handle, (SetOperation) op);
} else if (op instanceof ResetOperation) {
return callResetOperation(handle, (ResetOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
}

public ResultFetcher executeStatement(OperationHandle handle, String statement) {
// Instantiate the TableEnvironment lazily
TableEnvironmentInternal tableEnv = getTableEnvironment();
@@ -114,9 +162,7 @@ public ResultFetcher executeStatement(OperationHandle handle, String statement)
TableResultInternal result = tableEnv.executeInternal(op);
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
} else {
- TableResultInternal result = tableEnv.executeInternal(op);
- return new ResultFetcher(
-         handle, result.getResolvedSchema(), collect(result.collectInternal()));
+ return callOperation(tableEnv, handle, op);
}
}

@@ -228,7 +274,8 @@ private ResultFetcher callSetOperation(
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
- collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ CollectionUtil.iteratorToList(
+         TableResultInternal.TABLE_RESULT_OK.collectInternal()));
} else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
// show all properties
Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
@@ -237,7 +284,7 @@
ResolvedSchema.of(
Column.physical(SET_KEY, DataTypes.STRING()),
Column.physical(SET_VALUE, DataTypes.STRING())),
- collect(
+ CollectionUtil.iteratorToList(
configMap.entrySet().stream()
.map(
entry ->
@@ -264,7 +311,8 @@ private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
- collect(TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ CollectionUtil.iteratorToList(
+         TableResultInternal.TABLE_RESULT_OK.collectInternal()));
}

private ResultFetcher callModifyOperations(
Expand All @@ -289,6 +337,15 @@ private ResultFetcher callModifyOperations(
.toString()))));
}

private ResultFetcher callOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
TableResultInternal result = tableEnv.executeInternal(op);
return new ResultFetcher(
handle,
result.getResolvedSchema(),
CollectionUtil.iteratorToList(result.collectInternal()));
}

private Set<TableInfo> listTables(
String catalogName, String databaseName, boolean includeViews) {
CatalogManager catalogManager = sessionContext.getSessionState().catalogManager;
@@ -333,10 +390,4 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
TableKind.VIEW))
.collect(Collectors.toSet()));
}
-
- private List<RowData> collect(Iterator<RowData> tableResult) {
-     List<RowData> rows = new ArrayList<>();
-     tableResult.forEachRemaining(rows::add);
-     return rows;
- }
}
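As a rough illustration of the dispatch in configureSession above, reusing the hypothetical service and sessionHandle from the earlier sketch (the statements themselves are illustrative, not from this commit):

// Accepted: these parse to whitelisted operation types (a CreateOperation and an
// AddJarOperation), so configureSession executes them through callOperation.
service.configureSession(
        sessionHandle, "CREATE TABLE t (x INT) WITH ('connector' = 'datagen')", -1);
service.configureSession(sessionHandle, "ADD JAR '/tmp/my-udf.jar'", -1);

// Rejected: a query parses to a QueryOperation, which is not whitelisted; the submitted
// operation fails and SqlGatewayServiceImpl wraps the error in a SqlGatewayException.
service.configureSession(sessionHandle, "SELECT 1", -1);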
@@ -137,6 +137,10 @@ public void closeOperation(OperationHandle operationHandle) {
});
}

public void awaitOperationTermination(OperationHandle operationHandle) throws Exception {
getOperation(operationHandle).awaitTermination();
}

/**
* Get the {@link OperationInfo} of the operation.
*
@@ -307,15 +311,9 @@ public ResultSet fetchResults(FetchOrientation orientation, int maxRows) {
}

public ResolvedSchema getResultSchema() throws Exception {
- synchronized (status) {
-     while (!status.get().isTerminalStatus()) {
-         status.wait();
-     }
- }
+ awaitTermination();
OperationStatus current = status.get();
- if (current == OperationStatus.ERROR) {
-     throw operationError;
- } else if (current != OperationStatus.FINISHED) {
+ if (current != OperationStatus.FINISHED) {
throw new IllegalStateException(
String.format(
"The result schema is available when the Operation is in FINISHED state but the current status is %s.",
@@ -328,6 +326,18 @@ public OperationInfo getOperationInfo() {
return new OperationInfo(status.get(), operationError);
}

public void awaitTermination() throws Exception {
synchronized (status) {
while (!status.get().isTerminalStatus()) {
status.wait();
}
}
OperationStatus current = status.get();
if (current == OperationStatus.ERROR) {
throw operationError;
}
}

private ResultSet fetchResultsInternal(Supplier<ResultSet> results) {
OperationStatus currentStatus = status.get();


