[FLINK-1350] [runtime] Add blocking result partition variant
- Renames runtime intermediate result classes:
  a) Removes "Intermediate" prefix
  b) Queue => Subpartition
  c) Iterator => View

- [FLINK-1350] Adds a spillable result subpartition variant for BLOCKING
  results, which writes data to memory first and starts spilling
  asynchronously when there is not enough memory to produce the result
  entirely in memory (see the first sketch below).

  Receiving tasks of BLOCKING results are only deployed after *all*
  partitions have been fully produced. PIPELINED and BLOCKING results
  cannot be mixed.

- [FLINK-1359] Adds simple state tracking to result partitions with
  notifications after partitions/subpartitions have been consumed. Each
  partition has to be consumed at least once before it can be released.

  Currently there is no notion of historic intermediate results, i.e. results
  are released as soon as they are consumed (see the second sketch below).
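
To make the spilling idea concrete, here is a minimal Java sketch of a spillable subpartition. It is an illustration under assumptions, not the commit's actual classes: the buffer type, the fixed in-memory budget, and the single-threaded spill writer are hypothetical stand-ins for the runtime's buffer pool and asynchronous I/O path.

    // Minimal sketch of spill-on-memory-pressure, NOT the actual Flink classes.
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class SpillableSubpartitionSketch {

        private final Deque<byte[]> inMemoryBuffers = new ArrayDeque<byte[]>();
        private final ExecutorService spillWriter = Executors.newSingleThreadExecutor();
        private final int maxInMemoryBuffers; // hypothetical memory budget

        private boolean spilling;

        SpillableSubpartitionSketch(int maxInMemoryBuffers) {
            this.maxInMemoryBuffers = maxInMemoryBuffers;
        }

        // Producer side: keep buffers in memory while the budget lasts; once
        // it is exhausted, switch to spilling and hand work to the async writer.
        synchronized void add(final byte[] buffer) {
            if (!spilling && inMemoryBuffers.size() < maxInMemoryBuffers) {
                inMemoryBuffers.add(buffer);
                return;
            }
            if (!spilling) {
                spilling = true;
                // Drain what is already buffered so the memory goes back to
                // the pool while the producer keeps running.
                while (!inMemoryBuffers.isEmpty()) {
                    final byte[] drained = inMemoryBuffers.poll();
                    spillWriter.submit(new Runnable() {
                        @Override
                        public void run() {
                            writeToSpillFile(drained);
                        }
                    });
                }
            }
            spillWriter.submit(new Runnable() {
                @Override
                public void run() {
                    writeToSpillFile(buffer);
                }
            });
        }

        // For a BLOCKING result the consumer view is only handed out after
        // this call, i.e. receivers see the partition only once it is fully
        // produced.
        synchronized void finish() {
            spillWriter.shutdown();
        }

        private void writeToSpillFile(byte[] buffer) {
            // stand-in for the asynchronous spill-file writer
        }
    }

The point of the design: the producer thread never blocks on disk I/O, and because a BLOCKING subpartition only becomes consumable after finish(), deployment of the receiving tasks can safely be deferred until every partition is complete.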
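
The consumption tracking from FLINK-1359 can be sketched the same way, again with invented names, assuming the partition simply counts subpartitions that have not yet been consumed:

    // Sketch of release-after-consumption (the FLINK-1359 part), with
    // invented names: the partition is released once every subpartition has
    // been consumed at least once, since there are no historic results yet.
    import java.util.concurrent.atomic.AtomicInteger;

    class ResultPartitionSketch {

        private final AtomicInteger unconsumedSubpartitions;

        ResultPartitionSketch(int numberOfSubpartitions) {
            this.unconsumedSubpartitions = new AtomicInteger(numberOfSubpartitions);
        }

        // Called by a subpartition view once its consumer has read everything.
        void onSubpartitionConsumed() {
            if (unconsumedSubpartitions.decrementAndGet() == 0) {
                // Every subpartition was consumed at least once; without a
                // notion of historic results, release the partition right away.
                release();
            }
        }

        private void release() {
            // free memory segments and delete spill files
        }
    }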
uce committed Mar 18, 2015
1 parent 1930678 commit 9d7acf3
Showing 188 changed files with 8,046 additions and 3,245 deletions.
@@ -460,12 +460,13 @@ public int getMaxDepth() {
public InterestingProperties getInterestingProperties() {
return this.intProps;
}
@Override
public long getEstimatedOutputSize() {
return this.estimatedOutputSize;
}

@Override
public long getEstimatedNumRecords() {
return this.estimatedNumRecords;
}
@@ -478,6 +479,7 @@ public void setEstimatedNumRecords(long estimatedNumRecords) {
this.estimatedNumRecords = estimatedNumRecords;
}

@Override
public float getEstimatedAvgWidthPerOutputRecord() {
if (this.estimatedOutputSize > 0 && this.estimatedNumRecords > 0) {
return ((float) this.estimatedOutputSize) / this.estimatedNumRecords;
@@ -941,6 +943,7 @@ protected void addClosedBranch(OptimizerNode alreadyClosed) {
if (this.closedBranchingNodes == null) {
this.closedBranchingNodes = new HashSet<OptimizerNode>();
}

this.closedBranchingNodes.add(alreadyClosed);
}

@@ -105,6 +105,7 @@ public Channel(PlanNode sourceNode, TempMode tempMode) {
*
* @return The source.
*/
@Override
public PlanNode getSource() {
return this.source;
}
@@ -381,8 +381,7 @@ public void addOutgoingChannel(Channel channel) {
public List<Channel> getOutgoingChannels() {
return this.outChannels;
}
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
@@ -1068,10 +1068,8 @@ private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
default:
throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
}

targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern);

// sourceVertex.connectTo(targetVertex, channelType, distributionPattern);

// -------------- configure the source task's ship strategy in the task config --------------
final int outputIndex = sourceConfig.getNumOutputs();
@@ -1140,6 +1138,7 @@ private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config,
final TempMode tm = channel.getTempMode();

boolean needsMemory = false;
// Don't add a pipeline breaker if the data exchange is already blocking.
if (tm.breaksPipeline()) {
config.setInputAsynchronouslyMaterialized(inputNum, true);
needsMemory = true;
@@ -18,21 +18,29 @@

package org.apache.flink.compiler;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.PlanNode;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.plan.SourcePlanNode;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.compiler.testfunctions.SelectOneReducer;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings("serial")
public class PipelineBreakerTest extends CompilerTestBase {
@@ -42,194 +50,194 @@ public void testPipelineBreakerWithBroadcastVariable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);

DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());

DataSet<Long> result = source.map(new IdentityMapper<Long>())
.map(new IdentityMapper<Long>())
.withBroadcastSet(source, "bc");

result.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();

assertTrue(mapper.getInput().getTempMode().breaksPipeline());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testPipelineBreakerBroadcastedAllReduce() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);

DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());

DataSet<Long> bcInput1 = sourceWithMapper
.map(new IdentityMapper<Long>())
.reduce(new SelectOneReducer<Long>());
DataSet<Long> bcInput2 = env.generateSequence(1, 10);

DataSet<Long> result = sourceWithMapper
.map(new IdentityMapper<Long>())
.withBroadcastSet(bcInput1, "bc1")
.withBroadcastSet(bcInput2, "bc2");

result.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();

assertTrue(mapper.getInput().getTempMode().breaksPipeline());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testPipelineBreakerBroadcastedPartialSolution() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);
DataSet<Long> initialSource = env.generateSequence(1, 10);
IterativeDataSet<Long> iteration = initialSource.iterate(100);
DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());

DataSet<Long> bcInput1 = sourceWithMapper
.map(new IdentityMapper<Long>())
.reduce(new SelectOneReducer<Long>());

DataSet<Long> result = sourceWithMapper
.map(new IdentityMapper<Long>())
.withBroadcastSet(iteration, "bc2")
.withBroadcastSet(bcInput1, "bc1");
iteration.closeWith(result).print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

SinkPlanNode sink = op.getDataSinks().iterator().next();
BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();

assertTrue(mapper.getInput().getTempMode().breaksPipeline());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testPipelineBreakerWithCross() {
try {
{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);

DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf = new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();

assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
}

{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);

DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf = new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();

assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
}

{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);

DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf = new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();

assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
}

{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(64);

DataSet<Long> initialSource = env.generateSequence(1, 10);
Configuration conf = new Configuration();
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();

assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
}
}