[FLINK-8974] Run all-round DataSet job with failures in HA mode
dawidwys authored and zentol committed Jul 30, 2018
1 parent a278d59 commit bf6db7d
Showing 6 changed files with 352 additions and 195 deletions.
DataSetAllroundTestProgram.java
@@ -21,9 +21,6 @@
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.typeinfo.Types;
@@ -34,11 +31,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.Preconditions;

/**
* Program to test a large chunk of DataSet API operators and primitives:
@@ -55,6 +48,8 @@
* <ul>
* <li>loadFactor (int): controls generated data volume. Does not affect result.</li>
* <li>outputPath (String): path to write the result</li>
* <li>infinite (Boolean): if set to true, one of the sources will be infinite and the job will never end.
* (default: false)</li>
* </ul>
*/
public class DataSetAllroundTestProgram {
@@ -66,13 +61,20 @@ public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
boolean infinite = params.getBoolean("infinite", false);

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

int numKeys = loadFactor * 128 * 1024;
DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x1Keys;
DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(Generator.generate(numKeys * 32, 2)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(Generator.generate(numKeys, 8)).setParallelism(4);

if (infinite) {
x1Keys = env.createInput(Generator.generateInfinitely(numKeys)).setParallelism(4);
} else {
x1Keys = env.createInput(Generator.generate(numKeys, 1)).setParallelism(4);
}

DataSet<Tuple2<String, Integer>> joined = x2Keys
// shift keys (check for correct handling of key positions)
@@ -179,105 +181,4 @@ public String getKey(Tuple2<String, Integer> value) {
env.execute();
}

/**
* InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
* <ul>
* <li>String: key, can be repeated.</li>
* <li>Integer: uniformly distributed int between 0 and 127</li>
* </ul>
*/
public static class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {

// total number of records
private final long numRecords;
// total number of keys
private final long numKeys;

// records emitted per partition
private long recordsPerPartition;
// number of keys per partition
private long keysPerPartition;

// number of currently emitted records
private long recordCnt;

// id of current partition
private int partitionId;
// total number of partitions
private int numPartitions;

public Generator(long numKeys, int recordsPerKey) {
this.numKeys = numKeys;
this.numRecords = numKeys * recordsPerKey;
}

@Override
public void configure(Configuration parameters) { }

@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
return null;
}

@Override
public GenericInputSplit[] createInputSplits(int minNumSplits) {

GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
for (int i = 0; i < minNumSplits; i++) {
splits[i] = new GenericInputSplit(i, minNumSplits);
}
return splits;
}

@Override
public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}

@Override
public void open(GenericInputSplit split) {
this.partitionId = split.getSplitNumber();
this.numPartitions = split.getTotalNumberOfSplits();

// ensure even distribution of records and keys
Preconditions.checkArgument(
numRecords % numPartitions == 0,
"Records cannot be evenly distributed among partitions");
Preconditions.checkArgument(
numKeys % numPartitions == 0,
"Keys cannot be evenly distributed among partitions");

this.recordsPerPartition = numRecords / numPartitions;
this.keysPerPartition = numKeys / numPartitions;

this.recordCnt = 0;
}

@Override
public boolean reachedEnd() {
return this.recordCnt >= this.recordsPerPartition;
}

@Override
public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) {

// build key from partition id and count per partition
String key = String.format(
"%d-%d",
this.partitionId,
this.recordCnt % this.keysPerPartition);
// 128 values to filter on
int filterVal = (int) this.recordCnt % 128;

this.recordCnt++;

reuse.f0 = key;
reuse.f1 = filterVal;
return reuse;
}

@Override
public void close() { }
}

}
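
For reference, a minimal standalone sketch (not part of this commit) of how the new flag is read; it uses only the ParameterTool calls visible in the diff above, and the class name and argument values are hypothetical:

import org.apache.flink.api.java.utils.ParameterTool;

public class InfiniteFlagSketch {
    public static void main(String[] args) {
        // hypothetical invocation: --loadFactor 2 --outputPath /tmp/out --infinite true
        ParameterTool params = ParameterTool.fromArgs(args);
        int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
        String outputPath = params.getRequired("outputPath");
        // getBoolean(key, defaultValue) falls back to false when --infinite is absent
        boolean infinite = params.getBoolean("infinite", false);
        System.out.printf("loadFactor=%d outputPath=%s infinite=%b%n",
            loadFactor, outputPath, infinite);
    }
}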
Generator.java (new file)
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.batch.tests;

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

/**
* InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
* <ul>
* <li>String: key, can be repeated.</li>
* <li>Integer: uniformly distributed int between 0 and 127</li>
* </ul>
*/
public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {

// total number of records
private final long numRecords;
// total number of keys
private final long numKeys;

// records emitted per partition
private long recordsPerPartition;
// number of keys per partition
private long keysPerPartition;

// number of currently emitted records
private long recordCnt;

// id of current partition
private int partitionId;

private final boolean infinite;

public static Generator generate(long numKeys, int recordsPerKey) {
return new Generator(numKeys, recordsPerKey, false);
}

public static Generator generateInfinitely(long numKeys) {
return new Generator(numKeys, 0, true);
}

private Generator(long numKeys, int recordsPerKey, boolean infinite) {
this.numKeys = numKeys;
if (infinite) {
this.numRecords = Long.MAX_VALUE;
} else {
this.numRecords = numKeys * recordsPerKey;
}
this.infinite = infinite;
}

@Override
public void configure(Configuration parameters) {
}

@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
return null;
}

@Override
public GenericInputSplit[] createInputSplits(int minNumSplits) {

GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
for (int i = 0; i < minNumSplits; i++) {
splits[i] = new GenericInputSplit(i, minNumSplits);
}
return splits;
}

@Override
public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}

@Override
public void open(GenericInputSplit split) throws IOException {
this.partitionId = split.getSplitNumber();
// total number of partitions
int numPartitions = split.getTotalNumberOfSplits();

// ensure even distribution of records and keys
Preconditions.checkArgument(
numRecords % numPartitions == 0,
"Records cannot be evenly distributed among partitions");
Preconditions.checkArgument(
numKeys % numPartitions == 0,
"Keys cannot be evenly distributed among partitions");

this.recordsPerPartition = numRecords / numPartitions;
this.keysPerPartition = numKeys / numPartitions;

this.recordCnt = 0;
}

@Override
public boolean reachedEnd() {
return !infinite && this.recordCnt >= this.recordsPerPartition;
}

@Override
public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {

// build key from partition id and count per partition
String key = String.format(
"%d-%d",
this.partitionId,
this.recordCnt % this.keysPerPartition);

// 128 values to filter on
int filterVal = (int) this.recordCnt % 128;

this.recordCnt++;

reuse.f0 = key;
reuse.f1 = filterVal;
return reuse;
}

@Override
public void close() {
}
}
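
To make the key scheme concrete, a standalone sketch (not part of the commit) that replays the per-split arithmetic of nextRecord() for a hypothetical configuration: 4 keys, 2 records per key, 2 partitions, split 0. It prints 0-0 -> 0, 0-1 -> 1, 0-0 -> 2, 0-1 -> 3, showing how each key repeats recordsPerKey times within a split:

public class GeneratorKeySketch {
    public static void main(String[] args) {
        long numKeys = 4;                                       // hypothetical
        long numRecords = numKeys * 2;                          // 2 records per key
        int numPartitions = 2;
        int partitionId = 0;                                    // replay split 0 only
        long recordsPerPartition = numRecords / numPartitions;  // 4
        long keysPerPartition = numKeys / numPartitions;        // 2
        for (long recordCnt = 0; recordCnt < recordsPerPartition; recordCnt++) {
            // same key/value derivation as Generator.nextRecord()
            String key = String.format("%d-%d", partitionId, recordCnt % keysPerPartition);
            int filterVal = (int) (recordCnt % 128);
            System.out.println(key + " -> " + filterVal);
        }
    }
}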
10 changes: 6 additions & 4 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -48,10 +48,12 @@ run_test "ConnectedComponents iterations with high parallelism end-to-end test"
run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"

run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
run_test "Running HA dataset end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_dataset.sh"

run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file true false"
run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh file false false"
run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false"
run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true"

run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"