Skip to content

Commit

Permalink
[FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.
Browse files Browse the repository at this point in the history

This closes apache#854
  • Loading branch information
dabaitu authored and StephanEwen committed Jul 13, 2015
1 parent 7761ddb commit 7271881
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

import java.io.IOException;
import java.util.List;

/**
Expand Down Expand Up @@ -79,6 +78,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin

private Collector<OUT> output;

private long oversizedRecordCount = 0L;

private volatile boolean running = true;

private boolean objectReuseEnabled = false;
Expand Down Expand Up @@ -142,7 +143,7 @@ public void prepare() throws Exception {
this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();

if (LOG.isDebugEnabled()) {
LOG.debug("GroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
LOG.debug("GroupReduceCombineDriver object reuse: {}.", (this.objectReuseEnabled ? "ENABLED" : "DISABLED"));
}
}

Expand Down Expand Up @@ -170,7 +171,10 @@ public void run() throws Exception {

// write the value again
if (!this.sorter.write(value)) {
throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
++oversizedRecordCount;
LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
// simply forward the record
this.output.collect((OUT)value);
}
}

Expand All @@ -179,39 +183,28 @@ public void run() throws Exception {
}

private void sortAndCombine() throws Exception {
if (sorter.isEmpty()) {
return;
}

final InMemorySorter<IN> sorter = this.sorter;
this.sortAlgo.sort(sorter);
final GroupCombineFunction<IN, OUT> combiner = this.combiner;
final Collector<OUT> output = this.output;

// iterate over key groups
if (objectReuseEnabled) {
if (!sorter.isEmpty()) {
this.sortAlgo.sort(sorter);

final ReusingKeyGroupedIterator<IN> keyIter =
new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);

final GroupCombineFunction<IN, OUT> combiner = this.combiner;
final Collector<OUT> output = this.output;

// iterate over key groups
while (this.running && keyIter.nextKey()) {
combiner.combine(keyIter.getValues(), output);
}
final ReusingKeyGroupedIterator<IN> keyIter =
new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
while (this.running && keyIter.nextKey()) {
combiner.combine(keyIter.getValues(), output);
}
} else {
if (!sorter.isEmpty()) {
this.sortAlgo.sort(sorter);

final NonReusingKeyGroupedIterator<IN> keyIter =
new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);

final GroupCombineFunction<IN, OUT> combiner = this.combiner;
final Collector<OUT> output = this.output;

// iterate over key groups
while (this.running && keyIter.nextKey()) {
combiner.combine(keyIter.getValues(), output);
}
final NonReusingKeyGroupedIterator<IN> keyIter =
new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
while (this.running && keyIter.nextKey()) {
combiner.combine(keyIter.getValues(), output);
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,21 @@
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.operators.testutils.*;
import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.runtime.operators.testutils.TestData.Generator;
import org.junit.Test;

public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
Expand Down Expand Up @@ -65,7 +63,7 @@ public void testCombineTask() {
addDriverComparator(this.comparator);
addDriverComparator(this.comparator);
setOutput(this.outList);

getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);
Expand All @@ -92,7 +90,39 @@ public void testCombineTask() {

this.outList.clear();
}


/**
 * Feeds ten very large records through a sorted group combine driver and
 * verifies that exactly ten records reach the output list.
 *
 * NOTE(review): presumably each record is too large for the driver's sort
 * buffer at the configured memory fraction, so this exercises the
 * oversized-record path -- confirm against the value of combine_frac.
 */
@Test
public void testOversizedRecordCombineTask() {
	final int expectedRecords = 10;
	final int tenMillion = 10000000;

	// generate 10 records each of size 10MB
	final Generator recordGenerator = new Generator(561349061987311L, 1, tenMillion);
	final TestData.GeneratorIterator recordSource =
			new TestData.GeneratorIterator(recordGenerator, expectedRecords);

	// the driver input is a union over a single generated source
	final List<MutableObjectIterator<Record>> sources = new ArrayList<MutableObjectIterator<Record>>();
	sources.add(recordSource);
	addInput(new UnionIterator<Record>(sources));

	addDriverComparator(this.comparator);
	addDriverComparator(this.comparator);
	setOutput(this.outList);

	// same driver configuration as the regular combine test setup
	getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
	getTaskConfig().setRelativeMemoryDriver(combine_frac);
	getTaskConfig().setFilehandlesDriver(2);

	final GroupReduceCombineDriver<Record, Record> combineDriver =
			new GroupReduceCombineDriver<Record, Record>();

	try {
		testDriver(combineDriver, MockCombiningReduceStub.class);
	} catch (Exception e) {
		// any exception from the driver counts as a test failure
		e.printStackTrace();
		Assert.fail("Invoke method caused exception.");
	}

	// every input record must show up in the output
	final boolean sizeMatches = this.outList.size() == expectedRecords;
	Assert.assertTrue(
			"Resultset size was " + this.outList.size() + ". Expected was " + expectedRecords,
			sizeMatches);

	this.outList.clear();
}

@Test
public void testFailingCombineTask() {
int keyCnt = 100;
Expand All @@ -119,7 +149,7 @@ public void testFailingCombineTask() {
Assert.fail("Test failed due to an exception.");
}
}

@Test
public void testCancelCombineTaskSorting()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class UtilsTest {
@Test
public void testUberjarLocator() {
File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
Assert.assertTrue(dir.getName().endsWith(".jar"));
Assert.assertNotNull(dir);
Assert.assertTrue(dir.getName().endsWith(".jar"));
dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
Assert.assertTrue(dir.exists());
Assert.assertTrue(dir.isDirectory());
Expand Down

0 comments on commit 7271881

Please sign in to comment.