Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-21485][sql-client] Simplify the ExecutionContext #15006

Merged
merged 4 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,21 @@

package org.apache.flink.table.client;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliClient;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalContextUtils;
import org.apache.flink.table.client.gateway.local.LocalExecutor;

import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;

/**
* SQL Client for submitting SQL statements. The client can be executed in two modes: a gateway and
Expand All @@ -69,8 +59,6 @@ public class SqlClient {
public static final String MODE_EMBEDDED = "embedded";
public static final String MODE_GATEWAY = "gateway";

public static final String DEFAULT_SESSION_ID = "default";

public SqlClient(boolean isEmbedded, CliOptions options) {
this.isEmbedded = isEmbedded;
this.options = options;
Expand All @@ -79,33 +67,13 @@ public SqlClient(boolean isEmbedded, CliOptions options) {
private void start() {
if (isEmbedded) {
// create local executor with default environment
final List<URL> jars;
if (options.getJars() != null) {
jars = options.getJars();
} else {
jars = Collections.emptyList();
}
final List<URL> libDirs;
if (options.getLibraryDirs() != null) {
libDirs = options.getLibraryDirs();
} else {
libDirs = Collections.emptyList();
}
final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
executor.start();

// create CLI client with session environment
final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
appendPythonConfig(sessionEnv, options.getPythonConfiguration());
final SessionContext context;
if (options.getSessionId() == null) {
context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
} else {
context = new SessionContext(options.getSessionId(), sessionEnv);
}
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
final Executor executor = new LocalExecutor(defaultContext);
executor.start();

// Open an new session
String sessionId = executor.openSession(context);
String sessionId = executor.openSession(options.getSessionId());
try {
// add shutdown hook
Runtime.getRuntime()
Expand Down Expand Up @@ -156,32 +124,6 @@ private void openCli(String sessionId, Executor executor) {

// --------------------------------------------------------------------------------------------

private static Environment readSessionEnvironment(URL envUrl) {
// use an empty environment by default
if (envUrl == null) {
System.out.println("No session environment specified.");
return new Environment();
}

System.out.println("Reading session environment from: " + envUrl);
LOG.info("Using session environment file: {}", envUrl);
try {
return Environment.parse(envUrl);
} catch (IOException e) {
throw new SqlClientException(
"Could not read session environment file at: " + envUrl, e);
}
}

private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
Map<String, Object> combinedConfig =
new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
env.setConfiguration(combinedConfig);
}

// --------------------------------------------------------------------------------------------

public static void main(String[] args) {
if (args.length < 1) {
CliOptionsParser.printHelpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public boolean inStreamingMode() {
return properties
.getOptionalString(EXECUTION_TYPE)
.map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
.orElse(false);
.orElse(true);
}

public boolean inBatchMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.types.Row;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

Expand All @@ -32,13 +34,13 @@ public interface Executor {
void start() throws SqlExecutionException;

/**
* Open a new session by using the given {@link SessionContext}.
* Open a new session by using the given session id.
*
* @param session context to create new session.
* @return session identifier to track the session.
* @param sessionId session identifier.
* @return used session identifier to track the session.
* @throws SqlExecutionException if any error happen
*/
String openSession(SessionContext session) throws SqlExecutionException;
String openSession(@Nullable String sessionId) throws SqlExecutionException;

/**
* Close the resources of session for given session id.
Expand Down

This file was deleted.

Loading