Added parameter checks and tests for first-n operator.
Excluded DataSet.first(), UnsortedGrouping.first(), and SortedGrouping.first() methods from ScalaAPICompletenessTest.
fhueske committed Sep 24, 2014
1 parent 6702a2e commit 141946a
Showing 6 changed files with 362 additions and 2 deletions.
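For context, a minimal usage sketch of the behavior this commit adds (illustrative only, not part of the commit; class and variable names below are made up for the example): first(n) now fails at program construction time with an InvalidProgramException when n < 1.

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FirstNCheckSketch {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Long> numbers = env.generateSequence(1, 100);

		// valid: wraps a FirstReducer in a GroupReduceOperator, as before
		numbers.first(10);

		try {
			numbers.first(0); // rejected by the new parameter check
		} catch (InvalidProgramException e) {
			// "Parameter n of first(n) must be at least 1."
			System.out.println(e.getMessage());
		}
	}
}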
DataSet.java
@@ -404,6 +404,10 @@ public ReduceOperator<T> maxBy(int... fields) {
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
if(n < 1) {
throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
}

return reduceGroup(new FirstReducer<T>(n));
}

SortedGrouping.java
@@ -95,10 +95,14 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc

/**
* Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
* @param n The desired number of elements.
* @param n The desired number of elements for each group.
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
if(n < 1) {
throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
}

return reduceGroup(new FirstReducer<T>(n));
}

UnsortedGrouping.java
@@ -143,10 +143,14 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc

/**
* Returns a new set containing the first n elements in this grouped {@link DataSet}.<br/>
* @param n The desired number of elements.
* @param n The desired number of elements for each group.
* @return A ReduceGroupOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
if(n < 1) {
throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
}

return reduceGroup(new FirstReducer<T>(n));
}

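All three first(n) variants delegate to FirstReducer. The sketch below is written for this note, not taken from Flink's FirstReducer source; it shows a GroupReduceFunction with the contract these operators rely on: forward at most n records per group.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

// Illustrative stand-in for FirstReducer: emit at most 'count' records of each group.
public class FirstNSketch<T> implements GroupReduceFunction<T, T> {

	private final int count;

	public FirstNSketch(int count) {
		this.count = count;
	}

	@Override
	public void reduce(Iterable<T> values, Collector<T> out) throws Exception {
		int emitted = 0;
		for (T value : values) {
			if (emitted++ >= count) {
				break;
			}
			out.collect(value);
		}
	}
}

In the ungrouped DataSet.first(n) case the whole data set forms a single group, so the same kind of function yields the first n records overall.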
FirstNOperatorTest.java
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.operator;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.junit.Assert;
import org.junit.Test;

public class FirstNOperatorTest {

// TUPLE DATA

private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();

private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);

@Test
public void testUngroupedFirstN() {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

// should work
try {
tupleDs.first(1);
} catch(Exception e) {
Assert.fail();
}

// should work
try {
tupleDs.first(10);
} catch(Exception e) {
Assert.fail();
}

// should not work n == 0
try {
tupleDs.first(0);
Assert.fail();
} catch(InvalidProgramException ipe) {
// we're good here
} catch(Exception e) {
Assert.fail();
}

// should not work n == -1
try {
tupleDs.first(-1);
Assert.fail();
} catch(InvalidProgramException ipe) {
// we're good here
} catch(Exception e) {
Assert.fail();
}

}

@Test
public void testGroupedFirstN() {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

// should work
try {
tupleDs.groupBy(2).first(1);
} catch(Exception e) {
Assert.fail();
}

// should work
try {
tupleDs.groupBy(1,3).first(10);
} catch(Exception e) {
Assert.fail();
}

// should not work n == 0
try {
tupleDs.groupBy(0).first(0);
Assert.fail();
} catch(InvalidProgramException ipe) {
// we're good here
} catch(Exception e) {
Assert.fail();
}

// should not work n == -1
try {
tupleDs.groupBy(2).first(-1);
Assert.fail();
} catch(InvalidProgramException ipe) {
// we're good here
} catch(Exception e) {
Assert.fail();
}
}

@Test
public void testGroupedSortedFirstN() {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

// should work
try {
tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
} catch(Exception e) {
Assert.fail();
}

// should work
try {
tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
} catch(Exception e) {
Assert.fail();
}

// should not work n == 0
try {
tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
Assert.fail();
} catch(InvalidProgramException ipe) {
// we're good here
} catch(Exception e) {
Assert.fail();
}

// should not work n == -1
try {
tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
Assert.fail();
} catch(InvalidProgramException ipe) {
// we're good here
} catch(Exception e) {
Assert.fail();
}
}
}
ScalaAPICompletenessTest.scala
@@ -67,6 +67,11 @@ class ScalaAPICompletenessTest {
"org.apache.flink.api.java.operators.UnsortedGrouping.minBy",
"org.apache.flink.api.java.operators.UnsortedGrouping.maxBy",

// Exclude first operator for now
"org.apache.flink.api.java.DataSet.first",
"org.apache.flink.api.java.operators.SortedGrouping.first",
"org.apache.flink.api.java.operators.UnsortedGrouping.first",

// Exclude explicit rebalance and hashPartitionBy for now
"org.apache.flink.api.java.DataSet.partitionByHash",
"org.apache.flink.api.java.DataSet.rebalance"
FirstNITCase.java
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.javaApiOperators;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class FirstNITCase extends JavaProgramTestBase {

private static int NUM_PROGRAMS = 3;

private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
private String expectedResult;

public FirstNITCase(Configuration config) {
super(config);
}

@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}

@Override
protected void testProgram() throws Exception {
expectedResult = FirstNProgs.runProgram(curProgId, resultPath);
}

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}

@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {

LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();

for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}

return toParameterList(tConfigs);
}

private static class FirstNProgs {

public static String runProgram(int progId, String resultPath) throws Exception {

switch(progId) {
case 1: {
/*
* First-n on ungrouped data set
*/

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);

seven.writeAsText(resultPath);
env.execute();

// return expected result
return "(7)\n";
}
case 2: {
/*
* First-n on grouped data set
*/

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
.map(new OneMapper2()).groupBy(0).sum(1);

first.writeAsText(resultPath);
env.execute();

// return expected result
return "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
}
case 3: {
/*
* First-n on grouped and sorted data set
*/

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
.project(1,0).types(Long.class, Integer.class);

first.writeAsText(resultPath);
env.execute();

// return expected result
return "(1,1)\n"
+ "(2,3)\n(2,2)\n"
+ "(3,6)\n(3,5)\n(3,4)\n"
+ "(4,10)\n(4,9)\n(4,8)\n"
+ "(5,15)\n(5,14)\n(5,13)\n"
+ "(6,21)\n(6,20)\n(6,19)\n";

}
default:
throw new IllegalArgumentException("Invalid program id");
}

}

}

public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
private final Tuple1<Integer> one = new Tuple1<Integer>(1);
@Override
public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
return one;
}
}

public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
private static final long serialVersionUID = 1L;
private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0l,1);
@Override
public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
one.f0 = value.f1;
return one;
}
}

}
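For reference, the expected strings above follow from the shape of the 3-tuple test data: the group keyed by field 1 with value k contains exactly k records, so program 2's first(4) keeps min(k, 4) records per group and program 3's first(3) keeps the three largest first fields of each group (all records for groups 1 and 2). Below is a small, hedged standalone sketch of the grouped-and-sorted pattern on an inline data set; it does not use the test harness or CollectionDataSets, and the class name is made up for the example.

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class GroupedSortedFirstNSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, Long, String>> ds = env.fromElements(
				new Tuple3<Integer, Long, String>(1, 1L, "Hi"),
				new Tuple3<Integer, Long, String>(2, 2L, "Hello"),
				new Tuple3<Integer, Long, String>(3, 2L, "Hello world"));

		// per group of field 1: sort by field 0 descending, keep the first 2 records
		ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(2)
				.print();

		// in this Flink version print() registers a data sink; execute() runs the job
		env.execute();
	}
}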
