Skip to content

Commit

Permalink
[FLINK-1005] Make GroupReduce configurable to use either mutable or i…
Browse files Browse the repository at this point in the history
…mmutable object mode
  • Loading branch information
StephanEwen committed Sep 30, 2014
1 parent 1d00cff commit e7c4c85
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.runtime.util.KeyGroupedIteratorImmutable;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

Expand Down Expand Up @@ -91,6 +92,12 @@ public void prepare() throws Exception {
this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
this.comparator = this.taskContext.getDriverComparator(0);
this.input = this.taskContext.getInput(0);

this.mutableObjectMode = config.getMutableObjectMode();

if (LOG.isDebugEnabled()) {
LOG.debug("GroupReduceDriver uses " + (this.mutableObjectMode ? "MUTABLE" : "IMMUTABLE") + " object mode.");
}
}

@Override
Expand All @@ -99,15 +106,23 @@ public void run() throws Exception {
LOG.debug(this.taskContext.formatLogString("GroupReducer preprocessing done. Running GroupReducer code."));
}

final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);

// cache references on the stack
final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();

// run stub implementation
while (this.running && iter.nextKey()) {
stub.reduce(iter.getValues(), output);

if (mutableObjectMode) {
final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
// run stub implementation
while (this.running && iter.nextKey()) {
stub.reduce(iter.getValues(), output);
}
}
else {
final KeyGroupedIteratorImmutable<IT> iter = new KeyGroupedIteratorImmutable<IT>(this.input, this.serializer, this.comparator);
// run stub implementation
while (this.running && iter.nextKey()) {
stub.reduce(iter.getValues(), output);
}
}
}

Expand All @@ -118,4 +133,4 @@ public void cleanup() {}
public void cancel() {
this.running = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class TaskConfig {
private static final String DRIVER_COMPARATOR_PARAMETERS_PREFIX = "driver.comp.params.";

private static final String DRIVER_PAIR_COMPARATOR_FACTORY = "driver.paircomp";

private static final String DRIVER_MUTABLE_OBJECT_MODE = "diver.mutableobjects";

// -------------------------------------- Inputs ----------------------------------------------

Expand Down Expand Up @@ -335,6 +337,14 @@ public DriverStrategy getDriverStrategy() {
}
}

public void setMutableObjectMode(boolean mode) {
this.config.setBoolean(DRIVER_MUTABLE_OBJECT_MODE, mode);
}

public boolean getMutableObjectMode() {
return this.config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, true);
}

public void setDriverComparator(TypeComparatorFactory<?> factory, int inputNum) {
setTypeComparatorFactory(factory, DRIVER_COMPARATOR_FACTORY_PREFIX + inputNum,
DRIVER_COMPARATOR_PARAMETERS_PREFIX + inputNum + SEPARATOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.runtime.util;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package eu.stratosphere.pact.runtime.util;
package org.apache.flink.runtime.util;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.util.MutableObjectIterator;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TraversableOnceException;
/**
* The KeyValueIterator returns a key and all values that belong to the key (share the same key).
*
Expand Down Expand Up @@ -75,6 +80,7 @@ public boolean nextKey() throws IOException {
this.valuesIterator.next = this.lookahead;
this.lastKeyRecord = this.lookahead;
this.lookahead = null;
this.valuesIterator.iteratorAvailable = true;
return true;
}

Expand All @@ -94,6 +100,7 @@ public boolean nextKey() throws IOException {
this.comparator.setReference(next);
this.valuesIterator.next = next;
this.lastKeyRecord = next;
this.valuesIterator.iteratorAvailable = true;
return true;
}
}
Expand Down Expand Up @@ -170,10 +177,12 @@ public ValuesIterator getValues() {

// --------------------------------------------------------------------------------------------

public final class ValuesIterator implements Iterator<E> {
public final class ValuesIterator implements Iterator<E>, Iterable<E> {

private E next;

private boolean iteratorAvailable = true;

private ValuesIterator(E first) {
this.next = first;
}
Expand All @@ -198,5 +207,16 @@ public E next() {
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public Iterator<E> iterator() {
if (iteratorAvailable) {
iteratorAvailable = false;
return this;
}
else {
throw new TraversableOnceException();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static List<Tuple2<StringValue, IntValue>> createReduceMutableDataGrouped

public static final void compareTupleArrays(Object[] expected, Object[] found) {
if (expected.length != found.length) {
throw new IllegalArgumentException();
Assert.assertEquals("Length of result is wrong", expected.length, found.length);
}

for (int i = 0; i < expected.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.operators.drivers;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.GroupReduceFunction;
Expand All @@ -28,7 +29,6 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.runtime.util.RegularToMutableObjectIterator;
import org.apache.flink.types.IntValue;
Expand All @@ -53,14 +53,18 @@ public void testAllReduceDriverImmutableEmpty() {
TypeComparator<Tuple2<String, Integer>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});
context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

GatheringCollector<Tuple2<String, Integer>> result = new GatheringCollector<Tuple2<String,Integer>>(typeInfo.createSerializer());

context.setInput1(input, typeInfo.createSerializer());
context.setComparator1(comparator);
context.setCollector(new DiscardingOutputCollector<Tuple2<String, Integer>>());
context.setCollector(result);

GroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>> driver = new GroupReduceDriver<Tuple2<String, Integer>, Tuple2<String, Integer>>();
driver.setup(context);
driver.prepare();
driver.run();

Assert.assertTrue(result.getList().isEmpty());
}
catch (Exception e) {
System.err.println(e.getMessage());
Expand Down Expand Up @@ -141,7 +145,84 @@ public void testAllReduceDriverMutable() {
}
}


@Test
public void testAllReduceDriverIncorrectlyAccumulatingMutable() {
try {
TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();

List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});

GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());

context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
context.setInput1(input, typeInfo.createSerializer());
context.setComparator1(comparator);
context.setCollector(result);
context.setUdf(new ConcatSumMutableAccumulatingReducer());

GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> driver = new GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>();
driver.setup(context);
driver.prepare();
driver.run();

Object[] res = result.getList().toArray();
Object[] expected = DriverTestData.createReduceMutableDataGroupedResult().toArray();

try {
DriverTestData.compareTupleArrays(expected, res);
Assert.fail("Accumulationg mutable objects is expected to result in incorrect values.");
}
catch (AssertionError e) {
// expected
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

@Test
public void testAllReduceDriverAccumulatingImmutable() {
try {
TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();

List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
MutableObjectIterator<Tuple2<StringValue, IntValue>> input = new RegularToMutableObjectIterator<Tuple2<StringValue, IntValue>>(data.iterator(), typeInfo.createSerializer());
TypeComparator<Tuple2<StringValue, IntValue>> comparator = typeInfo.createComparator(new int[]{0}, new boolean[] {true});

GatheringCollector<Tuple2<StringValue, IntValue>> result = new GatheringCollector<Tuple2<StringValue, IntValue>>(typeInfo.createSerializer());

context.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
context.setInput1(input, typeInfo.createSerializer());
context.setComparator1(comparator);
context.setCollector(result);
context.setUdf(new ConcatSumMutableAccumulatingReducer());
context.setMutableObjectMode(false);

GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> driver = new GroupReduceDriver<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>();
driver.setup(context);
driver.prepare();
driver.run();

Object[] res = result.getList().toArray();
Object[] expected = DriverTestData.createReduceMutableDataGroupedResult().toArray();

DriverTestData.compareTupleArrays(expected, res);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

// --------------------------------------------------------------------------------------------
// Test UDFs
Expand Down Expand Up @@ -178,4 +259,26 @@ public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tup
out.collect(current);
}
}

public static final class ConcatSumMutableAccumulatingReducer implements GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {

@Override
public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) throws Exception {
List<Tuple2<StringValue, IntValue>> all = new ArrayList<Tuple2<StringValue,IntValue>>();

for (Tuple2<StringValue, IntValue> t : values) {
all.add(t);
}

Tuple2<StringValue, IntValue> result = all.get(0);

for (int i = 1; i < all.size(); i++) {
Tuple2<StringValue, IntValue> e = all.get(i);
result.f0.append(e.f0);
result.f1.setValue(result.f1.getValue() + e.f1.getValue());
}

out.collect(result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public void setDriverStrategy(DriverStrategy strategy) {
this.config.setDriverStrategy(strategy);
}

public void setMutableObjectMode(boolean mutableObjectMode) {
this.config.setMutableObjectMode(mutableObjectMode);
}

// --------------------------------------------------------------------------------------------
// Context Methods
// --------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit e7c4c85

Please sign in to comment.