[FLINK-17786][sql-client] Fix can not switch dialect in SQL CLI
Remove dialect from ExecutionEntry

This closes apache#12217
lirui-apache authored May 19, 2020
1 parent 4cfd23a commit ce0f334
Showing 5 changed files with 30 additions and 38 deletions.
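What the fix enables, in a minimal sketch: with the dialect no longer pinned by the execution entry, it can be switched per session through the table.sql-dialect option (in the SQL CLI presumably via SET table.sql-dialect=hive;, which is what the new LocalExecutorITCase test below drives through setSessionProperty). Only TableConfig#getSqlDialect/#setSqlDialect, TableConfigOptions.TABLE_SQL_DIALECT and SqlDialect appear in the diff itself; the standalone setup around them is illustrative, not part of this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class DialectSwitchSketch {
    public static void main(String[] args) {
        // Illustrative setup; the SQL CLI builds its TableEnvironment internally.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // DEFAULT (Flink SQL) is the initial dialect.
        System.out.println(tEnv.getConfig().getSqlDialect()); // DEFAULT

        // Switch programmatically ...
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // ... or through the configuration option the CLI now relies on,
        // which is presumably what SET table.sql-dialect=hive; maps to.
        tEnv.getConfig().getConfiguration()
                .setString(TableConfigOptions.TABLE_SQL_DIALECT.key(), "hive");

        System.out.println(tEnv.getConfig().getSqlDialect()); // HIVE
    }
}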
ExecutionEntry.java
@@ -22,7 +22,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.client.config.ConfigUtil;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -111,8 +110,6 @@ public class ExecutionEntry extends ConfigEntry {

public static final String EXECUTION_CURRENT_DATABASE = "current-database";

public static final String EXECUTION_SQL_DIALECT = "dialect";

private ExecutionEntry(DescriptorProperties properties) {
super(properties);
}
@@ -157,12 +154,6 @@ protected void validate(DescriptorProperties properties) {
properties.validateInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, true, 1);
properties.validateString(EXECUTION_CURRENT_CATALOG, true, 1);
properties.validateString(EXECUTION_CURRENT_DATABASE, true, 1);
properties.validateEnumValues(EXECUTION_SQL_DIALECT,
true,
Arrays.asList(
SqlDialect.DEFAULT.name().toLowerCase(),
SqlDialect.HIVE.name().toLowerCase()
));
}

public EnvironmentSettings getEnvironmentSettings() {
@@ -339,12 +330,6 @@ public boolean isTableauMode() {
.orElse(false);
}

public SqlDialect getSqlDialect() {
return properties.getOptionalString(EXECUTION_SQL_DIALECT)
.map(name -> SqlDialect.valueOf(name.toUpperCase()))
.orElse(SqlDialect.DEFAULT);
}

public Map<String, String> asTopLevelMap() {
return properties.asPrefixedMap(EXECUTION_ENTRY + '.');
}
ExecutionContext.java
@@ -456,7 +456,6 @@ private void initializeTableEnvironment(@Nullable SessionState sessionState) {
config.addConfiguration(flinkConfig);
environment.getConfiguration().asMap().forEach((k, v) ->
config.getConfiguration().setString(k, v));
config.setSqlDialect(environment.getExecution().getSqlDialect());

if (noInheritedState) {
//--------------------------------------------------------------------------------------------------------------
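The config.setSqlDialect(...) call removed above is what used to pin the dialect when the execution context was created. Below is a minimal sketch of the remaining flow, assuming (as the new LocalExecutorITCase test later in the diff suggests) that TableConfig#getSqlDialect is backed by the table.sql-dialect entry of its Configuration; the map literal stands in for environment.getConfiguration().asMap().

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.TableConfigOptions;

public class EnvironmentConfigSketch {
    public static void main(String[] args) {
        // Stand-in for the environment's "configuration:" section.
        Map<String, String> environmentConfiguration = new HashMap<>();
        environmentConfiguration.put(TableConfigOptions.TABLE_SQL_DIALECT.key(), "hive");

        TableConfig config = new TableConfig();
        // Same copy loop as initializeTableEnvironment above: every entry,
        // including table.sql-dialect, lands in the TableConfig's Configuration.
        environmentConfiguration.forEach((k, v) ->
                config.getConfiguration().setString(k, v));

        // With the dialect read from the configuration, the removed
        // setSqlDialect(...) call is redundant.
        System.out.println(config.getSqlDialect()); // HIVE
    }
}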
ExecutionContextTest.java
@@ -25,7 +25,6 @@
import org.apache.flink.client.python.PythonFunctionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -74,7 +73,6 @@ public class ExecutionContextTest {
public static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-client-configuration.yaml";
private static final String DIALECT_ENVIRONMENT_FILE = "test-sql-client-dialect.yaml";
private static final String FUNCTION_ENVIRONMENT_FILE = "test-sql-client-python-functions.yaml";

@Test
@@ -307,21 +305,6 @@ public void testInitCatalogs() throws Exception{
Collections.singletonList(new DefaultCLI(flinkConfig))).build();
}

@Test
public void testSQLDialect() throws Exception {
ExecutionContext<?> context = createDefaultExecutionContext();
assertEquals(SqlDialect.DEFAULT, context.getTableEnvironment().getConfig().getSqlDialect());

Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_DIALECT", "default");
context = createExecutionContext(DIALECT_ENVIRONMENT_FILE, replaceVars);
assertEquals(SqlDialect.DEFAULT, context.getTableEnvironment().getConfig().getSqlDialect());

replaceVars.put("$VAR_DIALECT", "hive");
context = createExecutionContext(DIALECT_ENVIRONMENT_FILE, replaceVars);
assertEquals(SqlDialect.HIVE, context.getTableEnvironment().getConfig().getSqlDialect());
}

@SuppressWarnings("unchecked")
private <T> ExecutionContext<T> createExecutionContext(String file, Map<String, String> replaceVars) throws Exception {
final Environment env = EnvironmentFileUtil.parseModified(
LocalExecutorITCase.java
@@ -32,8 +32,10 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
@@ -107,6 +109,7 @@ public static List<String> planner() {
}

private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
private static final String DIALECT_ENVIRONMENT_FILE = "test-sql-client-dialect.yaml";

private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@@ -1304,6 +1307,32 @@ public void testAlterFunction() throws Exception {
}
}

@Test
public void testSQLDialect() throws Exception {
LocalExecutor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
String sessionId = executor.openSession(session);
// the DEFAULT dialect is used by default
assertEquals(SqlDialect.DEFAULT, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
// test switching dialect
executor.setSessionProperty(sessionId, TableConfigOptions.TABLE_SQL_DIALECT.key(), "hive");
assertEquals(SqlDialect.HIVE, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
executor.closeSession(sessionId);

Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_DIALECT", "default");
executor = createModifiedExecutor(DIALECT_ENVIRONMENT_FILE, clusterClient, replaceVars);
sessionId = executor.openSession(session);
assertEquals(SqlDialect.DEFAULT, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
executor.closeSession(sessionId);

replaceVars.put("$VAR_DIALECT", "hive");
executor = createModifiedExecutor(DIALECT_ENVIRONMENT_FILE, clusterClient, replaceVars);
sessionId = executor.openSession(session);
assertEquals(SqlDialect.HIVE, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
executor.closeSession(sessionId);
}

private void executeStreamQueryTable(
Map<String, String> replaceVars,
String query,
test-sql-client-dialect.yaml
@@ -25,10 +25,6 @@ execution:
planner: blink
type: batch
result-mode: table
dialect: "$VAR_DIALECT"

configuration:
table.exec.sort.default-limit: 100
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128kb
table.optimizer.join-reorder-enabled: true
table.sql-dialect: "$VAR_DIALECT"
