Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-15487][table] Update code generation for new type inference #10960

Closed
wants to merge 7 commits into from

Conversation

twalthr
Copy link
Contributor

@twalthr twalthr commented Jan 29, 2020

What is the purpose of the change

This updates the code generation for the new type inference and thus completes FLINK-15487. Scala function work with the types supported by the planner. Tests added in this PR only test basic behavior. We will need more tests per data type. But this is a follow up issue.

Brief change log

  • Update the code generation
  • Fix a couple of bugs and shortcomings

Verifying this change

FunctionITCase tests the implementation.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit ef2e385 (Wed Jan 29 14:12:48 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@twalthr
Copy link
Contributor Author

twalthr commented Jan 29, 2020

CC @JingsongLi

@dawidwys dawidwys self-assigned this Jan 29, 2020
@flinkbot
Copy link
Collaborator

flinkbot commented Jan 29, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @twalthr , left some comments.

/**
* Returns a term for representing the given class in Java code.
*/
def typeTerm(clazz: Class[_]): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just getCanonicalName?

Copy link
Contributor Author

@twalthr twalthr Jan 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canonical name can be null. Class.getName always returns a value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I am also in favor of @JingsongLi suggestion. As per the javadoc of getCanonicalName I think if it gives us null, we cannot use the term anyway.

We could add a check here that throws exception that given class does not have a canonical representation and thus we cannot use it for code generation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update it but in general if we want to relax the UDF constraints in the future, we might need to support anonymous classes in our utilities as well.

}
// extract null term from result term
if (sourceType.getConversionClass.isPrimitive) {
generateNonNullField(sourceType.getLogicalType, internalResultTerm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace sourceType.getLogicalType to reused field fromDataTypeToLogicalType(sourceType)?

: GeneratedExpression = {
// convert external source type to internal format
val internalResultTerm = if (isInternalClass(sourceType)) {
s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this cast because UDF can return Object and DataType with internal conversion class...
Sometimes there is some gap between the conversion class and real java function return class. It is safe to add this cast, and I think JVM can optimize this cast to not affect performance.

I think we can add comment here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment. But actually the gap should be filled by the converters. I think the core problem is that int and Integer are handled the same in the converters. Even though the latter can support null und needs unboxing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cast is actually not necessary anymore because of the cast in BridgingSqlFunctionCallGen. I remove it for now.

} else {
val eTerm = boxedTypeTermForExternalType(t)
val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(targetType))
val eTerm = boxedTypeTermForExternalType(targetType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't mind, consider a hotfix to modify the boxedTypeTermForExternalType, because getConversionClass can't be null now.

* @return internal unboxed field representation
*/
def generateInputFieldUnboxing(
ctx: CodeGeneratorContext,
fieldType: LogicalType,
fieldTerm: String): GeneratedExpression = {
fieldTerm: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fieldTerm -> inputTerm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the method should be refactored in the future. In Flink planner, this method was intended to perform genToInternalIfNeeded. Now the concepts are mixed up.

* @return internal unboxed field representation
*/
def generateInputFieldUnboxing(
ctx: CodeGeneratorContext,
fieldType: LogicalType,
fieldTerm: String): GeneratedExpression = {
fieldTerm: String,
unboxingTerm: String)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unboxingTerm -> outputTerm or outputUnboxingTerm?

val externalResultCasting = if (externalResultClass == externalResultClassBoxed) {
s"($externalResultTypeTerm)"
} else {
s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the method return class too? If it return primitive class, we don't need add this cast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot determine the return class of the method at this point. The JVM will pick the right method to call with the given signature. The JVM should be smart enough to remove the cast here.

}
// extract null term from result term
if (sourceClass.isPrimitive) {
generateNonNullField(sourceType, internalResultTerm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some concerns that the user has returned primitive conversion class, but the real java method return non-primitive which could be null.
What kind of errors will we give users at this time? Maybe we can add a test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users will get a null pointer exception. Which is expected if people override the default type inference and implement advanced features. Usually, this exception should not happen as people will use the extraction + annotations.

Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really impressive work. The usage of ScalarFunctions looks splendid!

@@ -305,8 +305,12 @@ private DataType extractDataTypeOrError(DataTypeTemplate template, List<Type> ty
DataTypeTemplate template,
List<Type> typeHierarchy,
Type type) {
// byte arrays have higher priority than regular arrays
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: How about a comment like: prefer VARBINARY/BYTES() over ARRAY(TINYINT) for bytes[] instead?

} else {
return type;
return FunctionArgumentTemplate.of(type);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: two separate methods?

// check for input group before start extracting a data type
return tryExtractInputGroup(method, i)
		// extract a concrete data type
   	.orElseGet(() -> extractUsingExtractor(typeFactory, function, method, i));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I also fixed two other bugs on the way.

/**
* Returns a term for representing the given class in Java code.
*/
def typeTerm(clazz: Class[_]): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I am also in favor of @JingsongLi suggestion. As per the javadoc of getCanonicalName I think if it gives us null, we cannot use the term anyway.

We could add a check here that throws exception that given class does not have a canonical representation and thus we cannot use it for code generation.

} else {
t.getConversionClass.getCanonicalName
}
t.getConversionClass.getCanonicalName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use typeTerm(t.getConversionClass)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, why is the method called boxed...? We don't perform any boxing here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be legacy. I removed it entirely.

val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
if (isConverterIdentity(t)) {
s"($iTerm) $term"
def genToExternal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For future reference. I feel that we can unify the genToExternal/Internal methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same feeling. But this should be a follow up task. Currently, it is still used at a couple of places.

fieldTerm: String): GeneratedExpression = {
inputType: LogicalType,
inputTerm: String,
inputUnboxingTerm: String)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand this change, but do we really need that parameter? Shouldn't we only ever check for null and assign value from the same input?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the old logic we were performing .toInternal() conversion twice. One time for the null check and one time for the assignment. This improves the runtime code.

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(TypeStrategies.argument(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we maybe change the inputTypeStrategy to be also required instead of a WILDCARD? I think we should not assume anything for users that end up defining their own type inference strategies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Input validation is rather optional. We are mostly interested in the return type which is why this is the only mandatory attribute.

e,
hasMessage(
equalTo(
"Could not find an implementation method that matches the following " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Shall we print the class name of the udf to which the function resolved to and available eval methods?

I think users would appreciate that.

@twalthr
Copy link
Contributor Author

twalthr commented Feb 4, 2020

Thanks for the review @JingsongLi and @dawidwys. I hope I could address most feedback. I will merge this once Travis gives green light. I will open a follow-up issue for more extensive tests for all data types.

@twalthr
Copy link
Contributor Author

twalthr commented Feb 4, 2020

@flinkbot run travis

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @twalthr , code generation looks good to me.

@twalthr twalthr closed this in 5e6e851 Feb 5, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants