-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-21485][sql-client] Simplify the ExecutionContext #15006
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit f6a9d79 (Wed Feb 24 13:59:50 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
4cd5563
to
4cc39b3
Compare
840b4cf
to
f5721f1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @fsk119 , I left some comments.
import static org.apache.flink.table.api.Expressions.$; | ||
|
||
/** Utils to initialize {@link TableEnvironment} from {@link Environment}. */ | ||
public class LegacyTableEnvironmentInitializer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will we introduce TableEnvironmentInitializer
for ddl ? if that, it's better we could introduce an interface for LegacyTableEnvironmentInitializer and TableEnvironmentInitializer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will not introduce TableEnvironmentInitializer
for ddl right now. When useing sql file to init, it will just work like put commands in the terminal.
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
Outdated
Show resolved
Hide resolved
for (String key : sessionConfiguration.toMap().keySet()) { | ||
// Don't care the type of the option | ||
ConfigOption<String> keyToDelete = ConfigOptions.key(key).stringType().noDefaultValue(); | ||
sessionConfiguration.removeConfig(keyToDelete); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just re-initialize the sessionConfiguration
based on defaultContext
?
sessionConfiguration = new Configuration(defaultContext.getFlinkConfig())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main reasons is the CatalogManager
, FunctionManager
use sessionConfiguration
. If we rebuild a new Configuration
, we can 't modify the values of the option for CatalogManager
, FunctionManager
.
} | ||
|
||
/** Create a new {@link ExecutionContext} without initialization. */ | ||
public ExecutionContext(Environment environment, ExecutionContext context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to remove this method, because the above method is more generic and the initializeTableEnvironmentFromEnvironment
method will be called in SessionContext#create
.
Actually, we can rename initializeTableEnvironmentFromEnvironment
to initializeSessionState
, because TableEnvironment instance will change with ExecutionContext
...link-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
Outdated
Show resolved
Hide resolved
...link-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
Outdated
Show resolved
Hide resolved
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
Outdated
Show resolved
Hide resolved
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @fsk119 , I left some comments.
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
Outdated
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/DefaultContext.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/flink/table/client/gateway/context/LegacyTableEnvironmentInitializer.java
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
Outdated
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
Outdated
Show resolved
Hide resolved
* multiple queries as long as the session context does not change. This must be thread-safe as it | ||
* might be reused across different query submissions. | ||
*/ | ||
public class ExecutionContext { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering ExecutionContext
is also not needed anymore. The only thing in ExecutionContext
been used by other components is the classloader
and the tableEnv
. However, IMO, classloader
is a more session variable which should be get from SessionContext
(e.g. we may support ADD JAR
to add jar dependency to the session in the future). Then ExecutionContext
would be just a wrapper of TableEnvironment
.
If this is the case, what do you think about createing a new TableEnvironment
instead of ExecutionContext
in SessionContext
.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a TODO for future work.
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
Outdated
Show resolved
Hide resolved
...l-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
Show resolved
Hide resolved
sessionContext.reset(); | ||
|
||
Assert.assertEquals( | ||
"100", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the influence scope of reset
? Should it reset all configuration to default value, or rollback to initial value defined in sql-client-defaults.yaml
or init.sql
?
If it reset to default value, then the expected value of max-table-result-rows
should be 1000000
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After offline discussion,
- reset should reset value to the value set in the yaml.
- it should reset to the default value if yaml doesn't set the value.
- it should clear the value if it doesn't have default value.
64b620a
to
83446f3
Compare
...l-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/DefaultContext.java
Outdated
Show resolved
Hide resolved
// TODO: use `table.planner` and `execution.runtime-mode` to configure the TableEnvironment | ||
config.addConfiguration(settings.toConfiguration()); | ||
|
||
if (environment.getExecution().isStreamingPlanner()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the new introduced table.planner
and execution.mode
option doesn't take effect here. Currently, the implementation only depends on the SQL Client configuration.
I think we should use table.planner
and execution.mode
to decide how to create TableEnv.
Besides, please add tests for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. But I think we can leave the work to FLINK-21462. It will do the work about Configuration and Yaml compatibility.
.../src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
Outdated
Show resolved
Hide resolved
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
Show resolved
Hide resolved
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
...l-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
Show resolved
Hide resolved
...l-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
Show resolved
Hide resolved
The failed test is because of the network and it passes the test in the test before. |
What is the purpose of the change
This is pr is mainly to refactor the
ExecutionContext
andSessionContext
. We thinkExecutor
is a service to execute the commands from different session,SessionContext
maintains the users configuration andExecutionContext
is the wrapper of theTableEnvironment
. When executeSET
orRESET
commands, the executor will request theSessionContext
to modify the config and rebuild a newTableEnvironment
. However, sometimes we need to rollback the origin configuration. Therefore, we introduce a new context namedDefaultContext
to keep the command line option, YAML environment.Brief change log
DefaultContext
ExecutionContext
Verifying this change
This change is already covered by existing tests, such as (please describe tests).
SessionContextTest
to verify theset
andreset
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation