-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-28628][hive] Introduce operation execution plugin #20247
[FLINK-28628][hive] Introduce operation execution plugin #20247
Conversation
cbc5746
to
a3a531d
Compare
@flinkbot run azure |
7761079
to
4577697
Compare
4577697
to
9fddb38
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @luoyuxia for the contribution. I left some comments.
...ble-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
Outdated
Show resolved
Hide resolved
@@ -81,6 +90,15 @@ public interface TableEnvironmentInternal extends TableEnvironment { | |||
* @param operation The operation to be executed. | |||
* @return the content of the execution result. | |||
*/ | |||
TableResultInternal executeOperation(Operation operation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is quite confusing that what's the difference between this one and executeInternal
. Besides, why there are no executeOperation(List<ModifyOperation> operations)
?
Couldn't we just implement the delegation in executeInternal
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just implement the delegation in executeInternal
. The reason I add executeOperation
is to make the whole logic clear, like first try to use external implementation , and then executeInternal
which is Flink's implementation.
I'm fine to put the delegation in executeInternal
, which won't introduce new method for this interface.
The reason Why no executeOperation(List<ModifyOperation> operations)
is I don't want to expose much to user and I think it's no nesscessary. Now, this method is mainly used by StatementSetOperation
. And if extern dialect wants to custom the implementation for the operation containing multi-insert, extern dialect can return a new operation like HiveStatementSetOperation
and then still delegate it to executeOperation(Operation operation)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds we can encapsulate the logic in the internal implementation instead of exposing them on the interface. From the perspective of users, they shouldn't care about the difference between executeInternal()
and executeOperation()
. It's very easy to call the wrong method for the users who use the TableEnvironmentInternl
.
|
||
/** Creates a new parser. */ | ||
Parser create(Context context); | ||
|
||
default OperationExternalExecutor createOperatorExternalExecutor(Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
External
might be misleading here, what about ExtendedOperationExecutor
?
} | ||
|
||
/** Default implementation for {@link OperationExternalExecutor}. */ | ||
class DefaultOperationExternalExecutor implements OperationExternalExecutor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default implementation can be EmptyOperationExecutor
with comment: a default implementation of ExtendedOperationExecutor
that doesn't extend any operation behavior but forward all operations to the Flink planner.
9fddb38
to
2e09d74
Compare
Looks good to me. But please fix the compile problem. PS: maybe you didn't configure the automatic code format in IDE. See https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/#code-formatting |
2e09d74
to
97c4a9b
Compare
|
@wuchong The CI is passed now. Could you please help merge so that I can move on the other issues. |
What is the purpose of the change
Introduce an operation execution plugin make exteranl dialect can custom implementation for some operations.
Brief change log
OperationExternalExecutor#executeOperation
to delegate external implementation.ParserFactory
toDialectFactory
forDialectFactory
will also tocreateOperatorExternalExecutor
.Verifying this change
UT
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation