Skip to content

Commit

Permalink
[FLINK-28360][sql-client] Support stop job statement in SQL client (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
link3280 committed Oct 10, 2022
1 parent bca164f commit 7bce485
Show file tree
Hide file tree
Showing 16 changed files with 573 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AlterOperation;
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
Expand Down Expand Up @@ -468,6 +469,9 @@ private void callOperation(Operation operation, ExecutionMode mode) {
} else if (operation instanceof CreateTableASOperation) {
// CTAS
callInsert((CreateTableASOperation) operation);
} else if (operation instanceof StopJobOperation) {
// STOP JOB
callStopJob((StopJobOperation) operation);
} else {
// fallback to default implementation
executeOperation(operation);
Expand Down Expand Up @@ -635,6 +639,23 @@ private void callEndStatementSet() {
}
}

private void callStopJob(StopJobOperation stopJobOperation) {
Optional<String> savepoint =
executor.stopJob(
sessionId,
stopJobOperation.getJobId(),
stopJobOperation.isWithSavepoint(),
stopJobOperation.isWithDrain());
if (stopJobOperation.isWithSavepoint()) {
Preconditions.checkState(savepoint.isPresent());
printInfo(
String.format(
CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT, savepoint.get()));
} else {
printInfo(CliStrings.MESSAGE_STOP_JOB_STATEMENT);
}
}

private void executeOperation(Operation operation) {
TableResultInternal result = executor.executeOperation(sessionId, operation);
if (TABLE_RESULT_OK == result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ public AttributedString build() {
public static final String MESSAGE_REMOVE_JAR_STATEMENT =
"The specified jar is removed from session classloader.";

public static final String MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT =
"The specified job is stopped with savepoint %s.";

public static final String MESSAGE_STOP_JOB_STATEMENT = "The specified job is stopped.";

// --------------------------------------------------------------------------------------------

public static final String RESULT_TITLE = "SQL Query Result";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

/** A gateway for communicating with Flink and other external systems. */
public interface Executor {
Expand Down Expand Up @@ -143,4 +144,9 @@ TypedResult<Integer> snapshotResult(String sessionId, String resultId, int pageS

/** Remove the JAR resource from the classloader with specified session. */
void removeJar(String sessionId, String jarPath);

/** Stops a job in the specified session. */
Optional<String> stopJob(
String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
throws SqlExecutionException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.util.MutableURLClassLoader;
import org.apache.flink.util.TemporaryClassLoaderContext;

import java.lang.reflect.Method;
import java.util.function.Supplier;

import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;

Expand Down Expand Up @@ -167,4 +169,13 @@ private static Executor lookupExecutor(
e);
}
}

/**
* Executes the given supplier using the execution context's classloader as thread classloader.
*/
public <R> R wrapClassLoader(Supplier<R> supplier) {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
return supplier.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,16 @@
package org.apache.flink.table.client.gateway.local;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
Expand All @@ -38,17 +47,22 @@
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -69,11 +83,14 @@ public class LocalExecutor implements Executor {
private final ResultStore resultStore;
private final DefaultContext defaultContext;

private final ClusterClientServiceLoader clusterClientServiceLoader;

/** Creates a local executor for submitting table programs and retrieving results. */
public LocalExecutor(DefaultContext defaultContext) {
this.contextMap = new ConcurrentHashMap<>();
this.resultStore = new ResultStore();
this.defaultContext = defaultContext;
this.clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
}

@Override
Expand Down Expand Up @@ -307,4 +324,99 @@ public void removeJar(String sessionId, String jarUrl) {
final SessionContext context = getSessionContext(sessionId);
context.removeJar(jarUrl);
}

@Override
public Optional<String> stopJob(
String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
throws SqlExecutionException {
Duration clientTimeout = getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT);
try {
return runClusterAction(
sessionId,
clusterClient -> {
if (isWithSavepoint) {
// blocking get savepoint path
try {
String savepoint =
clusterClient
.stopWithSavepoint(
JobID.fromHexString(jobId),
isWithDrain,
null,
SavepointFormatType.DEFAULT)
.get(
clientTimeout.toMillis(),
TimeUnit.MILLISECONDS);
return Optional.of(savepoint);
} catch (Exception e) {
throw new FlinkException(
"Could not stop job "
+ jobId
+ " in session "
+ sessionId
+ ".",
e);
}
} else {
clusterClient.cancel(JobID.fromHexString(jobId));
return Optional.empty();
}
});
} catch (Exception e) {
throw new SqlExecutionException(
"Could not stop job " + jobId + " in session " + sessionId + ".", e);
}
}

/**
* Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
* against it.
*
* @param sessionId the specified session ID
* @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
* @param <ClusterID> type of the cluster id
* @param <Result>> type of the result
* @throws FlinkException if something goes wrong
*/
private <ClusterID, Result> Result runClusterAction(
String sessionId, ClusterAction<ClusterID, Result> clusterAction)
throws FlinkException {
final SessionContext context = getSessionContext(sessionId);
final Configuration configuration = (Configuration) context.getReadableConfig();
final ClusterClientFactory<ClusterID> clusterClientFactory =
context.getExecutionContext()
.wrapClassLoader(
() ->
clusterClientServiceLoader.getClusterClientFactory(
configuration));

final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
Preconditions.checkNotNull(clusterId, "No cluster ID found for session " + sessionId);

try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration);
final ClusterClient<ClusterID> clusterClient =
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
return clusterAction.runAction(clusterClient);
}
}

/**
* Internal interface to encapsulate cluster actions which are executed via the {@link
* ClusterClient}.
*
* @param <ClusterID> type of the cluster id
* @param <Result>> type of the result
*/
@FunctionalInterface
private interface ClusterAction<ClusterID, Result> {

/**
* Run the cluster action with the given {@link ClusterClient}.
*
* @param clusterClient to run the cluster action against
* @throws FlinkException if something goes wrong
*/
Result runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
Expand Down Expand Up @@ -356,6 +359,58 @@ public void testCancelExecutionInteractiveMode() throws Exception {
}
}

@Test(timeout = 10000)
public void testStopJob() throws Exception {
final MockExecutor mockExecutor = new MockExecutor();
mockExecutor.isSync = false;

String sessionId = mockExecutor.openSession("test-session");
OutputStream outputStream = new ByteArrayOutputStream(256);
try (CliClient client =
new CliClient(
() -> TerminalUtils.createDumbTerminal(outputStream),
sessionId,
mockExecutor,
historyTempFile(),
null)) {
client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT);
String dmlResult = outputStream.toString();
String jobId = extractJobId(dmlResult);
client.executeInNonInteractiveMode("STOP JOB '" + jobId + "'");
String stopResult = outputStream.toString();
assertThat(stopResult).contains(CliStrings.MESSAGE_STOP_JOB_STATEMENT);
}
}

@Test(timeout = 10000)
public void testStopJobWithSavepoint() throws Exception {
final MockExecutor mockExecutor = new MockExecutor();
mockExecutor.isSync = false;
final String mockSavepoint = "/my/savepoint/path";
mockExecutor.savepoint = mockSavepoint;

String sessionId = mockExecutor.openSession("test-session");
OutputStream outputStream = new ByteArrayOutputStream(256);
try (CliClient client =
new CliClient(
() -> TerminalUtils.createDumbTerminal(outputStream),
sessionId,
mockExecutor,
historyTempFile(),
null)) {
client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT);
String dmlResult = outputStream.toString();
String jobId = extractJobId(dmlResult);
client.executeInNonInteractiveMode("STOP JOB '" + jobId + "' WITH SAVEPOINT");
String stopResult = outputStream.toString();
assertThat(stopResult)
.contains(
String.format(
CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT,
mockSavepoint));
}
}

// --------------------------------------------------------------------------------------------

private void verifyUpdateSubmission(
Expand Down Expand Up @@ -419,11 +474,21 @@ private String executeSqlFromContent(MockExecutor executor, String content) thro
return outputStream.toString();
}

private String extractJobId(String result) {
Pattern pattern = Pattern.compile("[\\s\\S]*Job ID: (.*)[\\s\\S]*");
Matcher matcher = pattern.matcher(result);
if (!matcher.matches()) {
throw new IllegalStateException("No job ID found in string: " + result);
}
return matcher.group(1);
}

// --------------------------------------------------------------------------------------------

private static class MockExecutor implements Executor {

public boolean failExecution;
public String savepoint;

public volatile boolean isSync = false;
public volatile boolean isAwait = false;
Expand Down Expand Up @@ -595,5 +660,16 @@ public void cancelQuery(String sessionId, String resultId) throws SqlExecutionEx
public void removeJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public Optional<String> stopJob(
String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
throws SqlExecutionException {
if (isWithSavepoint) {
return Optional.of(savepoint);
} else {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -245,6 +246,13 @@ public void cancelQuery(String sessionId, String resultId) throws SqlExecutionEx
public void removeJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}

@Override
public Optional<String> stopJob(
String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
throws SqlExecutionException {
throw new UnsupportedOperationException("Not implemented.");
}
}

private static final class TestingCliResultView implements Runnable {
Expand Down
Loading

0 comments on commit 7bce485

Please sign in to comment.