Skip to content

Commit

Permalink
[FLINK-27837][sql-gateway] Support 'BEGIN STATEMENT SET' in SQL Gateway
Browse files Browse the repository at this point in the history
This closes apache#21187
  • Loading branch information
yuzelin authored and fsk119 committed Dec 21, 2022
1 parent eb44ac0 commit c5d2534
Show file tree
Hide file tree
Showing 4 changed files with 457 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
Expand All @@ -62,8 +63,10 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

Expand All @@ -87,6 +90,9 @@ public class SessionContext {

private final OperationManager operationManager;

private boolean isStatementSetState;
private final List<ModifyOperation> statementSetOperations;

private SessionContext(
DefaultContext defaultContext,
SessionHandle sessionId,
Expand All @@ -102,6 +108,8 @@ private SessionContext(
this.userClassloader = classLoader;
this.sessionState = sessionState;
this.operationManager = operationManager;
this.isStatementSetState = false;
this.statementSetOperations = new ArrayList<>();
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -167,6 +175,33 @@ public OperationExecutor createOperationExecutor(Configuration executionConfig)
return new OperationExecutor(this, executionConfig);
}

// --------------------------------------------------------------------------------------------
// Begin statement set
// --------------------------------------------------------------------------------------------

public boolean isStatementSetState() {
return isStatementSetState;
}

public void enableStatementSet() {
isStatementSetState = true;
}

public void disableStatementSet() {
isStatementSetState = false;
statementSetOperations.clear();
}

public List<ModifyOperation> getStatementSetOperations() {
return Collections.unmodifiableList(new ArrayList<>(statementSetOperations));
}

public void addStatementSetOperation(ModifyOperation operation) {
statementSetOperations.add(operation);
}

// --------------------------------------------------------------------------------------------

/** Close resources, e.g. catalogs. */
public void close() {
operationManager.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,30 +163,9 @@ public ResultFetcher executeStatement(OperationHandle handle, String statement)
+ "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
}
Operation op = parsedOperations.get(0);
if (op instanceof SetOperation) {
return callSetOperation(tableEnv, handle, (SetOperation) op);
} else if (op instanceof ResetOperation) {
return callResetOperation(handle, (ResetOperation) op);
} else if (op instanceof BeginStatementSetOperation) {
// TODO: support statement set in the FLINK-27837
throw new UnsupportedOperationException();
} else if (op instanceof EndStatementSetOperation) {
// TODO: support statement set in the FLINK-27837
throw new UnsupportedOperationException();
} else if (op instanceof ModifyOperation) {
return callModifyOperations(
tableEnv, handle, Collections.singletonList((ModifyOperation) op));
} else if (op instanceof StatementSetOperation) {
return callModifyOperations(
tableEnv, handle, ((StatementSetOperation) op).getOperations());
} else if (op instanceof QueryOperation) {
TableResultInternal result = tableEnv.executeInternal(op);
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
} else if (op instanceof StopJobOperation) {
return callStopJobOperation(handle, (StopJobOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
return sessionContext.isStatementSetState()
? executeOperationInStatementSetState(tableEnv, handle, op)
: executeOperation(tableEnv, handle, op);
}

public String getCurrentCatalog() {
Expand Down Expand Up @@ -302,16 +281,52 @@ public TableEnvironmentInternal getTableEnvironment() {
return tableEnv;
}

private ResultFetcher executeOperationInStatementSetState(
TableEnvironmentInternal tableEnv, OperationHandle handle, Operation operation) {
if (operation instanceof EndStatementSetOperation) {
return callEndStatementSetOperation(tableEnv, handle);
} else if (operation instanceof ModifyOperation) {
sessionContext.addStatementSetOperation((ModifyOperation) operation);
return buildOkResultFetcher(handle);
} else {
throw new SqlExecutionException(
"Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to submit Statement Set.");
}
}

private ResultFetcher executeOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
if (op instanceof SetOperation) {
return callSetOperation(tableEnv, handle, (SetOperation) op);
} else if (op instanceof ResetOperation) {
return callResetOperation(handle, (ResetOperation) op);
} else if (op instanceof BeginStatementSetOperation) {
return callBeginStatementSetOperation(handle);
} else if (op instanceof EndStatementSetOperation) {
throw new SqlExecutionException(
"No Statement Set to submit. 'END' statement should be used after 'BEGIN STATEMENT SET'.");
} else if (op instanceof ModifyOperation) {
return callModifyOperations(
tableEnv, handle, Collections.singletonList((ModifyOperation) op));
} else if (op instanceof StatementSetOperation) {
return callModifyOperations(
tableEnv, handle, ((StatementSetOperation) op).getOperations());
} else if (op instanceof QueryOperation) {
TableResultInternal result = tableEnv.executeInternal(op);
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
} else if (op instanceof StopJobOperation) {
return callStopJobOperation(handle, (StopJobOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
}

private ResultFetcher callSetOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
// set a property
sessionContext.set(setOp.getKey().get().trim(), setOp.getValue().get().trim());
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
CollectionUtil.iteratorToList(
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
return buildOkResultFetcher(handle);
} else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
// show all properties
Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
Expand Down Expand Up @@ -344,11 +359,26 @@ private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation
// reset all properties
sessionContext.reset();
}
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
CollectionUtil.iteratorToList(
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
return buildOkResultFetcher(handle);
}

private ResultFetcher callBeginStatementSetOperation(OperationHandle handle) {
sessionContext.enableStatementSet();
return buildOkResultFetcher(handle);
}

private ResultFetcher callEndStatementSetOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle) {
// reset the state regardless of whether error occurs while executing the set
List<ModifyOperation> statementSetOperations = sessionContext.getStatementSetOperations();
sessionContext.disableStatementSet();

if (statementSetOperations.isEmpty()) {
// there's no statement in the statement set, skip submitting
return buildOkResultFetcher(handle);
} else {
return callModifyOperations(tableEnv, handle, statementSetOperations);
}
}

private ResultFetcher callModifyOperations(
Expand Down Expand Up @@ -428,7 +458,7 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
}

public ResultFetcher callStopJobOperation(
OperationHandle operationHandle, StopJobOperation stopJobOperation)
OperationHandle handle, StopJobOperation stopJobOperation)
throws SqlExecutionException {
String jobId = stopJobOperation.getJobId();
boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
Expand All @@ -440,7 +470,7 @@ public ResultFetcher callStopJobOperation(
try {
savepoint =
runClusterAction(
operationHandle,
handle,
clusterClient -> {
if (isWithSavepoint) {
// blocking get savepoint path
Expand All @@ -462,7 +492,7 @@ public ResultFetcher callStopJobOperation(
"Could not stop job "
+ stopJobOperation.getJobId()
+ " in session "
+ operationHandle.getIdentifier()
+ handle.getIdentifier()
+ ".",
e);
}
Expand All @@ -473,43 +503,46 @@ public ResultFetcher callStopJobOperation(
});
} catch (Exception e) {
throw new SqlExecutionException(
"Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
"Could not stop job " + jobId + " for operation " + handle + ".", e);
}
if (isWithSavepoint) {
return new ResultFetcher(
operationHandle,
handle,
ResolvedSchema.of(Column.physical(SAVEPOINT_PATH, DataTypes.STRING())),
Collections.singletonList(
GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
} else {
return new ResultFetcher(
operationHandle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
CollectionUtil.iteratorToList(
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
return buildOkResultFetcher(handle);
}
}

private ResultFetcher buildOkResultFetcher(OperationHandle handle) {
return new ResultFetcher(
handle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
CollectionUtil.iteratorToList(
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
}

/**
* Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
* against it.
*
* @param operationHandle the specified session handle
* @param handle the specified operation handle
* @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
* @param <ClusterID> type of the cluster id
* @param <Result>> type of the result
* @throws FlinkException if something goes wrong
*/
private <ClusterID, Result> Result runClusterAction(
OperationHandle operationHandle, ClusterAction<ClusterID, Result> clusterAction)
OperationHandle handle, ClusterAction<ClusterID, Result> clusterAction)
throws FlinkException {
final Configuration configuration = Configuration.fromMap(sessionContext.getConfigMap());
final ClusterClientFactory<ClusterID> clusterClientFactory =
clusterClientServiceLoader.getClusterClientFactory(configuration);

final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
Preconditions.checkNotNull(
clusterId, "No cluster ID found for operation " + operationHandle);
Preconditions.checkNotNull(clusterId, "No cluster ID found for operation " + handle);

try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public void before(@TempDir Path temporaryFolder) throws Exception {
replaceVars.put(
"$VAR_STREAMING_PATH",
Files.createDirectory(temporaryFolder.resolve("streaming")).toFile().getPath());
replaceVars.put(
"$VAR_STREAMING_PATH2",
Files.createDirectory(temporaryFolder.resolve("streaming2")).toFile().getPath());
replaceVars.put(
"$VAR_BATCH_PATH",
Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
Expand Down
Loading

0 comments on commit c5d2534

Please sign in to comment.