[FLINK-1285] Make execution mode configurable
aljoscha authored and StephanEwen committed Jan 7, 2015
1 parent 3832d7b commit b7b32a0
Showing 82 changed files with 4,168 additions and 752 deletions.
@@ -18,6 +18,7 @@

package org.apache.flink.compiler.plantranslate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -27,6 +28,7 @@
import java.util.Map;
import java.util.Map.Entry;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -82,6 +84,7 @@
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Visitor;

/**
@@ -209,7 +212,16 @@ public JobGraph compileJobGraph(OptimizedPlan program) {
for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
}


try {
InstantiationUtil.writeObjectToConfig(
program.getOriginalPactPlan().getExecutionConfig(),
graph.getJobConfiguration(),
ExecutionConfig.CONFIG_KEY);
} catch (IOException e) {
throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
}
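// For context, and not part of this diff: the runtime side would have to
// deserialize the config again under the same key. A minimal sketch, assuming
// InstantiationUtil.readObjectFromConfig(Configuration, String, ClassLoader)
// exists with this signature:
//
//   ExecutionConfig executionConfig;
//   try {
//       executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
//               graph.getJobConfiguration(),
//               ExecutionConfig.CONFIG_KEY,
//               getClass().getClassLoader());
//   } catch (IOException e) {
//       throw new RuntimeException("Could not load ExecutionConfig from Job Configuration.", e);
//   } catch (ClassNotFoundException e) {
//       throw new RuntimeException("ExecutionConfig class not found.", e);
//   }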

// release all references again
this.vertices = null;
this.chainedTasks = null;
@@ -37,7 +37,7 @@ public class ExecutionConfig implements Serializable {

// For future use...
// private boolean forceGenericSerializer = false;
// private boolean objectReuse = false;
private boolean objectReuse = false;

/**
* Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
@@ -143,17 +143,30 @@ public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries)
// return forceGenericSerializer;
// }
//
// public ExecutionConfig enableObjectReuse() {
// objectReuse = true;
// return this;
// }
//
// public ExecutionConfig disableObjectReuse() {
// objectReuse = false;
// return this;
// }
//
// public boolean isObjectReuseEnabled() {
// return objectReuse;
// }

/**
* Enables reusing objects that Flink internally uses for deserialization and passing
* data to user-code functions. Keep in mind that this can lead to bugs when the
* user-code function of an operation is not aware of this behaviour.
*/
public ExecutionConfig enableObjectReuse() {
objectReuse = true;
return this;
}

/**
* Disables reusing objects that Flink internally uses for deserialization and passing
* data to user-code functions.
*
* @see #enableObjectReuse()
*/
public ExecutionConfig disableObjectReuse() {
objectReuse = false;
return this;
}

/**
* Returns whether object reuse has been enabled or disabled.
*
* @see #enableObjectReuse()
*/
public boolean isObjectReuseEnabled() {
return objectReuse;
}
}
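For illustration (not part of this commit): a minimal sketch of how a user program would toggle the new switch, assuming the standard ExecutionEnvironment entry point and that print() is still a lazy sink requiring an explicit execute() call in this version.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ObjectReuseExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Opt in: Flink may now hand the same instance to user functions
		// repeatedly, so functions must not cache references to their inputs.
		env.getConfig().enableObjectReuse();

		DataSet<Integer> doubled = env.fromElements(1, 2, 3)
				.map(new MapFunction<Integer, Integer>() {
					@Override
					public Integer map(Integer value) {
						return value * 2; // safe: does not retain 'value'
					}
				});

		doubled.print();
		env.execute("object reuse example");
	}
}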
42 changes: 22 additions & 20 deletions flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -69,14 +69,14 @@ public class Plan implements Visitable<Operator<?>> {
protected int defaultParallelism = DEFAULT_PARALELLISM;

/**
* The number of times failed tasks are re-executed.
* Hash map for files in the distributed cache: registered name to cache entry.
*/
protected int numberOfExecutionRetries;
protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<String, DistributedCacheEntry>();

/**
* Hash map for files in the distributed cache: registered name to cache entry.
* Config object for runtime execution parameters.
*/
protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<String, DistributedCacheEntry>();
protected ExecutionConfig executionConfig = new ExecutionConfig();

// ------------------------------------------------------------------------

@@ -263,20 +263,6 @@ public void setDefaultParallelism(int defaultParallelism) {
this.defaultParallelism = defaultParallelism;
}

/**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of {@code -1} indicates that the system
* default value (as defined in the configuration) should be used.
*
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) {
throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numberOfExecutionRetries = numberOfExecutionRetries;
}

/**
* Gets the number of times the system will try to re-execute failed tasks. A value
* of {@code -1} indicates that the system default value (as defined in the configuration)
@@ -285,7 +271,7 @@ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
* @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
return numberOfExecutionRetries;
return executionConfig.getNumberOfExecutionRetries();
}

/**
@@ -297,7 +283,23 @@ public int getNumberOfExecutionRetries() {
public String getPostPassClassName() {
return "org.apache.flink.compiler.postpass.RecordModelPostPass";
}


/**
* Gets the runtime config object.
*
* @return The execution config.
*/
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}

/**
* Sets the runtime config object.
*
* @param executionConfig The execution config to use.
*/
public void setExecutionConfig(ExecutionConfig executionConfig) {
this.executionConfig = executionConfig;
}

// ------------------------------------------------------------------------

/**
@@ -24,13 +24,12 @@

public class CollectionEnvironment extends ExecutionEnvironment {

private boolean mutableObjectSafeMode = true;

@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);

CollectionExecutor exec = new CollectionExecutor(mutableObjectSafeMode);

// Note the inversion: object reuse enabled means mutable-object safe mode is disabled.
CollectionExecutor exec = new CollectionExecutor(!getConfig().isObjectReuseEnabled());
return exec.execute(p);
}
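Because of that inversion, a user who enables object reuse on a collection environment implicitly turns off the executor's defensive copying. A minimal sketch, assuming ExecutionEnvironment.createCollectionsEnvironment() exists and returns a CollectionEnvironment, and that print() is a lazy sink in this version:

import org.apache.flink.api.java.ExecutionEnvironment;

public class CollectionModeExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

		// Default: object reuse disabled, so the CollectionExecutor runs with
		// mutable-object safe mode on (inputs are defensively copied).
		// Enabling reuse turns that safe mode off:
		env.getConfig().enableObjectReuse();

		env.fromElements("a", "b", "c").print();
		env.execute("collection mode example");
	}
}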

@@ -749,6 +749,7 @@ public JavaPlan createProgramPlan(String jobName, boolean clearSinks) {
if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism());
}
plan.setExecutionConfig(getConfig());

try {
registerCachedFilesWithPlan(plan);
@@ -53,6 +53,7 @@ public T createInstance() {

@Override
public T createInstance(Object[] fields) {

try {
T t = tupleClass.newInstance();

@@ -19,12 +19,15 @@

package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.BuildFirstReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.BuildSecondReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
@@ -37,7 +40,10 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
private final int buildSideIndex;

private final int probeSideIndex;


private boolean objectReuseEnabled = false;


protected AbstractCachedBuildSideMatchDriver(int buildSideIndex, int probeSideIndex) {
this.buildSideIndex = buildSideIndex;
this.probeSideIndex = probeSideIndex;
@@ -69,34 +75,67 @@ public void initialize() throws Exception {

double availableMemory = config.getRelativeMemoryDriver();

if (buildSideIndex == 0 && probeSideIndex == 1) {

matchIterator =
new BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(input1, input2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory
);

} else if (buildSideIndex == 1 && probeSideIndex == 0) {

matchIterator =
new BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(input1, input2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory
);

ExecutionConfig executionConfig = taskContext.getExecutionConfig();
objectReuseEnabled = executionConfig.isObjectReuseEnabled();

if (objectReuseEnabled) {
if (buildSideIndex == 0 && probeSideIndex == 1) {

matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);


} else if (buildSideIndex == 1 && probeSideIndex == 0) {

matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);

} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
}
} else {
throw new Exception("Error: Inconcistent setup for repeatable hash join driver.");
if (buildSideIndex == 0 && probeSideIndex == 1) {

matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);


} else if (buildSideIndex == 1 && probeSideIndex == 0) {

matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);

} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
}
}

this.matchIterator.open();
@@ -113,21 +152,8 @@ public void run() throws Exception {
final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();

if (buildSideIndex == 0) {

final BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;

while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));

} else if (buildSideIndex == 1) {

final BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;

while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));

} else {
throw new Exception();
}

while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
}

@Override
@@ -138,14 +164,25 @@ public void reset() throws Exception {

MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);

if (buildSideIndex == 0 && probeSideIndex == 1) {
final BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input2);
}
else {
final BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input1);

if (objectReuseEnabled) {
if (buildSideIndex == 0 && probeSideIndex == 1) {
final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;

matchIterator.reopenProbe(input2);
} else {
final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input1);
}
} else {
if (buildSideIndex == 0 && probeSideIndex == 1) {
final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;

matchIterator.reopenProbe(input2);
} else {
final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input1);
}
}
}
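The same four-way choice (reusing vs. non-reusing, build-first vs. build-second) recurs in initialize() and reset() above. A condensed, standalone sketch of the decision logic, with hypothetical stand-in types in place of the four ReOpenableHashMatchIterator classes:

// Hypothetical stand-ins; the real commit uses the four iterator classes imported above.
interface JoinIterator {}

final class IteratorChoice {

	static JoinIterator select(boolean objectReuseEnabled, int buildSideIndex, int probeSideIndex)
			throws Exception {
		if (buildSideIndex == 0 && probeSideIndex == 1) {
			// build side is the first input
			return objectReuseEnabled ? reusingBuildFirst() : nonReusingBuildFirst();
		} else if (buildSideIndex == 1 && probeSideIndex == 0) {
			// build side is the second input
			return objectReuseEnabled ? reusingBuildSecond() : nonReusingBuildSecond();
		} else {
			throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
		}
	}

	// Factory placeholders standing in for the four iterator constructors.
	static JoinIterator reusingBuildFirst()     { return new JoinIterator() {}; }
	static JoinIterator reusingBuildSecond()    { return new JoinIterator() {}; }
	static JoinIterator nonReusingBuildFirst()  { return new JoinIterator() {}; }
	static JoinIterator nonReusingBuildSecond() { return new JoinIterator() {}; }
}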
