Skip to content

Commit

Permalink
[streaming] Improved tests for CoReduceInvokables
Browse files Browse the repository at this point in the history
  • Loading branch information
szape authored and mbalassi committed Oct 1, 2014
1 parent 127470b commit 9e722df
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class CoBatchReduceTest {

private static class MyCoReduceFunction implements CoReduceFunction<Integer, Integer, String> {
private static class MyCoReduceFunction implements CoReduceFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;

@Override
Expand All @@ -39,7 +39,7 @@ public Integer reduce1(Integer value1, Integer value2) {
}

@Override
public Integer reduce2(Integer value1, Integer value2) {
public String reduce2(String value1, String value2) {
return value1 + value2;
}

Expand All @@ -49,38 +49,83 @@ public String map1(Integer value) {
}

@Override
public String map2(Integer value) {
return value.toString();
public String map2(String value) {
return value;
}
}

@Test
public void coBatchReduceTest() {
public void coBatchReduceTest1() {

List<Integer> inputs = new ArrayList<Integer>();
for (Integer i = 1; i <= 10; i++) {
inputs.add(i);
}

List<String> inputs2 = new ArrayList<String>();
inputs2.add("a");
inputs2.add("b");
inputs2.add("c");
inputs2.add("d");
inputs2.add("e");
inputs2.add("f");
inputs2.add("g");
inputs2.add("h");
inputs2.add("i");

CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer, String, String>(
new MyCoReduceFunction(), 4L, 3L, 4L, 3L);

List<String> expected = new ArrayList<String>();
expected.add("10");
expected.add("26");
expected.add("19");
expected.add("abc");
expected.add("def");
expected.add("ghi");

List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);

Collections.sort(result);
Collections.sort(expected);

assertEquals(expected, result);

}

@Test
public void coBatchReduceTest2() {

List<Integer> inputs = new ArrayList<Integer>();
for (Integer i = 1; i <= 10; i++) {
inputs.add(i);
}

List<Integer> inputs2 = new ArrayList<Integer>();
inputs2.add(1);
inputs2.add(2);
inputs2.add(-1);
inputs2.add(-3);
inputs2.add(-4);
List<String> inputs2 = new ArrayList<String>();
inputs2.add("a");
inputs2.add("b");
inputs2.add("c");
inputs2.add("d");
inputs2.add("e");
inputs2.add("f");
inputs2.add("g");
inputs2.add("h");
inputs2.add("i");

CoBatchReduceInvokable<Integer, Integer, String> invokable = new CoBatchReduceInvokable<Integer, Integer, String>(
new MyCoReduceFunction(), 3L, 3L, 2L, 2L);
CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer, String, String>(
new MyCoReduceFunction(), 4L, 3L, 2L, 2L);

List<String> expected = new ArrayList<String>();
expected.add("6");
expected.add("12");
expected.add("10");
expected.add("18");
expected.add("24");
expected.add("26");
expected.add("34");
expected.add("19");
expected.add("2");
expected.add("-8");
expected.add("-4");
expected.add("abc");
expected.add("cde");
expected.add("efg");
expected.add("ghi");
expected.add("i");

List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class CoGroupedBatchReduceTest {

private static class MyCoReduceFunction implements
CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String> {
private static final long serialVersionUID = 1L;

@Override
Expand All @@ -42,9 +42,9 @@ public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1,
}

@Override
public Tuple2<String, Integer> reduce2(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) {
return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
public Tuple2<String, String> reduce2(Tuple2<String, String> value1,
Tuple2<String, String> value2) {
return new Tuple2<String, String>("a", value1.f1 + value2.f1);
}

@Override
Expand All @@ -53,48 +53,96 @@ public String map1(Tuple2<String, Integer> value) {
}

@Override
public String map2(Tuple2<String, Integer> value) {
return value.f1.toString();
public String map2(Tuple2<String, String> value) {
return value.f1;
}
}

@Test
public void coGroupedBatchReduceTest() {
public void coGroupedBatchReduceTest1() {

List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
inputs1.add(new Tuple2<String, Integer>("a", 1));
inputs1.add(new Tuple2<String, Integer>("a", 2));
inputs1.add(new Tuple2<String, Integer>("b", 2));
inputs1.add(new Tuple2<String, Integer>("b", 2));
inputs1.add(new Tuple2<String, Integer>("b", 5));
inputs1.add(new Tuple2<String, Integer>("a", 3));
inputs1.add(new Tuple2<String, Integer>("a", 4));
inputs1.add(new Tuple2<String, Integer>("a", 5));
inputs1.add(new Tuple2<String, Integer>("b", 6));
inputs1.add(new Tuple2<String, Integer>("a", 7));
inputs1.add(new Tuple2<String, Integer>("b", 8));
inputs1.add(new Tuple2<String, Integer>("b", 9));
inputs1.add(new Tuple2<String, Integer>("b", 10));

List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
inputs2.add(new Tuple2<String, Integer>("a", 1));
inputs2.add(new Tuple2<String, Integer>("a", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 5));
inputs2.add(new Tuple2<String, Integer>("a", 7));
inputs2.add(new Tuple2<String, Integer>("b", 9));
inputs2.add(new Tuple2<String, Integer>("b", 10));
List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
inputs2.add(new Tuple2<String, String>("1", "a"));
inputs2.add(new Tuple2<String, String>("2", "b"));
inputs2.add(new Tuple2<String, String>("1", "c"));
inputs2.add(new Tuple2<String, String>("2", "d"));
inputs2.add(new Tuple2<String, String>("1", "e"));
inputs2.add(new Tuple2<String, String>("2", "f"));
inputs2.add(new Tuple2<String, String>("1", "g"));
inputs2.add(new Tuple2<String, String>("2", "h"));
inputs2.add(new Tuple2<String, String>("1", "i"));

List<String> expected = new ArrayList<String>();
expected.add("10");
expected.add("7");
expected.add("9");
expected.add("24");
expected.add("10");
expected.add("10");
expected.add("7");
expected.add("9");
expected.add("24");
expected.add("10");
expected.add("12");
expected.add("33");
expected.add("ace");
expected.add("gi");
expected.add("bdf");
expected.add("h");

CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 0, 0);

List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);

Collections.sort(result);
Collections.sort(expected);
assertEquals(expected, result);
}

@Test
public void coGroupedBatchReduceTest2() {

CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>, String>(
new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0);
List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
inputs1.add(new Tuple2<String, Integer>("a", 1));
inputs1.add(new Tuple2<String, Integer>("a", 2));
inputs1.add(new Tuple2<String, Integer>("a", 3));
inputs1.add(new Tuple2<String, Integer>("a", 4));
inputs1.add(new Tuple2<String, Integer>("a", 5));
inputs1.add(new Tuple2<String, Integer>("b", 6));
inputs1.add(new Tuple2<String, Integer>("a", 7));
inputs1.add(new Tuple2<String, Integer>("b", 8));
inputs1.add(new Tuple2<String, Integer>("b", 9));
inputs1.add(new Tuple2<String, Integer>("b", 10));

List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>();
inputs2.add(new Tuple2<String, String>("1", "a"));
inputs2.add(new Tuple2<String, String>("2", "b"));
inputs2.add(new Tuple2<String, String>("1", "c"));
inputs2.add(new Tuple2<String, String>("2", "d"));
inputs2.add(new Tuple2<String, String>("1", "e"));
inputs2.add(new Tuple2<String, String>("2", "f"));
inputs2.add(new Tuple2<String, String>("1", "g"));
inputs2.add(new Tuple2<String, String>("2", "h"));
inputs2.add(new Tuple2<String, String>("1", "i"));

List<String> expected = new ArrayList<String>();
expected.add("10");
expected.add("19");
expected.add("12");
expected.add("33");
expected.add("19");
expected.add("ace");
expected.add("egi");
expected.add("i");
expected.add("bdf");
expected.add("fh");

CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 0, 0);

List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);

Expand Down
Loading

0 comments on commit 9e722df

Please sign in to comment.