Skip to content

Commit

Permalink
[FLINK-29742][sql-gateway] Support completing statement in SqlGateway…
Browse files Browse the repository at this point in the history
…Service

This closes apache#21141
  • Loading branch information
yuzelin committed Dec 12, 2022
1 parent 5f924bc commit dc3ea82
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -307,4 +308,15 @@ FunctionDefinition getFunctionDefinition(
* @return Returns gateway info.
*/
GatewayInfo getGatewayInfo();

/**
* Returns a list of completion hints for the given statement at the given position.
*
* @param sessionHandle handle to identify the session.
* @param statement sql statement to be completed.
* @param position position of where need completion hints.
* @return completion hints.
*/
List<String> completeStatement(SessionHandle sessionHandle, String statement, int position)
throws SqlGatewayException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -179,6 +180,13 @@ public GatewayInfo getGatewayInfo() {
throw new UnsupportedOperationException();
}

@Override
public List<String> completeStatement(
SessionHandle sessionHandle, String statement, int position)
throws SqlGatewayException {
throw new UnsupportedOperationException();
}

@Override
public ResolvedCatalogBaseTable<?> getTable(
SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
Expand All @@ -45,9 +46,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

/** The implementation of the {@link SqlGatewayService} interface. */
public class SqlGatewayServiceImpl implements SqlGatewayService {
Expand Down Expand Up @@ -337,6 +340,32 @@ public GatewayInfo getGatewayInfo() {
return GatewayInfo.INSTANCE;
}

@Override
public List<String> completeStatement(
SessionHandle sessionHandle, String statement, int position)
throws SqlGatewayException {
try {
OperationManager operationManager = getSession(sessionHandle).getOperationManager();
OperationHandle operationHandle =
operationManager.submitOperation(
() ->
getSession(sessionHandle)
.createExecutor()
.getCompletionHints(statement, position));
operationManager.awaitOperationTermination(operationHandle);

ResultSet resultSet =
fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE);
return resultSet.getData().stream()
.map(data -> data.getString(0))
.map(StringData::toString)
.collect(Collectors.toList());
} catch (Throwable t) {
LOG.error("Failed to get statement completion hints.", t);
throw new SqlGatewayException("Failed to get statement completion hints.", t);
}
}

// --------------------------------------------------------------------------------------------

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_HINTS;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
Expand Down Expand Up @@ -257,6 +259,19 @@ public FunctionDefinition getFunctionDefinition(UnresolvedIdentifier identifier)
.getDefinition();
}

public ResultSet getCompletionHints(String statement, int position) {
return new ResultSet(
ResultSet.ResultType.EOS,
null,
ResolvedSchema.of(Column.physical(COMPLETION_HINTS, DataTypes.STRING())),
Arrays.stream(
getTableEnvironment()
.getParser()
.getCompletionHints(statement, position))
.map(hint -> GenericRowData.of(StringData.fromString(hint)))
.collect(Collectors.toList()));
}

// --------------------------------------------------------------------------------------------

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public class Constants {
public static final String JOB_ID = "job id";
public static final String SET_KEY = "key";
public static final String SET_VALUE = "value";
public static final String COMPLETION_HINTS = "hints";
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,52 @@ public void testListUserDefinedFunctions() {
"table_func0"))));
}

@Test
public void testCompleteStatement() throws Exception {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);

String createTable1 =
"CREATE TABLE Table1 (\n"
+ " IntegerField1 INT,\n"
+ " StringField1 STRING,\n"
+ " TimestampField1 TIMESTAMP(3)\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen'\n"
+ ")\n";
String createTable2 =
"CREATE TABLE Table2 (\n"
+ " BooleanField BOOLEAN,\n"
+ " StringField2 STRING,\n"
+ " TimestampField2 TIMESTAMP\n"
+ ") WITH (\n"
+ " 'connector' = 'blackhole'\n"
+ ")\n";

service.getSession(sessionHandle)
.createExecutor()
.getTableEnvironment()
.executeSql(createTable1);
service.getSession(sessionHandle)
.createExecutor()
.getTableEnvironment()
.executeSql(createTable2);

validateCompletionHints(
sessionHandle,
"SELECT * FROM Ta",
Arrays.asList(
"default_catalog.default_database.Table1",
"default_catalog.default_database.Table2"));

validateCompletionHints(
sessionHandle, "SELECT * FROM Table1 WH", Collections.singletonList("WHERE"));

validateCompletionHints(
sessionHandle,
"SELECT * FROM Table1 WHERE Inte",
Collections.singletonList("IntegerField1"));
}

@Test
public void testGetTable() {
SessionHandle sessionHandle = createInitializedSession();
Expand Down Expand Up @@ -1014,4 +1060,12 @@ private void validateStatementResult(
.collectInternal()))
.isEqualTo(expected);
}

private void validateCompletionHints(
SessionHandle sessionHandle,
String incompleteSql,
List<String> expectedCompletionHints) {
assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length()))
.isEqualTo(expectedCompletionHints);
}
}

0 comments on commit dc3ea82

Please sign in to comment.