Skip to content

Commit

Permalink
[FLINK-9862] [test] Extend general puropose DataStream test to have a…
Browse files Browse the repository at this point in the history
… tumbling window

This closes apache#6351.
  • Loading branch information
tzulitai committed Jul 24, 2018
1 parent c9a3a8a commit b9a916a
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper;
import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialListStateBuilder;
Expand Down Expand Up @@ -79,6 +82,8 @@
* <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
* <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
* <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
* <li>tumbling_window_operator.num_events (long, default - 20L): The duration of the window, indirectly determined by the target number of events in each window.
* Total duration is (sliding_window_operator.num_events) * (sequence_generator_source.event_time.clock_progress).</li>
* </ul>
*/
class DataStreamAllroundTestJobFactory {
Expand Down Expand Up @@ -184,12 +189,16 @@ class DataStreamAllroundTestJobFactory {

private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
.key("sequence_generator_source.event_time.max_out_of_order")
.defaultValue(500L);
.defaultValue(0L);

private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
.key("sequence_generator_source.event_time.clock_progress")
.defaultValue(100L);

private static final ConfigOption<Long> TUMBLING_WINDOW_OPERATOR_NUM_EVENTS = ConfigOptions
.key("tumbling_window_operator.num_events")
.defaultValue(20L);

static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {

// set checkpointing semantics
Expand Down Expand Up @@ -318,6 +327,23 @@ public long extractTimestamp(Event element) {
};
}

static WindowedStream<Event, Integer, TimeWindow> applyTumblingWindows(
KeyedStream<Event, Integer> keyedStream, ParameterTool pt) {

long eventTimeProgressPerEvent = pt.getLong(
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue());

return keyedStream.timeWindow(
Time.milliseconds(
pt.getLong(
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.key(),
TUMBLING_WINDOW_OPERATOR_NUM_EVENTS.defaultValue()
) * eventTimeProgressPerEvent
)
);
}

static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {

String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
import org.apache.flink.util.Collector;

import java.util.Collections;

import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
Expand Down Expand Up @@ -57,6 +61,7 @@
public class DataStreamAllroundTestProgram {
private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper";
private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper";
private static final String TIME_WINDOW_OPER_NAME = "TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper";

Expand Down Expand Up @@ -92,14 +97,26 @@ public static void main(String[] args) throws Exception {
.name(OPERATOR_STATE_OPER_NAME)
.returns(Event.class);

// apply a tumbling window that simply passes forward window elements;
// this allows the job to cover timers state
DataStream<Event> eventStream3 = applyTumblingWindows(eventStream2.keyBy(Event::getKey), pt)
.apply(new WindowFunction<Event, Event, Integer, TimeWindow>() {
@Override
public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Collector<Event> out) throws Exception {
for (Event e : input) {
out.collect(e);
}
}
}).name(TIME_WINDOW_OPER_NAME);

if (isSimulateFailures(pt)) {
eventStream2 = eventStream2
eventStream3 = eventStream3
.map(createExceptionThrowingFailureMapper(pt))
.setParallelism(1)
.name(FAILURE_MAPPER_NAME);
}

eventStream2.keyBy(Event::getKey)
eventStream3.keyBy(Event::getKey)
.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ public void flatMap(Event event, Collector<String> out) throws Exception {

long nextValue = event.getSequenceNumber();

if (validator.check(currentValue, nextValue)) {
sequenceValue.update(nextValue);
} else {
sequenceValue.update(nextValue);
if (!validator.check(currentValue, nextValue)) {
out.collect("Alert: " + currentValue + " -> " + nextValue + " (" + event.getKey() + ")");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,18 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
/** This determines after how many generated events we sleep. A value < 1 deactivates sleeping. */
private final long sleepAfterElements;

/** The current event time progress of this source; will start from 0. */
private long monotonousEventTime;

/** This holds the key ranges for which this source generates events. */
private transient List<KeyRangeStates> keyRanges;

/** This is used to snapshot the state of this source, one entry per key range. */
private transient ListState<KeyRangeStates> snapshotKeyRanges;

/** This is used to snapshot the event time progress of the sources. */
private transient ListState<Long> lastEventTimes;

/** Flag that determines if this source is running, i.e. generating events. */
private volatile boolean running;

Expand Down Expand Up @@ -102,7 +108,6 @@ private void runActive(SourceContext<Event> ctx) throws Exception {
Random random = new Random();

// this holds the current event time, from which generated events can up to +/- (maxOutOfOrder).
long monotonousEventTime = 0L;
long elementsBeforeSleep = sleepAfterElements;

while (running) {
Expand Down Expand Up @@ -162,7 +167,11 @@ private void runIdle(SourceContext<Event> ctx) throws Exception {
}

private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
if (maxOutOfOrder > 0) {
return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
} else {
return correctTime;
}
}

@Override
Expand All @@ -173,6 +182,9 @@ public void cancel() {
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
snapshotKeyRanges.update(keyRanges);

lastEventTimes.clear();
lastEventTimes.add(monotonousEventTime);
}

@Override
Expand All @@ -182,6 +194,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
final int maxParallelism = runtimeContext.getMaxNumberOfParallelSubtasks();

ListStateDescriptor<Long> unionWatermarksStateDescriptor =
new ListStateDescriptor<>("watermarks", Long.class);

lastEventTimes = context.getOperatorStateStore().getUnionListState(unionWatermarksStateDescriptor);

ListStateDescriptor<KeyRangeStates> stateDescriptor =
new ListStateDescriptor<>("keyRanges", KeyRangeStates.class);

Expand All @@ -193,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
keyRanges.add(keyRange);
}

// let event time start from the max of all event time progress across subtasks in the last execution
for (Long lastEventTime : lastEventTimes.get()) {
monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
}
} else {
// determine the key ranges that belong to the subtask
int rangeStartIdx = (subtaskIdx * maxParallelism) / parallelism;
Expand All @@ -207,6 +229,9 @@ public void initializeState(FunctionInitializationContext context) throws Except
keyRanges.add(new KeyRangeStates(start, end));
}
}

// fresh run; start from event time o
monotonousEventTime = 0L;
}
}

Expand Down
9 changes: 6 additions & 3 deletions flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_E
run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
run_test "Resuming Savepoint (rocks, no parallelism change, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false heap"
run_test "Resuming Savepoint (rocks, scale up, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false heap"
run_test "Resuming Savepoint (rocks, scale down, heap timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false heap"
run_test "Resuming Savepoint (rocks, no parallelism change, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks false rocks"
run_test "Resuming Savepoint (rocks, scale up, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks false rocks"
run_test "Resuming Savepoint (rocks, scale down, rocks timers) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks false rocks"

run_test "Resuming Externalized Checkpoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file true true"
run_test "Resuming Externalized Checkpoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 2 2 file false true"
Expand Down
6 changes: 6 additions & 0 deletions flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ORIGINAL_DOP=$1
NEW_DOP=$2
STATE_BACKEND_TYPE=${3:-file}
STATE_BACKEND_FILE_ASYNC=${4:-true}
STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-heap}

if (( $ORIGINAL_DOP >= $NEW_DOP )); then
NUM_SLOTS=$ORIGINAL_DOP
Expand All @@ -56,6 +57,11 @@ fi

backup_config
change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"

if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'rocks' ]; then
set_conf "state.backend.rocksdb.timer-service.impl" "rocksdb"
fi

setup_flink_slf4j_metric_reporter

start_cluster
Expand Down

0 comments on commit b9a916a

Please sign in to comment.