Skip to content

Commit

Permalink
[FLINK-8997] Added sliding window aggregation to datastream test job
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Nov 19, 2018
1 parent 868e411 commit 22e35bf
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.tests.artificialstate.ArtificalOperatorStateMapper;
Expand Down Expand Up @@ -86,6 +88,10 @@
* <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
* <li>tumbling_window_operator.num_events (long, default - 20L): The duration of the window, indirectly determined by the target number of events in each window.
* Total duration is (tumbling_window_operator.num_events) * (sequence_generator_source.event_time.clock_progress).</li>
* <li>test_slide_factor (int, default - 3): how many slides there are in a
* single window (in other words, at most how many windows may be open at a time for a given key). The length of
* a window will be calculated as (test_slide_size) * (test_slide_factor).</li>
* <li>test_slide_size (long, default - 250): length of a slide of sliding window in milliseconds. The length of a window will be calculated as (test_slide_size) * (test_slide_factor)</li>
* </ul>
*/
public class DataStreamAllroundTestJobFactory {
Expand Down Expand Up @@ -201,6 +207,14 @@ public class DataStreamAllroundTestJobFactory {
.key("tumbling_window_operator.num_events")
.defaultValue(20L);

/**
 * Number of slides that make up one sliding window (key {@code test_slide_factor}, default 3).
 * The window length is the slide size multiplied by this factor; the same value is handed to the
 * sliding window check mapper.
 */
private static final ConfigOption<Integer> TEST_SLIDE_FACTOR = ConfigOptions
.key("test_slide_factor")
.defaultValue(3);

/**
 * Length of a single slide of the sliding window, in milliseconds
 * (key {@code test_slide_size}, default 250).
 */
private static final ConfigOption<Long> TEST_SLIDE_SIZE = ConfigOptions
.key("test_slide_size")
.defaultValue(250L);

public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {

// set checkpointing semantics
Expand Down Expand Up @@ -455,4 +469,24 @@ static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
listStateGenerator,
listStateDescriptor);
}

/**
 * Creates the sliding event-time window assigner used by the test job.
 *
 * <p>The slide is {@code test_slide_size} milliseconds and the window length is
 * {@code test_slide_size * test_slide_factor} milliseconds; both values fall back to the
 * defaults of their config options when absent from {@code pt}.
 *
 * @param pt job parameters to read the slide size and slide factor from
 * @return the configured sliding event-time window assigner
 */
static SlidingEventTimeWindows createSlidingWindow(ParameterTool pt) {
    final long slideMillis = pt.getLong(TEST_SLIDE_SIZE.key(), TEST_SLIDE_SIZE.defaultValue());
    final int slidesPerWindow = pt.getInt(TEST_SLIDE_FACTOR.key(), TEST_SLIDE_FACTOR.defaultValue());

    // long * int is evaluated in long arithmetic, so the window length is a long.
    final Time windowLength = Time.milliseconds(slideMillis * slidesPerWindow);
    final Time windowSlide = Time.milliseconds(slideMillis);

    return SlidingEventTimeWindows.of(windowLength, windowSlide);
}

/**
 * Creates the mapper that validates the output of the sliding window operator.
 *
 * @param pt job parameters; {@code test_slide_factor} is read from it, falling back to the
 *     option's default when absent
 * @return a {@link SlidingWindowCheckMapper} configured with the slide factor
 */
static FlatMapFunction<Tuple2<Integer, List<Event>>, String> createSlidingWindowCheckMapper(ParameterTool pt) {
    final int slidesPerWindow = pt.getInt(TEST_SLIDE_FACTOR.key(), TEST_SLIDE_FACTOR.defaultValue());
    return new SlidingWindowCheckMapper(slidesPerWindow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.formats.avro.typeutils.AvroSerializer;
Expand All @@ -36,13 +37,18 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindow;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindowCheckMapper;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
Expand All @@ -69,6 +75,8 @@ public class DataStreamAllroundTestProgram {
private static final String TIME_WINDOW_OPER_NAME = "TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
private static final String FAILURE_MAPPER_NAME = "FailureMapper";
private static final String SLIDING_WINDOW_CHECK_MAPPER_NAME = "SlidingWindowCheckMapper";
private static final String SLIDING_WINDOW_AGG_NAME = "SlidingWindowOperator";

public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
Expand Down Expand Up @@ -153,8 +161,34 @@ public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Col
eventStream3.keyBy(Event::getKey)
.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("0007")
.addSink(new PrintSinkFunction<>()).uid("0008");
.uid("007")
.addSink(new PrintSinkFunction<>())
.uid("008");

// Check sliding windows aggregations. Output all elements assigned to a window and later on
// check if each event was emitted slide_factor number of times
DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
.window(createSlidingWindow(pt))
.apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
private static final long serialVersionUID = 3166250579972849440L;

@Override
public void apply(
Integer key, TimeWindow window, Iterable<Event> input,
Collector<Tuple2<Integer, List<Event>>> out) throws Exception {

out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
}
})
.name(SLIDING_WINDOW_AGG_NAME)
.uid("009");

eventStream4.keyBy(events-> events.f0)
.flatMap(createSlidingWindowCheckMapper(pt))
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
.addSink(new PrintSinkFunction<>())
.uid("011");

env.execute("General purpose test job");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.tests;

import java.util.Objects;

/**
* The event type of records used in the {@link DataStreamAllroundTestProgram}.
*/
Expand Down Expand Up @@ -51,6 +53,26 @@ public String getPayload() {
return payload;
}

/**
 * Two events are equal when they are of the same class and their key, event time,
 * sequence number and payload all match.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null) {
        return false;
    }
    if (o.getClass() != getClass()) {
        return false;
    }

    final Event that = (Event) o;
    if (key != that.key) {
        return false;
    }
    if (eventTime != that.eventTime) {
        return false;
    }
    if (sequenceNumber != that.sequenceNumber) {
        return false;
    }
    return Objects.equals(payload, that.payload);
}

/**
 * Hash over key, event time, sequence number and payload, consistent with {@link #equals(Object)}.
 */
@Override
public int hashCode() {
    // Same 31-based fold (seed 1, null hashes to 0) that Objects.hash(...) performs.
    int result = 1;
    result = 31 * result + Objects.hashCode(key);
    result = 31 * result + Objects.hashCode(eventTime);
    result = 31 * result + Objects.hashCode(sequenceNumber);
    result = 31 * result + Objects.hashCode(payload);
    return result;
}

@Override
public String toString() {
return "Event{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> {
private static final long serialVersionUID = -744070793650644485L;

/** This value state tracks the current sequence number per key. */
private volatile ValueState<Long> sequenceValue;
private transient ValueState<Long> sequenceValue;

/** This defines how semantics are checked for each update. */
private final ValidatorFunction validator;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/**
 * This mapper validates sliding event time windows. It checks that each event belongs to the appropriate number
 * of consecutive windows.
 *
 * <p>Each input record is the content of one fired window for a key. For every violation detected an alert
 * string is emitted; nothing is emitted for a window that passes all checks.
 */
public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {

    private static final long serialVersionUID = -744070793650644485L;

    /** This value state tracks previously seen events with the number of windows they appeared in. */
    private transient ValueState<List<Tuple2<Event, Integer>>> eventsSeenSoFar;

    /** This value state tracks the highest sequence number of a last-in-window event seen so far. */
    private transient ValueState<Long> lastSequenceNumber;

    /** Number of consecutive windows each event is expected to appear in. */
    private final int slideFactor;

    SlidingWindowCheckMapper(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
            new ValueStateDescriptor<>("eventsSeenSoFar",
                new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));

        eventsSeenSoFar = getRuntimeContext().getState(previousWindowDescriptor);

        ValueStateDescriptor<Long> lastSequenceNumberDescriptor =
            new ValueStateDescriptor<>("lastSequenceNumber", BasicTypeInfo.LONG_TYPE_INFO);

        lastSequenceNumber = getRuntimeContext().getState(lastSequenceNumberDescriptor);
    }

    /**
     * Validates one window result and updates the tracked state.
     *
     * @param value the key and the events that were assigned to the fired window
     * @param out collector receiving one alert message per detected violation
     */
    @Override
    public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
        // No state yet for this key means no events are pending from earlier windows.
        List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(eventsSeenSoFar.value()).orElseGet(
            Collections::emptyList);

        List<Event> newValues = value.f1;
        Optional<Event> lastEventInWindow = verifyWindowContiguity(newValues, out);

        Long lastSequenceNumberSeenSoFar = lastSequenceNumber.value();
        List<Tuple2<Event, Integer>> newWindows =
            verifyPreviousOccurrences(previousWindowValues, newValues, lastSequenceNumberSeenSoFar, out);

        if (lastEventInWindow.isPresent()) {
            updateLastSeenSequenceNumber(lastEventInWindow.get(), lastSequenceNumberSeenSoFar, out);
        }

        eventsSeenSoFar.update(newWindows);
    }

    /**
     * Advances the stored last sequence number if the window's last event is newer, and raises an alert if
     * it is older than what was already seen.
     */
    private void updateLastSeenSequenceNumber(
            Event lastEventInWindow,
            Long lastSequenceNumberSeenSoFar,
            Collector<String> out) throws IOException {
        long lastSequenceNumberInWindow = lastEventInWindow.getSequenceNumber();
        if (lastSequenceNumberSeenSoFar == null || lastSequenceNumberInWindow > lastSequenceNumberSeenSoFar) {
            lastSequenceNumber.update(lastSequenceNumberInWindow);
        } else if (lastSequenceNumberInWindow < lastSequenceNumberSeenSoFar) {
            failWithSequenceNumberDecreased(lastEventInWindow, lastSequenceNumberSeenSoFar, out);
        }
    }

    private void failWithSequenceNumberDecreased(
            Event lastEventInWindow,
            Long lastSequenceNumberSeenSoFar,
            Collector<String> out) {
        out.collect(String.format("Last event in current window (%s) has sequence number lower than seen so far (%d)",
            lastEventInWindow,
            lastSequenceNumberSeenSoFar));
    }

    /**
     * Verifies if all values from previous windows appear in the new one. Returns union of all events seen so far that
     * were not seen <b>slideFactor</b> number of times yet.
     *
     * <p>A tracked event that is missing from the new window triggers an alert and is no longer tracked.
     */
    private List<Tuple2<Event, Integer>> verifyPreviousOccurrences(
            List<Tuple2<Event, Integer>> previousWindowValues,
            List<Event> newValues,
            Long lastSequenceNumberSeenSoFar,
            Collector<String> out) {
        List<Tuple2<Event, Integer>> newEventsSeenSoFar = new ArrayList<>();
        List<Event> seenEvents = new ArrayList<>();

        for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
            if (!newValues.contains(windowValue.f0)) {
                failWithEventNotSeenAlertMessage(windowValue, newValues, out);
            } else {
                seenEvents.add(windowValue.f0);
                preserveOrDiscardIfSeenSlideFactorTimes(newEventsSeenSoFar, windowValue.f0, windowValue.f1 + 1);
            }
        }

        addNotSeenValues(newEventsSeenSoFar, newValues, seenEvents, lastSequenceNumberSeenSoFar, out);

        return newEventsSeenSoFar;
    }

    /**
     * Starts tracking events of the new window that were not tracked before. An event whose sequence number
     * is not greater than the last one seen so far is not new, so it is reported as seen too many times.
     */
    private void addNotSeenValues(
            List<Tuple2<Event, Integer>> newEventsSeenSoFar,
            List<Event> newValues,
            List<Event> seenValues,
            Long lastSequenceNumberSeenSoFar,
            Collector<String> out) {
        for (Event e : newValues) {
            if (seenValues.contains(e)) {
                continue;
            }

            if (lastSequenceNumberSeenSoFar == null || e.getSequenceNumber() > lastSequenceNumberSeenSoFar) {
                // First occurrence; with a slide factor of 1 this is also the last expected one,
                // in which case the event must not be carried over to the next window.
                preserveOrDiscardIfSeenSlideFactorTimes(newEventsSeenSoFar, e, 1);
            } else {
                failWithEventSeenTooManyTimesMessage(e, out);
            }
        }
    }

    private void failWithEventSeenTooManyTimesMessage(Event e, Collector<String> out) {
        out.collect(String.format("Alert: event %s seen more than %d times", e, slideFactor));
    }

    /**
     * Keeps tracking the event only while it is still expected in further windows, i.e. while it was seen
     * fewer than <b>slideFactor</b> times.
     */
    private void preserveOrDiscardIfSeenSlideFactorTimes(
            List<Tuple2<Event, Integer>> newEventsSeenSoFar,
            Event event,
            int timesSeen) {
        if (timesSeen < slideFactor) {
            newEventsSeenSoFar.add(Tuple2.of(event, timesSeen));
        }
    }

    private void failWithEventNotSeenAlertMessage(
            Tuple2<Event, Integer> previousWindowValue,
            List<Event> currentWindowValues,
            Collector<String> out) {
        out.collect(String.format(
            "Alert: event %s did not belong to %d consecutive windows. " +
                "Event seen so far %d times. Current window: %s",
            previousWindowValue.f0,
            slideFactor,
            previousWindowValue.f1,
            currentWindowValues));
    }

    /**
     * Checks that the sequence numbers of the window's events, once sorted, increase by exactly one from
     * event to event (i.e. there are no gaps and no duplicates).
     *
     * @return the event with the highest sequence number, or empty if the window holds no events
     */
    private Optional<Event> verifyWindowContiguity(List<Event> newValues, Collector<String> out) {
        // Sort a copy so that the caller's list keeps its original order.
        List<Event> sortedValues = new ArrayList<>(newValues);
        sortedValues.sort(Comparator.comparingLong(Event::getSequenceNumber));

        Event previous = null;
        for (Event current : sortedValues) {
            if (previous != null && current.getSequenceNumber() - 1 != previous.getSequenceNumber()) {
                out.collect("Alert: events in window out of order!");
            }
            previous = current;
        }

        return Optional.ofNullable(previous);
    }
}

0 comments on commit 22e35bf

Please sign in to comment.