Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-701] Common API based on SAM interfaces rather than rich functions #85

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
map and reduce operators
  • Loading branch information
ktzoumas committed Jul 19, 2014
commit 2eb3aa8a7d301fa5f52c0311ee5cb4fd9920065d
11 changes: 11 additions & 0 deletions flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
</configuration>
</plugin>
</plugins>

<pluginManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.junit.Assert.*;

import org.apache.flink.api.common.functions.GenericJoiner;
import org.apache.flink.api.common.functions.GenericMap;
import org.apache.flink.api.common.functions.MapFunctional;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.Order;
Expand Down Expand Up @@ -1426,7 +1426,7 @@ private static final DataSourceNode getSourceNode() {
}

private static final MapNode getMapNode() {
return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
return new MapNode(new MapOperatorBase<String, String, MapFunctional<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
}

private static final MatchNode getJoinNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.flink.api.common.functions;


public interface GenericMap<T, O> extends RichFunction {
import java.io.Serializable;

public interface MapFunctional<T, O> extends Function, Serializable {

/**
* A user-implemented function that modifies or transforms an incoming object and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.flink.api.common.functions;


public interface GenericReduce<T> extends RichFunction {
import java.io.Serializable;

public interface ReduceFunctional<T> extends Function, Serializable {

T reduce(T value1, T value2) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;

/**
* Abstract superclass for all contracts that represent actual operators.
*
* @param <FT> Type of the user function
*/
public abstract class AbstractUdfOperator<OUT, FT extends RichFunction> extends Operator<OUT> {
public abstract class AbstractUdfOperator<OUT, FT extends Function> extends Operator<OUT> {

/**
* The object or class containing the user function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.util.List;

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.util.Visitor;

Expand All @@ -32,7 +32,7 @@
* @param <OUT> Output type of the user function
* @param <FT> Type of the user function
*/
public abstract class SingleInputOperator<IN, OUT, FT extends RichFunction> extends AbstractUdfOperator<OUT, FT> {
public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends AbstractUdfOperator<OUT, FT> {

/**
* The input which produces the data consumed by this operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.common.operators.base;

import org.apache.flink.api.common.functions.GenericMap;
import org.apache.flink.api.common.functions.MapFunctional;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
Expand All @@ -33,7 +33,7 @@
* @param <OUT> The result type.
* @param <FT> The type of the user-defined function.
*/
public class MapOperatorBase<IN, OUT, FT extends GenericMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
public class MapOperatorBase<IN, OUT, FT extends MapFunctional<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {

public MapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
super(udf, operatorInfo, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.flink.api.common.operators.base;

import org.apache.flink.api.common.functions.GenericReduce;
import org.apache.flink.api.common.functions.ReduceFunctional;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
Expand All @@ -31,12 +31,12 @@
* Base data flow operator for Reduce user-defined functions. Accepts reduce functions
* and key positions. The key positions are expected in the flattened common data model.
*
* @see GenericReduce
* @see org.apache.flink.api.common.functions.ReduceFunctional
*
* @param <T> The type (parameters and return type) of the reduce function.
* @param <FT> The type of the reduce function.
*/
public class ReduceOperatorBase<T, FT extends GenericReduce<T>> extends SingleInputOperator<T, T, FT> {
public class ReduceOperatorBase<T, FT extends ReduceFunctional<T>> extends SingleInputOperator<T, T, FT> {

/**
* Creates a grouped reduce data flow operator.
Expand Down
12 changes: 12 additions & 0 deletions flink-examples/flink-java-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,18 @@ under the License.
</executions>
</plugin>

<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
</configuration>
</plugin>

</plugins>
</build>

Expand Down
16 changes: 16 additions & 0 deletions flink-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,20 @@ under the License.
<module>flink-scala-examples</module>
</modules>

<build>
<plugins>
<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
</configuration>
</plugin>
</plugins>
</build>

</project>
16 changes: 16 additions & 0 deletions flink-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ under the License.

<packaging>jar</packaging>

<build>
<plugins>
<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.functions.FilterFunctional;
import org.apache.flink.api.common.functions.MapFunctional;
import org.apache.flink.api.common.functions.ReduceFunctional;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.aggregation.Aggregations;
Expand Down Expand Up @@ -135,7 +137,7 @@ public TypeInformation<T> getType() {
* @see MapOperator
* @see DataSet
*/
public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
public <R> MapOperator<T, R> map(MapFunctional<T, R> mapper) {
if (mapper == null) {
throw new NullPointerException("Map function must not be null.");
}
Expand Down Expand Up @@ -276,7 +278,7 @@ public AggregateOperator<T> min (int field) {
* @see ReduceOperator
* @see DataSet
*/
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
public ReduceOperator<T> reduce(ReduceFunctional<T> reducer) {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.api.java.functions;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GenericMap;
import org.apache.flink.api.common.functions.MapFunctional;

/**
* The abstract base class for Map functions. Map functions take elements and transform them,
Expand All @@ -40,7 +40,7 @@
* @param <IN> Type of the input elements.
* @param <OUT> Type of the returned elements.
*/
public abstract class MapFunction<IN, OUT> extends AbstractRichFunction implements GenericMap<IN, OUT> {
public abstract class MapFunction<IN, OUT> extends AbstractRichFunction implements MapFunctional<IN, OUT> {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.api.java.functions;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GenericReduce;
import org.apache.flink.api.common.functions.ReduceFunctional;

/**
* The abstract base class for Reduce functions. Reduce functions combine groups of elements to
Expand All @@ -44,7 +44,7 @@
*
* @param <T> Type of the elements that this function processes.
*/
public abstract class ReduceFunction<T> extends AbstractRichFunction implements GenericReduce<T> {
public abstract class ReduceFunction<T> extends AbstractRichFunction implements ReduceFunctional<T> {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GenericCoGrouper;
import org.apache.flink.api.common.functions.GenericMap;
import org.apache.flink.api.common.functions.MapFunctional;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
Expand Down Expand Up @@ -199,10 +199,10 @@ private static <I1, I2, K, OUT> PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> tr
final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());

final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final MapOperatorBase<I1, Tuple2<K, I1>, MapFunctional<I1, Tuple2<K, I1>>> keyMapper1 =
new MapOperatorBase<I1, Tuple2<K, I1>, MapFunctional<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
final MapOperatorBase<I2, Tuple2<K, I2>, MapFunctional<I2, Tuple2<K, I2>>> keyMapper2 =
new MapOperatorBase<I2, Tuple2<K, I2>, MapFunctional<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);

cogroup.setFirstInput(keyMapper1);
Expand Down Expand Up @@ -236,10 +236,10 @@ private static <I1, I2, K, OUT> PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> tr
final TupleKeyExtractingMapper<I1, K> extractor1 = new TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]);
final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());

final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final MapOperatorBase<I1, Tuple2<K, I1>, MapFunctional<I1, Tuple2<K, I1>>> keyMapper1 =
new MapOperatorBase<I1, Tuple2<K, I1>, MapFunctional<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
final MapOperatorBase<I2, Tuple2<K, I2>, MapFunctional<I2, Tuple2<K, I2>>> keyMapper2 =
new MapOperatorBase<I2, Tuple2<K, I2>, MapFunctional<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");

final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, logicalKeyPositions1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);

Expand Down Expand Up @@ -274,10 +274,10 @@ private static <I1, I2, K, OUT> PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> tr
final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
final TupleKeyExtractingMapper<I2, K> extractor2 = new TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]);

final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
final MapOperatorBase<I1, Tuple2<K, I1>, MapFunctional<I1, Tuple2<K, I1>>> keyMapper1 =
new MapOperatorBase<I1, Tuple2<K, I1>, MapFunctional<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
final MapOperatorBase<I2, Tuple2<K, I2>, MapFunctional<I2, Tuple2<K, I2>>> keyMapper2 =
new MapOperatorBase<I2, Tuple2<K, I2>, MapFunctional<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");

final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GenericGroupReduce;
import org.apache.flink.api.common.functions.GenericMap;
import org.apache.flink.api.common.functions.MapFunctional;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
Expand Down Expand Up @@ -127,7 +127,7 @@ private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> transl

PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);

MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
MapOperatorBase<IN, Tuple2<K, IN>, MapFunctional<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunctional<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");

reducer.setInput(mapper);
mapper.setInput(input);
Expand Down
Loading