
[FLINK-701] Common API based on SAM interfaces rather than rich functions #85

Closed
wants to merge 14 commits

Conversation

ktzoumas
Contributor

This is FLINK-701. This is not ready to merge yet; I am putting it out for comments and assistance. However, it should be part of release 0.6, as it breaks compatibility.

The patch changes the Java and Record APIs to work on top of SAM (Single Abstract Method) interfaces rather than abstract classes. The SAMs are named Fooable for an operation Foo (e.g., Mappable for Map, Reducible for Reduce), and they replace the former GenericFooer (e.g., GenericMapper) interfaces.

The original "rich functions" still exist and work as usual, as they implement hte aforementioned interfaces. They are called FooFunction (e.g., MapFunction, ReduceFunction), and contain the open(), close(), etc methods as well.

As part of the refactoring, Cross was changed to return exactly one value rather than taking a collector as input.

GenericCombiner is renamed to FlatCombinable (general naming rule: when both interfaces exist, FlatFooable takes a Collector as a parameter and does not return a value, while Fooable returns exactly one value). This PR does not add a Combinable and does not solve FLINK-848; these can be added later without breaking compatibility.

This PR does add an explicit FlatJoinable that is at the same level of inheritance as Joinable. The runtime works only on FlatJoinable objects; Joinables are shallowly transformed to FlatJoinables (see GeneratedFlatJoinFunction).
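To picture the shallow wrapping (and, at the same time, the Fooable vs. FlatFooable distinction above), a rough sketch follows; the adapter is called GeneratedFlatJoinFunction in the PR, while the class name, generics, and lifecycle forwarding are simplified here:

public interface Joinable<IN1, IN2, OUT> extends java.io.Serializable {
    // returns exactly one value per pair
    OUT join(IN1 first, IN2 second) throws Exception;
}

public interface FlatJoinable<IN1, IN2, OUT> extends java.io.Serializable {
    // emits zero or more values through the Collector
    void join(IN1 first, IN2 second, org.apache.flink.util.Collector<OUT> out) throws Exception;
}

// The runtime only sees FlatJoinables; a plain Joinable is wrapped roughly like this:
public class WrappingFlatJoin<IN1, IN2, OUT> implements FlatJoinable<IN1, IN2, OUT> {

    private final Joinable<IN1, IN2, OUT> wrapped;

    public WrappingFlatJoin(Joinable<IN1, IN2, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void join(IN1 first, IN2 second, org.apache.flink.util.Collector<OUT> out) throws Exception {
        out.collect(wrapped.join(first, second));
    }
}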

Two consequences:

(1) FlatCross is removed from the Scala API, as it cannot be supported by the new Cross signature. This is an API design choice that cannot be rectified immediately.

(2) As a side effect, this PR adds support for Java 8 lambdas in the filter and reduce operators (see the example below). Lambdas in the other operators do not work yet, as the current TypeExtractor implementation needs to be adapted to extract the return types of lambdas (filter and reduce both have known return types). With that solved, FLINK-612 will probably be resolved as well.
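Illustrative snippet (assumes an ExecutionEnvironment env and the Java DataSet API):

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

// the filter UDF always returns a boolean, and reduce returns the same type as its
// input, so no return-type extraction is needed for these two operators
DataSet<Integer> evens = numbers.filter(n -> n % 2 == 0);
DataSet<Integer> sum = evens.reduce((a, b) -> a + b);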

Things that need to be fixed:

(1) Currently, several POM files require Java 8. This will be changed. The goal is to have some lambda tests that are executed only if Java 8 is present.

(2) If a lambda function is currently provided as an argument to a method other than filter or reduce, an error is displayed to the user. Detecting that the input is a lambda is currently done via string matching on the function name (see FunctionUtils.isLambdaFunction), which (a) is quite iffy, and (b) might not work with all JVMs or might break in several situations.

If someone knows a good way to "detect if something is a lambda", that would be great! The internet has not been much help so far.

(3) Could I have an extra set of eyes on RegularPactTask, line 531?

(4) DeltaIterationTranslationTest currently does not pass; this is a simple fix, and I will do it ASAP.

@ktzoumas changed the title from "Java api functions to sams" to "Common API based on SAMs rather than rich functions" on Jul 29, 2014
@ktzoumas changed the title from "Common API based on SAMs rather than rich functions" to "Common API based on SAM interfaces rather than rich functions" on Jul 29, 2014
@rmetzger
Contributor

I see two options regarding the Java 8 tests:
a) We create a "flink-java8-tests" Maven module that is only included in the build if Java 8 is present (we can do this via build profiles that activate at certain Java versions; a rough profile sketch follows below).

b) We integrate the Java 8 tests into the regular "flink-tests" module, in a separate package, and do some Maven include/exclude tricks with build profiles.

I vote for option a); it is cleaner and easier to do.
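A sketch of what such a profile in the parent POM could look like; the module name is taken from option a), and the exact activation syntax would need to be checked:

<!-- only include the Java 8 test module when building with JDK 8 or later -->
<profiles>
  <profile>
    <id>include-java8-tests</id>
    <activation>
      <jdk>[1.8,)</jdk>
    </activation>
    <modules>
      <module>flink-java8-tests</module>
    </modules>
  </profile>
</profiles>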

@StephanEwen
Contributor

I guess that (a) is also the only variant that works in Eclipse, since m2e does not seem to evaluate these profiles. Eclipse users with Java < 8 will then need to close that subproject.

Can we make it a subproject of flink-tests (and still have code in flink-tests)?

@@ -525,7 +526,9 @@ protected void run() throws Exception {
// modify accumulators.ll;
if (this.stub != null) {
// collect the counters from the stub
Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators();

// !!! Is this.runtimeUdfContext the right thing to return here if this.stub.getRuntimeContext() is null? !!!
Contributor

Yes, I think so, but it might be redundant.

If I am not mistaken, the control flow of RegularPactTask results in this.stub.getRuntimeContext() == this.runtimeUdfContext. So if one is null, then the other will be null as well and vice versa.

In the initialize method, we do the following (among other things):

  1. Create the RuntimeContext (line 434)
  2. Initialize the stub/function (line 441)

Stub initialization sets the runtime context of the stub in line 699 to the previously created runtime context with FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext).

Contributor

I agree, both should be the same. And previously, neither was ever null.

@uce
Contributor

uce commented Jul 29, 2014

Great news :-)

I think it's very good that we plan to include the breaking changes in the next release already.

I made a line comment regarding (3).


public class FunctionUtils {

private static final Pattern lambdaPattern = Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+");
Contributor

We might be able to simplify the lambda detection with a check like function.getClass().getName().indexOf('/') != -1. This uses the fact that the slash character / only appears in the class names of lambdas.

Contributor Author

Are you sure that this is true?

Contributor

I think just checking for / should be sufficient (I was unable to find out how IBM's JDK names these classes).
We could also check it with this condition:
function.getClass().isSynthetic() && function.getClass().getDeclaredMethods().length == 2 (maybe plus && !argF.getClass().isAnonymousClass() && !argF.getClass().isLocalClass()). I think it is very unlikely that the user is passing a runtime-generated (= synthetic) class as a UDF.

https://stackoverflow.com/questions/23870478/how-to-correctly-determine-that-an-object-is-a-lambda
https://stackoverflow.com/questions/399546/synthetic-class-in-java
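Putting the heuristics from this thread together, such a check might look roughly like this (a sketch of the proposals above, not code from the PR):

// Heuristic lambda detection: a lambda's runtime class name contains a '/', and lambda
// classes are synthetic without being anonymous or local classes. The
// getDeclaredMethods().length check mentioned above could be added as a further guard.
public static boolean looksLikeLambda(Object function) {
    Class<?> clazz = function.getClass();
    return clazz.getName().indexOf('/') != -1
            || (clazz.isSynthetic()
                    && !clazz.isAnonymousClass()
                    && !clazz.isLocalClass());
}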

@StephanEwen
Contributor

In DataSet you have checks for Map, FlatMap, and GroupReduce to ensure that no currently unsupported lambdas are used.

Can you add the same checks for Join, Cross, and CoGroup?

I will start working on a java-8 only subproject.


package org.apache.flink.api.java.functions;

public class UnsupportedLambdaExpressionException extends RuntimeException {
Contributor

We are using InvalidProgramException in most places to indicate incorrect and unsupported behavior. I would suggest making this UnsupportedLambdaExpressionException a subclass of InvalidProgramException.
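That change would be small; a sketch (assuming the usual InvalidProgramException from org.apache.flink.api.common, with a default message chosen here purely for illustration):

package org.apache.flink.api.java.functions;

import org.apache.flink.api.common.InvalidProgramException;

public class UnsupportedLambdaExpressionException extends InvalidProgramException {

    private static final long serialVersionUID = 1L;

    public UnsupportedLambdaExpressionException() {
        super("Java 8 lambda expressions are currently only supported in the filter and reduce operations.");
    }
}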

@ktzoumas
Contributor Author

These checks are made in the "with" methods for Join, Cross, and CoGroup, which is where the UDF is provided.
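For reference, such a check roughly looks like this (a sketch only; the enclosing operator class and the Joinable generics are simplified, and isLambdaFunction is the FunctionUtils helper mentioned above):

// inside the join/cross/coGroup operator builders:
public <R> JoinOperator<I1, I2, R> with(Joinable<I1, I2, R> function) {
    if (FunctionUtils.isLambdaFunction(function)) {
        // lambdas are rejected here until return-type extraction for them works
        throw new UnsupportedLambdaExpressionException();
    }
    // ... otherwise translate the operator with the given UDF as before ...
}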

@aljoscha
Contributor

Maybe it's just me, but I don't like the names of the interfaces. Reducible, for example, suggests to me that the thing can be reduced. But the UDF does the reducing and is not itself reducible.

@StephanEwen
Contributor

I agree, the names are not perfect. Does anyone have good ideas for names?

@zentol
Contributor

zentol commented Jul 30, 2014

so for mappable:

  • why not go with the previous name? GenericMapper
  • or drop the "generic" part? Mapper
  • or go even shorter? Map
  • maybe append "SAM" ? MapSAM/MapperSAM

@aljoscha
Contributor

I like Mapper best. A "Map" is something different in most programming languages. 😄

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
Contributor

I don't think we need all dependencies here

Contributor Author

Thank you for catching this; it is now fixed.

@StephanEwen
Contributor

I found a good way to detect lambdas. It only works if the SAM interface is serializable, but ours always are.

The trick is to look for the writeReplace method that serializable lambda objects define and check whether it returns a SerializedLambda.

import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public interface LambdaFunction extends java.io.Serializable {
    String doComputation(Integer value);
}

public static void main(String[] args) throws Exception {
    LambdaFunction func = (theInteger) -> "string " + String.valueOf(theInteger);

    // walk up the class hierarchy and look for the writeReplace() method of the
    // serializable lambda; invoking it yields the SerializedLambda representation
    for (Class<?> clazz = func.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
        try {
            Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
            replaceMethod.setAccessible(true);
            Object serializedForm = replaceMethod.invoke(func);

            if (serializedForm instanceof SerializedLambda) {
                SerializedLambda sl = (SerializedLambda) serializedForm;
                System.out.println(sl);
                break;
            }
        }
        catch (NoSuchMethodException e) {
            // fall through the loop and try the next class
        }
        catch (Throwable t) {
            t.printStackTrace();
            return;
        }
    }
}

@StephanEwen
Contributor

Simple type extraction from Lambdas (limited to non-generic parameters):

    // imports needed: java.lang.invoke.SerializedLambda, java.lang.reflect.Method,
    // java.util.ArrayList, java.util.List

    public static Class<?>[] getLambdaParameters(Object lambda) {
        // walk up the class hierarchy looking for the writeReplace() method of the
        // serializable lambda; its SerializedLambda form carries the impl method signature
        for (Class<?> clazz = lambda.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
                replaceMethod.setAccessible(true);
                Object serializedForm = replaceMethod.invoke(lambda);

                if (serializedForm instanceof SerializedLambda) {
                    SerializedLambda sl = (SerializedLambda) serializedForm;
                    return getTypesFromSerializedLambda(sl);
                }
            }
            catch (NoSuchMethodException e) {
                // fall through the loop and try the next class
            }
            catch (Throwable t) {
                throw new RuntimeException(t);
            }
        }

        throw new IllegalArgumentException("Not a serialized Lambda");
    }

    // parses the JVM method descriptor of the lambda's implementation method,
    // e.g. "(Ljava/lang/Integer;)Ljava/lang/String;", into the parameter classes
    public static Class<?>[] getTypesFromSerializedLambda(SerializedLambda sl) throws Exception {
        String sig = sl.getImplMethodSignature();

        if (!sig.startsWith("(")) {
            throw new Exception("Parse Error");
        }

        String parameters = sig.substring(1, sig.indexOf(')'));
        String[] params = parameters.split(";");

        List<Class<?>> classes = new ArrayList<>();

        for (String p : params) {
            // only object types ("Lfully/qualified/Name;") are handled, no primitives or generics
            if (!p.startsWith("L")) {
                throw new Exception("Parse Error");
            }

            p = p.substring(1);
            p = p.replace('/', '.');
            classes.add(Class.forName(p));
        }

        return classes.toArray(new Class<?>[classes.size()]);
    }
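Applied to the LambdaFunction example from the previous comment, this yields the parameter type (illustrative):

LambdaFunction func = (theInteger) -> "string " + String.valueOf(theInteger);

// getImplMethodSignature() is "(Ljava/lang/Integer;)Ljava/lang/String;" for this lambda,
// so the extracted parameter types are [ java.lang.Integer ]
Class<?>[] params = getLambdaParameters(func);
System.out.println(java.util.Arrays.toString(params)); // prints [class java.lang.Integer]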

@ktzoumas
Contributor Author

Thank you everyone for the terrific feedback. Lambda detection is changed to use SerializedLambda, as Stephan suggested (that was great!).

In terms of naming, I do like Mapper, Reducer, FlatMapper, and CoGrouper. The problematic ones are "Filterer", "Joiner", "Crosser", and "FlatJoiner". We can live with those, or call them simply "Filter", "Join", etc., thus having an inconsistent naming scheme (we should probably not use the name "Map", as it would cause frequent conflicts with Java Maps).

A previous thought was MapFunctional, ReduceFunctional, etc., referring to functional interfaces. Any thoughts on this?

@fhueske
Contributor

fhueske commented Jul 31, 2014

I don't find Joiner, Crosser, and FlatJoiner too bad. Filterer sounds strange though...
FooFunctional would also be fine with me.

@StephanEwen
Contributor

I am with you. Mapper and Reducer are fine. And Join, Cross, Filter (without -er) would also be fine.

@aljoscha
Contributor

Another possibility would be MapUDF and friends...

@ktzoumas changed the title from "Common API based on SAM interfaces rather than rich functions" to "[FLINK-701] Common API based on SAM interfaces rather than rich functions" on Jul 31, 2014
@asfgit closed this in 934e4e0 on Aug 1, 2014
zhijiangW pushed a commit to zhijiangW/flink that referenced this pull request Jul 23, 2019
tzulitai pushed a commit to tzulitai/flink that referenced this pull request Jan 15, 2021
Because binary image files cannot have a license header, we typically put them into a
folder with a "license" file that describes that the images in the folder are licensed
to ASF.

To follow that best practice, this commit moves the images in docs/images to docs/fig,
which contains such a file.

This closes apache#85.
mas-chen pushed a commit to mas-chen/flink that referenced this pull request Feb 28, 2024