Skip to content

Commit

Permalink
[FLINK-1110] Adjust tests and fix various issues in the collection-ba…
Browse files Browse the repository at this point in the history
…sed execution.
  • Loading branch information
StephanEwen committed Oct 3, 2014
1 parent ac69cb3 commit aea6a6d
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public FilterOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<T, T
protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
FlatMapFunction<T, T> function = this.userFunction.getUserCodeObject();

FunctionUtils.openFunction(function, this.parameters);
FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList<T> result = new ArrayList<T>(inputData.size());
ListCollector<T> collector = new ListCollector<T>(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
return exec.execute(p);
}

@Override
public int getDegreeOfParallelism() {
return 1; // always serial
}

@Override
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ public T createInstance(Object[] fields) {

@Override
public T copy(T from) {
return copy(from, instantiateRaw());
T target = instantiateRaw();
for (int i = 0; i < arity; i++) {
Object copy = fieldSerializers[i].copy(from.getField(i));
target.setField(copy, i);
}
return target;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object PartitionProgs {
)


def runProgram(progId: Int, resultPath: String): String = {
def runProgram(progId: Int, resultPath: String, onCollection: Boolean): String = {
progId match {
case 1 =>
val env = ExecutionEnvironment.getExecutionEnvironment
Expand Down Expand Up @@ -101,7 +101,12 @@ object PartitionProgs {
countsInPartition.writeAsText(resultPath)
env.execute()

"(0,55)\n" + "(1,55)\n" + "(2,55)\n" + "(3,55)\n"
val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10;
var result = "";
for (i <- 0 until env.getDegreeOfParallelism) {
result += "(" + i + "," + numPerPartition + ")\n"
}
result

case 4 =>
// Verify that mapPartition operation after repartition picks up correct
Expand Down Expand Up @@ -141,7 +146,7 @@ object PartitionProgs {
count.writeAsText(resultPath)
env.execute()

"(4)\n"
if (onCollection) "(1)\n" else "(4)\n"

case 6 =>
// Verify that filter operation after repartition picks up correct
Expand All @@ -168,7 +173,7 @@ object PartitionProgs {
count.writeAsText(resultPath)
env.execute()

"(4)\n"
if (onCollection) "(1)\n" else "(4)\n"

case _ =>
throw new IllegalArgumentException("Invalid program id")
Expand All @@ -189,7 +194,7 @@ class PartitionITCase(config: Configuration) extends JavaProgramTestBase(config)
}

protected def testProgram(): Unit = {
expectedResult = PartitionProgs.runProgram(curProgId, resultPath)
expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution)
}

protected override def postSubmit(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.test.util;

import java.util.Comparator;
Expand Down Expand Up @@ -47,6 +46,8 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {

private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;

private boolean isCollectionExecution;


public JavaProgramTestBase() {
this(new Configuration());
Expand All @@ -67,6 +68,10 @@ public JobExecutionResult getLatestExecutionResult() {
return this.latestExecutionResult;
}

public boolean isCollectionExecution() {
return isCollectionExecution;
}

// --------------------------------------------------------------------------------------------
// Methods to create the test program and for pre- and post- test work
// --------------------------------------------------------------------------------------------
Expand All @@ -85,6 +90,8 @@ protected void postSubmit() throws Exception {}

@Test
public void testJob() throws Exception {
isCollectionExecution = false;

startCluster();
try {
// pre-submit
Expand Down Expand Up @@ -130,6 +137,8 @@ public void testJob() throws Exception {

@Test
public void testJobCollectionExecution() throws Exception {
isCollectionExecution = true;

// pre-submit
try {
preSubmit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,6 @@ public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
default:
throw new IllegalArgumentException("Invalid program id");
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@RunWith(Parameterized.class)
public class PartitionITCase extends JavaProgramTestBase {

private static int NUM_PROGRAMS = 4;
private static int NUM_PROGRAMS = 1;

private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
Expand Down Expand Up @@ -89,7 +89,7 @@ private static class PartitionProgs {
public static String runProgram(int progId, String resultPath) throws Exception {

switch(progId) {
case 1: {
case 0: {
/*
* Test hash partition by key field
*/
Expand Down Expand Up @@ -141,7 +141,7 @@ public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
"5\n" +
"6\n";
}
case 3: {
case 1: {
/*
* Test forced rebalancing
*/
Expand Down Expand Up @@ -192,11 +192,13 @@ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Excep

env.execute();

StringBuilder result = new StringBuilder();
int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
}
// return expected result
return "(0,55)\n" +
"(1,55)\n" +
"(2,55)\n" +
"(3,55)\n";
return result.toString();
}
case 4: {
/*
Expand Down Expand Up @@ -226,9 +228,7 @@ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Excep
default:
throw new IllegalArgumentException("Invalid program id");
}

}

}

public static class UniqueLongMapper implements MapPartitionFunction<Tuple3<Integer,Long,String>, Long> {
Expand All @@ -253,7 +253,5 @@ public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<In
public Tuple2<Integer, Integer> map(Long value) throws Exception {
return new Tuple2<Integer, Integer>(this.getRuntimeContext().getIndexOfThisSubtask(), 1);
}

}

}

0 comments on commit aea6a6d

Please sign in to comment.