Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-701] Common API based on SAM interfaces rather than rich functions #85

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Java and record API based on SAM interfaces
  • Loading branch information
ktzoumas committed Jul 29, 2014
commit aed25492c994c200a50ff4972abb274fc0387c4e
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import static org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
import static org.junit.Assert.*;

import org.apache.flink.api.common.functions.GenericJoiner;
import org.apache.flink.api.common.functions.MapFunctional;
import org.apache.flink.api.common.functions.FlatJoinable;
import org.apache.flink.api.common.functions.Mappable;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.Order;
Expand All @@ -47,7 +47,7 @@
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plan.SourcePlanNode;
import org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
import org.apache.flink.compiler.testfunctions.DummyJoinFunction;
import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.operators.DriverStrategy;
Expand Down Expand Up @@ -1426,10 +1426,10 @@ private static final DataSourceNode getSourceNode() {
}

private static final MapNode getMapNode() {
return new MapNode(new MapOperatorBase<String, String, MapFunctional<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
return new MapNode(new MapOperatorBase<String, String, Mappable<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
}

private static final MatchNode getJoinNode() {
return new MatchNode(new JoinOperatorBase<String, String, String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
return new MatchNode(new JoinOperatorBase<String, String, String, FlatJoinable<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.ReduceGroupOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.util.Collector;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void testAllReduceWithCombiner() {

DataSet<Long> data = env.generateSequence(1, 8000000).name("source");

ReduceGroupOperator<Long, Long> reduced = data.reduceGroup(new GroupReduceFunction<Long, Long>() {
GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new GroupReduceFunction<Long, Long>() {
public void reduce(Iterator<Long> values, Collector<Long> out) {}
}).name("reducer");

Expand Down Expand Up @@ -194,7 +194,7 @@ public void testGroupedReduceWithFieldPositionKeyCombinable() {
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:https:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);

ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(1)
.reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
Expand Down Expand Up @@ -309,7 +309,7 @@ public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:https:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);

ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

package org.apache.flink.compiler.testfunctions;

import org.apache.flink.api.java.functions.JoinFunction;
import org.apache.flink.api.java.functions.FlatJoinFunction;
import org.apache.flink.util.Collector;

public class DummyJoinFunction<T> extends JoinFunction<T, T, T> {
public class DummyFlatJoinFunction<T> extends FlatJoinFunction<T, T, T> {

private static final long serialVersionUID = 1L;

@Override
public T join(T first, T second) {
return null;
public void join(T first, T second, Collector<T> out) {
out.collect(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@

import org.apache.flink.api.java.record.functions.CrossFunction;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;

public class DummyCrossStub extends CrossFunction implements Serializable {
public class DummyCrossStub extends CrossFunction {
private static final long serialVersionUID = 1L;


@Override
public void cross(Record record1, Record record2, Collector<Record> out) {
out.collect(record1);
out.collect(record2);
public Record cross(Record first, Record second) throws Exception {
return first;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@

package org.apache.flink.api.common.functions;

import java.io.Serializable;
import java.util.Iterator;

import org.apache.flink.util.Collector;


public interface GenericCoGrouper<V1, V2, O> extends RichFunction {
public interface CoGroupable<V1, V2, O> extends Function, Serializable {

/**
* This method must be implemented to provide a user implementation of a
* coGroup. It is called for each two key-value pairs that share the same
* key and come from different inputs.
*
* @param records1 The records from the first input which were paired with the key.
* @param records2 The records from the second input which were paired with the key.
* @param first The records from the first input which were paired with the key.
* @param second The records from the second input which were paired with the key.
* @param out A collector that collects all output pairs.
*/
void coGroup(Iterator<V1> records1, Iterator<V2> records2, Collector<O> out) throws Exception;
void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@

package org.apache.flink.api.common.functions;

import org.apache.flink.util.Collector;
import java.io.Serializable;


/**
* @param <V1> First input type
* @param <V2> Second input type
* @param <O> Output type
* @param <IN1> First input type
* @param <IN2> Second input type
* @param <OUT> Output type
*/
public interface GenericCrosser<V1, V2, O> extends RichFunction {
public interface Crossable<IN1, IN2, OUT> extends Function, Serializable {

/**
* User defined function for the cross operator.
*
* @param record1 Record from first input
* @param record2 Record from the second input
* @param out Collector to submit resulting records.
* @return result of cross UDF.
* @throws Exception
*/
void cross(V1 record1, V2 record2, Collector<O> out) throws Exception;
OUT cross(IN1 record1, IN2 record2) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* Generic interface used for combiners.
*/
public interface Combinable<T> extends Function, Serializable {
public interface FlatCombinable<T> extends Function, Serializable {

void combine(Iterator<T> records, Collector<T> out) throws Exception;
void combine(Iterator<T> values, Collector<T> out) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import org.apache.flink.util.Collector;

import java.io.Serializable;

public interface GenericJoiner<V1, V2, O> extends RichFunction {

void join(V1 value1, V2 value2, Collector<O> out) throws Exception;

public interface FlatJoinable<IN1, IN2, OUT> extends Function, Serializable {

void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@
* @param <O>
*/
public interface FlatMappable<T, O> extends Function, Serializable {

/**
* User defined function to perform transformations on records.
* This method allows to submit an arbitrary number of records
* per incoming tuple.
*
* @param record incoming record
* @param out outgoing collector to return none, one or more records
* @throws Exception
* The core method of FlatMappable. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for for emitting result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T record, Collector<O> out) throws Exception;
void flatMap(T value, Collector<O> out) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ public interface GroupReducible<T, O> extends Function, Serializable {
* @param out The collector to hand results to.
* @throws Exception
*/
void reduce(Iterator<T> records, Collector<O> out) throws Exception;
void reduce(Iterator<T> values, Collector<O> out) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.flink.api.common.functions;

import java.io.Serializable;


public interface Joinable<IN1, IN2, OUT> extends Function, Serializable {

OUT join(IN1 first, IN2 second) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
import java.io.Serializable;

public interface Mappable<T, O> extends Function, Serializable {

/**
* A user-implemented function that modifies or transforms an incoming object and
* returns the result.
* The core method of Mappable. Takes an element from the input data set and transforms
* it into exactly one element.
*
* @param value The input value.
* @returns The transformed value
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
O map(T record) throws Exception;
O map(T value) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@
import java.io.Serializable;

public interface Reducible<T> extends Function, Serializable {


/**
* The core method of Reducible, combining two values into one value of the same type.
* The reduce function is consecutively applied to all values of a group until only a single value remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

import java.util.regex.Pattern;

public class FunctionUtils {

private static final Pattern lambdaPattern = Pattern.compile("(\\S+)\\$\\$Lambda\\$(\\d+)/\\d+");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to simplify the detection of the lambda currently by a check function.getClass().getName().indexOf('/') != -1. This uses the fact that the slach character / is only used on lambda function names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this is true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just checking for / should be sufficient (I was unable to find out how IBM's JDK is naming these classes)
We could also check it with this condition:
function.getClass().isSynthetic() && function.getClass().getDeclaredMethods().length == 2 (maybe + && !argF.getClass().isAnonymousClass() && !argF.getClass().isLocalClass()). I think it is very unlikely that the user is passing a runtime generated class (=synthetic) as a UDF.

https://stackoverflow.com/questions/23870478/how-to-correctly-determine-that-an-object-is-a-lambda
https://stackoverflow.com/questions/399546/synthetic-class-in-java



public static void openFunction (Function function, Configuration parameters) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
Expand All @@ -47,7 +52,7 @@ public static void setFunctionRuntimeContext (Function function, RuntimeContext
}
}

public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){
public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
return richFunction.getRuntimeContext();
Expand All @@ -56,4 +61,9 @@ public static RuntimeContext getFunctionRuntimeContext (Function function, Runt
return defaultContext;
}
}

public static boolean isLambdaFunction (Function function) {

return lambdaPattern.matcher(function.getClass().getName()).matches();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.util.List;

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.util.Visitor;

Expand All @@ -33,7 +33,7 @@
* @param <OUT> Output type of the user function
* @param <FT> Type of the user function
*/
public abstract class DualInputOperator<IN1, IN2, OUT, FT extends RichFunction> extends AbstractUdfOperator<OUT, FT> {
public abstract class DualInputOperator<IN1, IN2, OUT, FT extends Function> extends AbstractUdfOperator<OUT, FT> {

/**
* The operator producing the first input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.common.operators.base;

import org.apache.flink.api.common.functions.GenericCoGrouper;
import org.apache.flink.api.common.functions.CoGroupable;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.Ordering;
Expand All @@ -28,9 +28,9 @@
import org.apache.flink.api.common.operators.util.UserCodeWrapper;

/**
* @see GenericCoGrouper
* @see org.apache.flink.api.common.functions.CoGroupable
*/
public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends GenericCoGrouper<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupable<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {

/**
* The ordering for the order inside a group from input one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.common.operators.base;

import org.apache.flink.api.common.functions.GenericCrosser;
import org.apache.flink.api.common.functions.Crossable;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
Expand All @@ -28,9 +28,9 @@


/**
* @see GenericCrosser
* @see org.apache.flink.api.common.functions.Crossable
*/
public class CrossOperatorBase<IN1, IN2, OUT, FT extends GenericCrosser<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> {
public class CrossOperatorBase<IN1, IN2, OUT, FT extends Crossable<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> {

public CrossOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) {
super(udf, operatorInfo, name);
Expand Down
Loading