Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService #21292

Merged
merged 10 commits into from
Dec 18, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@

import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_HINTS;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -474,11 +475,19 @@ public ResultFetcher callStopJobOperation(
throw new SqlExecutionException(
"Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
}
return new ResultFetcher(
operationHandle,
ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
Collections.singletonList(
GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
if (isWithSavepoint) {
return new ResultFetcher(
operationHandle,
ResolvedSchema.of(Column.physical(SAVEPOINT_PATH, DataTypes.STRING())),
Collections.singletonList(
GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
} else {
return new ResultFetcher(
operationHandle,
TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
CollectionUtil.iteratorToList(
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public class Constants {
public static final String SET_KEY = "key";
public static final String SET_VALUE = "value";
public static final String COMPLETION_HINTS = "hints";

public static final String SAVEPOINT_PATH = "savepoint_path";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.gateway.service;

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
Expand Down Expand Up @@ -58,8 +59,10 @@
import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.TableFunc0;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand Down Expand Up @@ -125,8 +128,6 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);

@TempDir File tmpDir;

private static SessionManager sessionManager;
private static SqlGatewayServiceImpl service;

Expand Down Expand Up @@ -311,7 +312,7 @@ public void testGetOperationFinishedAndFetchResults() throws Exception {
"Failed to wait operation finish.");

List<RowData> expectedData = getDefaultResultSet().getData();
List<RowData> actualData = fetchAllResultRows(sessionHandle, operationHandle);
List<RowData> actualData = fetchAllResults(sessionHandle, operationHandle);
assertThat(actualData).isEqualTo(expectedData);

service.closeOperation(sessionHandle, operationHandle);
Expand Down Expand Up @@ -391,7 +392,7 @@ public void testExecuteSqlWithConfig() {
-1,
Configuration.fromMap(Collections.singletonMap(key, value)));

List<RowData> settings = fetchAllResultRows(sessionHandle, operationHandle);
List<RowData> settings = fetchAllResults(sessionHandle, operationHandle);

assertThat(settings)
.contains(
Expand All @@ -401,8 +402,12 @@ public void testExecuteSqlWithConfig() {

@ParameterizedTest
@CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
public void testStopJobStatementWithSavepoint(String option, boolean hasSavepoint)
throws InterruptedException {
public void testStopJobStatementWithSavepoint(
String option,
boolean hasSavepoint,
@InjectClusterClient RestClusterClient<?> restClusterClient,
@TempDir File tmpDir)
throws Exception {
Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
File savepointDir = new File(tmpDir, "savepoints");
Expand All @@ -422,18 +427,17 @@ public void testStopJobStatementWithSavepoint(String option, boolean hasSavepoin
OperationHandle insertOperationHandle =
service.executeStatement(sessionHandle, insertSql, -1, configuration);

List<RowData> results = fetchAllResultRows(sessionHandle, insertOperationHandle);
List<RowData> results = fetchAllResults(sessionHandle, insertOperationHandle);
assertThat(results.size()).isEqualTo(1);
String jobId = results.get(0).getString(0).toString();

// wait till the job turns into running status
Thread.sleep(2_000L);
TestUtils.waitUntilJobIsRunning(restClusterClient);
link3280 marked this conversation as resolved.
Show resolved Hide resolved

String stopSql = String.format(stopSqlTemplate, jobId, option);
OperationHandle stopOperationHandle =
service.executeStatement(sessionHandle, stopSql, -1, configuration);

List<RowData> stopResults = fetchAllResultRows(sessionHandle, stopOperationHandle);
List<RowData> stopResults = fetchAllResults(sessionHandle, stopOperationHandle);
assertThat(stopResults.size()).isEqualTo(1);
if (hasSavepoint) {
String savepoint = stopResults.get(0).getString(0).toString();
Expand Down Expand Up @@ -1119,7 +1123,7 @@ private void validateCompletionHints(
.isEqualTo(expectedCompletionHints);
}

private List<RowData> fetchAllResultRows(
private List<RowData> fetchAllResults(
SessionHandle sessionHandle, OperationHandle operationHandle) {
Long token = 0L;
List<RowData> results = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
Expand All @@ -41,9 +42,13 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_DIR_PREFIX;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME;
Expand Down Expand Up @@ -181,4 +186,24 @@ public static void waitUntilJobCanceled(JobID jobId, ClusterClient<?> client)
Thread.sleep(50);
}
}

/**
* Wait util at least one job turns into RUNNING status in the cluster. Applicable for single
* job scenarios.
*
* @param client ClusterClient which could be {@link
* org.apache.flink.test.junit5.InjectClusterClient}.
*/
public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
Collection<JobStatusMessage> statusMessages = client.listJobs().get();
link3280 marked this conversation as resolved.
Show resolved Hide resolved
List<JobID> runningJobs = new ArrayList<>();
while (runningJobs.isEmpty()) {
Thread.sleep(50);
runningJobs =
statusMessages.stream()
.filter(status -> !status.getJobState().equals(JobStatus.RUNNING))
.map(JobStatusMessage::getJobId)
.collect(Collectors.toList());
}
}
}