-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-701] Common API based on SAM interfaces rather than rich functions #85
Conversation
I see two options regarding the Java8 tests: b) We integrate the java8 tests into the regular "flink-tests" module, into a separate package and do some maven includes / excludes tricks with build profiles. I vote for option a) it is cleaner and easier to do. |
I guess that (a) is also the only variant to make it work in Eclipse, since m2e does not seem to evaluate these profiles. Eclipse users with java < 8 need to close that subproject then. Can we make a subproject of a flink-tests (and still have code in flink-tests) ? |
@@ -525,7 +526,9 @@ protected void run() throws Exception { | |||
// modify accumulators.ll; | |||
if (this.stub != null) { | |||
// collect the counters from the stub | |||
Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators(); | |||
|
|||
// !!! Is this.runtimeUdfContext the right thing to return here if this.stub.getRuntimeContext() is null? !!! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think so, but it might be redundant.
If I am not mistaken, the control flow of RegularPactTask
results in this.stub.getRuntimeContext() == this.runtimeUdfContext
. So if one is null, then the other will be null as well and vice versa.
In the initialize
method, we do the following (among other things):
- Create the RuntimeContext (line 434)
- Initialize the stub/function (line 441)
Stub initalization sets the runtime context of the stub in line 699 to the previously created runtime context with FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, both should be the same. And previously, none was ever null.
Great news :-) I think it's very good that we plan to have the breaking changes with the next release already. I made a line comment regarding (3). |
|
||
public class FunctionUtils { | ||
|
||
private static final Pattern lambdaPattern = Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might be able to simplify the detection of the lambda currently by a check function.getClass().getName().indexOf('/') != -1
. This uses the fact that the slach character /
is only used on lambda function names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that this is true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think just checking for /
should be sufficient (I was unable to find out how IBM's JDK is naming these classes)
We could also check it with this condition:
function.getClass().isSynthetic() && function.getClass().getDeclaredMethods().length == 2
(maybe + && !argF.getClass().isAnonymousClass() && !argF.getClass().isLocalClass()
). I think it is very unlikely that the user is passing a runtime generated class (=synthetic) as a UDF.
http:https://stackoverflow.com/questions/23870478/how-to-correctly-determine-that-an-object-is-a-lambda
http:https://stackoverflow.com/questions/399546/synthetic-class-in-java
In Can you add the same checks for Join, Cross, CoGroup? I will start working on a java-8 only subproject. |
|
||
package org.apache.flink.api.java.functions; | ||
|
||
public class UnsupportedLambdaExpressionException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are using the InvalidProgramException
in most places to indicate incorrect and unspoorted behavior. I would suggest to make this UnsupportedLambdaExpressionException a subclass of the InvalidProgramException.
These checks are made in the "with" methods for Join, Cross, CoGroup, which is where the UDF is provided |
Maybe it's just me but I don't like the names of the interfaces. Reducible, for example, to me suggests that the thing can be reduced. But the UDF does the reducing and is not reducible. |
I agree, the names are not perfect. Anyone has a good for names? |
so for mappable:
|
I like Mapper best. A "Map" is something different in most programming languages. 😄 |
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-core</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need all dependencies here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for catching this, this is now fixed
I found a good way to detect lambdas. It only works if the SAM interface is serializable, but ours are always. The trick is to search for the
|
Simple type extraction from Lambdas (limited to non-generic parameters): public static Class<?>[] getLambdaParameters(Object lambda) {
for (Class<?> clazz = lambda.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
replaceMethod.setAccessible(true);
Object serializedForm = replaceMethod.invoke(lambda);
if (serializedForm instanceof SerializedLambda) {
SerializedLambda sl = (SerializedLambda) serializedForm;
return getTypesFromSerializedLambda(sl);
}
}
catch (NoSuchMethodException e) {
// fall through the loop and try the next class
}
catch (Throwable t) {
throw new RuntimeException(t);
}
}
throw new IllegalArgumentException("Not a serialized Lambda");
}
public static Class<?>[] getTypesFromSerializedLambda(SerializedLambda sl) throws Exception {
String sig = sl.getImplMethodSignature();
if (!sig.startsWith("(")) {
throw new Exception("Parse Error");
}
String parameters = sig.substring(1, sig.indexOf(')'));
String[] params = parameters.split(";");
List<Class<?>> classes = new ArrayList<>();
for (String p : params) {
if (!p.startsWith("L")) {
throw new Exception("Parse Error");
}
p = p.substring(1);
p = p.replace('/', '.');
classes.add(Class.forName(p));
}
return (Class<?>[]) classes.toArray(new Class<?>[classes.size()]);
} |
Thank you everyone for the terrific feedback. Lambda detection is changed to use SerializedLambda as Stephan suggested (that was grant!). In terms of naming, I do like Mapper, Reducer, FlatMapper, and CoGrouper. The problematic ones are "Filterer", "Joiner", "Crosser", and "FlatJoiner". We can live with those, or call them simply "Filter", "Join", etc, thus having an inconsistent naming scheme (we should probably not use the name "Map" as it would cause frequent conflicts with Java Maps). A previous thought was MapFunctional, ReduceFunctional, etc, referring to functional interfaces. Any thoughts on this? |
I don't find Joiner, Crosser, and FlatJoiner too bad. Filterer sounds strange though... |
I am with you. Mapper, Reducer is fine. And Join, Cross, Filter (without -er) would also be fine. |
Another possibility would be MapUDF and friends... |
…anged examples and tests to use SAMs
…anged examples and tests to use SAMs
This closes apache#85
Because binary image files cannot have a license header, we typically put them into a folder with a "license" file that describes that the images in the folder are licensed to ASF. To follow that best practice, this commit moves the images in docs/images to docs/fig, which contains such a file. This closes apache#85.
This is FLINK-701. This is not good to merge yet, I am putting it out for comments and assistance. However, it should be part of release 0.6 as it breaks compatibility.
The patch changes the Java and Record APIs to work on top of SAM (Single Abstract Method) interfaces rather than abstract classes. The SAMs are named Fooable for an operation Foo (e.g., Mappable for Map, Reducible for Reduce), and they replace the former GenericFooer (e.g., GenericMapper) interfaces.
The original "rich functions" still exist and work as usual, as they implement hte aforementioned interfaces. They are called FooFunction (e.g., MapFunction, ReduceFunction), and contain the open(), close(), etc methods as well.
As part of the refactoring, Cross was changed to return exactly one value rather than taking a collector as input.
GenericCombiner is renamed to FlatCombinable (general naming rule: FlatFooable takes a Collector as parameter and does not return a value, Fooable returns exactly one value if both interfaces
exist). This PR does not add a Combinable and does not solve FLINK-848, these can be added later without breaking compatibility.
This PR does add an explicit FlatJoinable that is at the same level of inheritance as Joiable. The runtime works only on FlatJoinable objects, Joinables are shallowly transformed to FlatJoinables (see
GeneratedFlatJoinFunction).
Two consequences:
(1) FlatCross is removed from the Scala API, as it cannot be supported by the new Cross signature. This is an API design choice that cannot be rectified immediately.
(2) As a side-effect, this PR does add support for Java 8 lambdas in the filter and reduce operators. Lambdas in the other operators do not work yet, as the current TypeExctractor implementation needs to be adapted to extract the return types of lambdas (filter and reduce have both known return types). With that solved, FLINK-612 will probably be resolved as well.
Things that need to be fixed:
(1) Currently, several POM files require Java 8. This will be changed. The goal is to have some lambda tests that are executed only if Java 8 is present.
(2) If a lambda function is currently provided as an argument to a method other than filter or reduce, an error is displayed to the user. Detecting that the input is a lambda is currently done via string matching on the function name (see FunctionUtils.isLambdaFunction) which (a) is quite iffy, and (b) might not work with all JVMSs/break in several occasions.
If someone knows a good way to "detect if something is a lambda", that would be great! The internet was not too much help until now.
(3) Could I have an extra set of eyes on RegularPactTask, line 531?
(4) DeltaIterationTranslationTest currently does not pass, this is simple fix, I will do this asap.