-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-15487][table] Update code generation for new type inference #10960
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit ef2e385 (Wed Jan 29 14:12:48 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
CC @JingsongLi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @twalthr , left some comments.
/** | ||
* Returns a term for representing the given class in Java code. | ||
*/ | ||
def typeTerm(clazz: Class[_]): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just getCanonicalName
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Canonical name can be null. Class.getName always returns a value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I am also in favor of @JingsongLi suggestion. As per the javadoc of getCanonicalName
I think if it gives us null, we cannot use the term anyway.
We could add a check here that throws exception that given class does not have a canonical representation and thus we cannot use it for code generation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will update it but in general if we want to relax the UDF constraints in the future, we might need to support anonymous classes in our utilities as well.
} | ||
// extract null term from result term | ||
if (sourceType.getConversionClass.isPrimitive) { | ||
generateNonNullField(sourceType.getLogicalType, internalResultTerm) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace sourceType.getLogicalType
to reused field fromDataTypeToLogicalType(sourceType)
?
: GeneratedExpression = { | ||
// convert external source type to internal format | ||
val internalResultTerm = if (isInternalClass(sourceType)) { | ||
s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this cast because UDF can return Object
and DataType with internal conversion class...
Sometimes there is some gap between the conversion class and real java function return class. It is safe to add this cast, and I think JVM can optimize this cast to not affect performance.
I think we can add comment here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a comment. But actually the gap should be filled by the converters. I think the core problem is that int
and Integer
are handled the same in the converters. Even though the latter can support null und needs unboxing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cast is actually not necessary anymore because of the cast in BridgingSqlFunctionCallGen
. I remove it for now.
} else { | ||
val eTerm = boxedTypeTermForExternalType(t) | ||
val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(targetType)) | ||
val eTerm = boxedTypeTermForExternalType(targetType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't mind, consider a hotfix to modify the boxedTypeTermForExternalType
, because getConversionClass
can't be null now.
* @return internal unboxed field representation | ||
*/ | ||
def generateInputFieldUnboxing( | ||
ctx: CodeGeneratorContext, | ||
fieldType: LogicalType, | ||
fieldTerm: String): GeneratedExpression = { | ||
fieldTerm: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fieldTerm
-> inputTerm
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the method should be refactored in the future. In Flink planner, this method was intended to perform genToInternalIfNeeded
. Now the concepts are mixed up.
* @return internal unboxed field representation | ||
*/ | ||
def generateInputFieldUnboxing( | ||
ctx: CodeGeneratorContext, | ||
fieldType: LogicalType, | ||
fieldTerm: String): GeneratedExpression = { | ||
fieldTerm: String, | ||
unboxingTerm: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unboxingTerm
-> outputTerm
or outputUnboxingTerm
?
val externalResultCasting = if (externalResultClass == externalResultClassBoxed) { | ||
s"($externalResultTypeTerm)" | ||
} else { | ||
s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we check the method return class too? If it return primitive class, we don't need add this cast.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot determine the return class of the method at this point. The JVM will pick the right method to call with the given signature. The JVM should be smart enough to remove the cast here.
} | ||
// extract null term from result term | ||
if (sourceClass.isPrimitive) { | ||
generateNonNullField(sourceType, internalResultTerm) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some concerns that the user has returned primitive conversion class, but the real java method return non-primitive which could be null.
What kind of errors will we give users at this time? Maybe we can add a test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users will get a null pointer exception. Which is expected if people override the default type inference and implement advanced features. Usually, this exception should not happen as people will use the extraction + annotations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really impressive work. The usage of ScalarFunctions looks splendid!
@@ -305,8 +305,12 @@ private DataType extractDataTypeOrError(DataTypeTemplate template, List<Type> ty | |||
DataTypeTemplate template, | |||
List<Type> typeHierarchy, | |||
Type type) { | |||
// byte arrays have higher priority than regular arrays |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: How about a comment like: prefer VARBINARY/BYTES() over ARRAY(TINYINT) for bytes[]
instead?
} else { | ||
return type; | ||
return FunctionArgumentTemplate.of(type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: two separate methods?
// check for input group before start extracting a data type
return tryExtractInputGroup(method, i)
// extract a concrete data type
.orElseGet(() -> extractUsingExtractor(typeFactory, function, method, i));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I also fixed two other bugs on the way.
/** | ||
* Returns a term for representing the given class in Java code. | ||
*/ | ||
def typeTerm(clazz: Class[_]): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I am also in favor of @JingsongLi suggestion. As per the javadoc of getCanonicalName
I think if it gives us null, we cannot use the term anyway.
We could add a check here that throws exception that given class does not have a canonical representation and thus we cannot use it for code generation.
} else { | ||
t.getConversionClass.getCanonicalName | ||
} | ||
t.getConversionClass.getCanonicalName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use typeTerm(t.getConversionClass)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, why is the method called boxed...
? We don't perform any boxing here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to be legacy. I removed it entirely.
val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t)) | ||
if (isConverterIdentity(t)) { | ||
s"($iTerm) $term" | ||
def genToExternal( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: For future reference. I feel that we can unify the genToExternal/Internal
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the same feeling. But this should be a follow up task. Currently, it is still used at a couple of places.
fieldTerm: String): GeneratedExpression = { | ||
inputType: LogicalType, | ||
inputTerm: String, | ||
inputUnboxingTerm: String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand this change, but do we really need that parameter? Shouldn't we only ever check for null and assign value from the same input?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the old logic we were performing .toInternal()
conversion twice. One time for the null check and one time for the assignment. This improves the runtime code.
@Override | ||
public TypeInference getTypeInference(DataTypeFactory typeFactory) { | ||
return TypeInference.newBuilder() | ||
.outputTypeStrategy(TypeStrategies.argument(0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we maybe change the inputTypeStrategy
to be also required instead of a WILDCARD
? I think we should not assume anything for users that end up defining their own type inference strategies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Input validation is rather optional. We are mostly interested in the return type which is why this is the only mandatory attribute.
e, | ||
hasMessage( | ||
equalTo( | ||
"Could not find an implementation method that matches the following " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Shall we print the class name of the udf to which the function resolved to and available eval
methods?
I think users would appreciate that.
54efb59
to
1a399c8
Compare
1a399c8
to
007a150
Compare
Thanks for the review @JingsongLi and @dawidwys. I hope I could address most feedback. I will merge this once Travis gives green light. I will open a follow-up issue for more extensive tests for all data types. |
@flinkbot run travis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @twalthr , code generation looks good to me.
What is the purpose of the change
This updates the code generation for the new type inference and thus completes FLINK-15487. Scala function work with the types supported by the planner. Tests added in this PR only test basic behavior. We will need more tests per data type. But this is a follow up issue.
Brief change log
Verifying this change
FunctionITCase
tests the implementation.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: yesDocumentation