[FLINK-28628][hive] Introduce operation execution plugin #20247

Merged
merged 3 commits into from
Jul 26, 2022

Conversation

luoyuxia
Contributor

@luoyuxia luoyuxia commented Jul 12, 2022

What is the purpose of the change

Introduce an operation execution plugin so that an external dialect can provide custom implementations for some operations.

Brief change log

  • Introduce the interface OperationExternalExecutor.java.
  • Before using Flink's internal operation execution, call OperationExternalExecutor#executeOperation to delegate to the external implementation.
  • Rename ParserFactory to DialectFactory, because DialectFactory will also create the OperationExternalExecutor (createOperatorExternalExecutor).
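
A minimal sketch of the "try the external executor first, then fall back to Flink" flow described in the change log. The types below are hypothetical stand-ins so the example compiles on its own; in particular, the Optional<String> return type and the DelegatingEnvironment class are assumptions for illustration, not the signatures used in this PR.

```java
import java.util.Optional;

// Hypothetical stand-in for Flink's Operation type.
interface Operation {
    String asSummaryString();
}

// Assumed shape of the plugin interface; the return type is a guess.
interface OperationExternalExecutor {
    /** Returns a result if the dialect handles the operation, otherwise empty. */
    Optional<String> executeOperation(Operation operation);
}

// Hypothetical environment that owns the delegation logic.
class DelegatingEnvironment {
    private final OperationExternalExecutor externalExecutor;

    DelegatingEnvironment(OperationExternalExecutor externalExecutor) {
        this.externalExecutor = externalExecutor;
    }

    /** Tries the dialect's executor first, then falls back to the built-in path. */
    String execute(Operation operation) {
        return externalExecutor
                .executeOperation(operation)
                .orElseGet(() -> executeInternal(operation));
    }

    // Placeholder for Flink's own execution logic.
    private String executeInternal(Operation operation) {
        return "flink:" + operation.asSummaryString();
    }
}
```

With this shape, a dialect only needs to return a non-empty result for the operations it wants to take over; everything else reaches the built-in path unchanged.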

Verifying this change

UT

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? N/A

@flinkbot
Collaborator

flinkbot commented Jul 12, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@luoyuxia luoyuxia force-pushed the introduce-operation-execution-plugin branch 2 times, most recently from cbc5746 to a3a531d Compare July 12, 2022 09:03
@luoyuxia
Contributor Author

@flinkbot run azure

@luoyuxia luoyuxia force-pushed the introduce-operation-execution-plugin branch 2 times, most recently from 7761079 to 4577697 Compare July 19, 2022 02:20
@luoyuxia luoyuxia changed the title introduce operation execution plugin [FLINK-28628][hive] introduce operation execution plugin Jul 21, 2022
@wuchong wuchong changed the title [FLINK-28628][hive] introduce operation execution plugin [FLINK-28628][hive] Introduce operation execution plugin Jul 22, 2022
@luoyuxia luoyuxia force-pushed the introduce-operation-execution-plugin branch from 4577697 to 9fddb38 Compare July 23, 2022 01:27
Member

@wuchong wuchong left a comment

Thanks @luoyuxia for the contribution. I left some comments.

@@ -81,6 +90,15 @@ public interface TableEnvironmentInternal extends TableEnvironment {
* @param operation The operation to be executed.
* @return the content of the execution result.
*/
TableResultInternal executeOperation(Operation operation);
Member

This method is quite confusing: what is the difference between this one and executeInternal? Besides, why is there no executeOperation(List<ModifyOperation> operations)?

Couldn't we just implement the delegation in executeInternal?

Contributor Author

@luoyuxia luoyuxia Jul 25, 2022

We can just implement the delegation in executeInternal. The reason I added executeOperation was to make the overall logic clear: first try the external implementation, and then fall back to executeInternal, which is Flink's implementation.
I'm fine with putting the delegation in executeInternal, which avoids introducing a new method on this interface.

The reason there is no executeOperation(List<ModifyOperation> operations) is that I don't want to expose too much to users, and I think it's not necessary. Currently, this method is mainly used by StatementSetOperation. If an external dialect wants to customize the implementation of an operation containing multiple inserts, it can return a new operation such as HiveStatementSetOperation and still delegate it to executeOperation(Operation operation).
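
To illustrate the wrapper idea in this comment, here is a rough sketch in which a dialect packs several inserts into a single operation, so that a single-operation entry point is enough. All class names here (SketchOperation, InsertOperation, DialectStatementSetOperation, DialectExecutor) are hypothetical and stand in for whatever the Hive dialect would actually define; the Optional<String> result type is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical marker type standing in for Flink's Operation.
interface SketchOperation {}

// Hypothetical single-insert operation.
final class InsertOperation implements SketchOperation {
    final String target;

    InsertOperation(String target) {
        this.target = target;
    }
}

/** Hypothetical wrapper a dialect's parser could return for a multi-insert statement. */
final class DialectStatementSetOperation implements SketchOperation {
    final List<InsertOperation> inserts;

    DialectStatementSetOperation(List<InsertOperation> inserts) {
        // Defensive copy so later changes to the caller's list do not leak in.
        this.inserts = Collections.unmodifiableList(new ArrayList<>(inserts));
    }
}

final class DialectExecutor {
    /** Handles only the wrapper type; everything else is left to the default path. */
    Optional<String> executeOperation(SketchOperation operation) {
        if (operation instanceof DialectStatementSetOperation) {
            DialectStatementSetOperation set = (DialectStatementSetOperation) operation;
            return Optional.of("executed " + set.inserts.size() + " inserts as one job");
        }
        return Optional.empty();
    }
}
```

Because the list of inserts travels inside one operation object, no list-based overload is needed on the interface.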

Member

It sounds like we can encapsulate this logic in the internal implementation instead of exposing it on the interface. From the users' perspective, they shouldn't have to care about the difference between executeInternal() and executeOperation(). It's very easy for users of TableEnvironmentInternal to call the wrong method.


/** Creates a new parser. */
Parser create(Context context);

default OperationExternalExecutor createOperatorExternalExecutor(Context context) {
Member

"External" might be misleading here. What about ExtendedOperationExecutor?

}

/** Default implementation for {@link OperationExternalExecutor}. */
class DefaultOperationExternalExecutor implements OperationExternalExecutor {
Member

The default implementation can be EmptyOperationExecutor, with the comment: a default implementation of ExtendedOperationExecutor that doesn't extend any operation behavior but forwards all operations to the Flink planner.
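
A possible shape for that suggestion, as a sketch only. PlannerOperation is a hypothetical placeholder type, and the Optional<String> return type is an assumption; the interface's actual signature may differ.

```java
import java.util.Optional;

// Hypothetical placeholder for Flink's Operation type.
interface PlannerOperation {}

// Assumed shape of the renamed plugin interface.
interface ExtendedOperationExecutor {
    Optional<String> executeOperation(PlannerOperation operation);
}

/**
 * A default implementation of ExtendedOperationExecutor that doesn't extend any
 * operation behavior. Returning empty signals the caller to forward the
 * operation to the Flink planner.
 */
final class EmptyOperationExecutor implements ExtendedOperationExecutor {
    @Override
    public Optional<String> executeOperation(PlannerOperation operation) {
        return Optional.empty();
    }
}
```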

@luoyuxia luoyuxia force-pushed the introduce-operation-execution-plugin branch from 9fddb38 to 2e09d74 Compare July 25, 2022 06:47
@wuchong
Member

wuchong commented Jul 25, 2022

Looks good to me. But please fix the compile problem.

PS: maybe you didn't configure automatic code formatting in your IDE. See https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/#code-formatting

@luoyuxia luoyuxia force-pushed the introduce-operation-execution-plugin branch from 2e09d74 to 97c4a9b Compare July 25, 2022 09:51
@wuchong
Member

wuchong commented Jul 25, 2022

TableEnvironmentTest.testManagedTable is failing. Please append commits to fix the test.

@luoyuxia
Contributor Author

@wuchong The CI has passed now. Could you please help merge this so that I can move on to the other issues?

@wuchong wuchong merged commit c7af66b into apache:master Jul 26, 2022
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
3 participants