Skip to content

Commit

Permalink
[FLINK-27392][cep] CEP Pattern supports definition of the maximum tim…
Browse files Browse the repository at this point in the history
…e gap between events

This closes apache#20029.
  • Loading branch information
SteNicholas authored and dianfu committed Jun 28, 2022
1 parent 6f7c273 commit 4a82df0
Show file tree
Hide file tree
Showing 18 changed files with 1,036 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class ComputationState {
// Timestamp of the first element in the pattern
private final long startTimestamp;

// Timestamp of the previous element of the pattern
private final long previousTimestamp;

@Nullable private final NodeId previousBufferEntry;

@Nullable private final EventId startEventID;
Expand All @@ -50,10 +53,12 @@ private ComputationState(
@Nullable final NodeId previousBufferEntry,
final DeweyNumber version,
@Nullable final EventId startEventID,
final long startTimestamp) {
final long startTimestamp,
final long previousTimestamp) {
this.currentStateName = currentState;
this.version = version;
this.startTimestamp = startTimestamp;
this.previousTimestamp = previousTimestamp;
this.previousBufferEntry = previousBufferEntry;
this.startEventID = startEventID;
}
Expand All @@ -70,6 +75,10 @@ public long getStartTimestamp() {
return startTimestamp;
}

public long getPreviousTimestamp() {
return previousTimestamp;
}

public String getCurrentStateName() {
return currentStateName;
}
Expand All @@ -85,6 +94,7 @@ public boolean equals(Object obj) {
return Objects.equals(currentStateName, other.currentStateName)
&& Objects.equals(version, other.version)
&& startTimestamp == other.startTimestamp
&& previousTimestamp == other.previousTimestamp
&& Objects.equals(startEventID, other.startEventID)
&& Objects.equals(previousBufferEntry, other.previousBufferEntry);
} else {
Expand All @@ -102,6 +112,8 @@ public String toString() {
+ version
+ ", startTimestamp="
+ startTimestamp
+ ", previousTimestamp="
+ previousTimestamp
+ ", previousBufferEntry="
+ previousBufferEntry
+ ", startEventID="
Expand All @@ -112,24 +124,35 @@ public String toString() {
@Override
public int hashCode() {
return Objects.hash(
currentStateName, version, startTimestamp, startEventID, previousBufferEntry);
currentStateName,
version,
startTimestamp,
previousTimestamp,
startEventID,
previousBufferEntry);
}

public static ComputationState createStartState(final String state) {
return createStartState(state, new DeweyNumber(1));
}

public static ComputationState createStartState(final String state, final DeweyNumber version) {
return createState(state, null, version, -1L, null);
return createState(state, null, version, -1L, -1L, null);
}

public static ComputationState createState(
final String currentState,
final NodeId previousEntry,
final DeweyNumber version,
final long startTimestamp,
final long previousTimestamp,
final EventId startEventID) {
return new ComputationState(
currentState, previousEntry, version, startEventID, startTimestamp);
currentState,
previousEntry,
version,
startEventID,
startTimestamp,
previousTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ static <T> Queue<ComputationState> deserializeComputationStates(

computationStates.add(
ComputationState.createState(
state, nodeId, version, startTimestamp, startEventId));
state, nodeId, version, startTimestamp, -1L, startEventId));
}
return computationStates;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ public class NFA<T> {
*/
private final Map<String, State<T>> states;

/**
* The lengths of a windowed pattern, as specified using the {@link
* org.apache.flink.cep.pattern.Pattern#within(Time, WithinType)} Pattern.within(Time,
* WithinType)} method with {@code WithinType.PREVIOUS_AND_CURRENT}.
*/
private final Map<String, Long> windowTimes;

/**
* The length of a windowed pattern, as specified using the {@link
* org.apache.flink.cep.pattern.Pattern#within(Time)} Pattern.within(Time)} method.
Expand All @@ -105,11 +112,13 @@ public class NFA<T> {

public NFA(
final Collection<State<T>> validStates,
final Map<String, Long> windowTimes,
final long windowTime,
final boolean handleTimeout) {
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
this.states = loadStates(validStates);
this.windowTimes = windowTimes;
}

private Map<String, State<T>> loadStates(final Collection<State<T>> validStates) {
Expand Down Expand Up @@ -273,7 +282,21 @@ public Collection<Map<String, List<T>>> process(
new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);

for (ComputationState computationState : nfaState.getPartialMatches()) {
if (isStateTimedOut(computationState, timestamp)) {
String currentStateName = computationState.getCurrentStateName();
boolean isTimeoutForPreviousEvent =
windowTimes.containsKey(currentStateName)
&& isStateTimedOut(
computationState,
timestamp,
computationState.getPreviousTimestamp(),
windowTimes.get(currentStateName));
boolean isTimeoutForFirstEvent =
isStateTimedOut(
computationState,
timestamp,
computationState.getStartTimestamp(),
windowTime);
if (isTimeoutForPreviousEvent || isTimeoutForFirstEvent) {
if (getState(computationState).isPending()) {
// extract the Pending State
Map<String, List<T>> pendingPattern =
Expand All @@ -288,7 +311,11 @@ public Collection<Map<String, List<T>>> process(
timeoutResult.add(
Tuple2.of(
timedOutPattern,
computationState.getStartTimestamp() + windowTime));
isTimeoutForPreviousEvent
? computationState.getPreviousTimestamp()
+ windowTimes.get(
computationState.getCurrentStateName())
: computationState.getStartTimestamp() + windowTime));
}

sharedBufferAccessor.releaseNode(
Expand All @@ -307,10 +334,12 @@ public Collection<Map<String, List<T>>> process(
return Tuple2.of(pendingMatches, timeoutResult);
}

private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
return !isStartState(state)
&& windowTime > 0L
&& timestamp - state.getStartTimestamp() >= windowTime;
private boolean isStateTimedOut(
final ComputationState state,
final long timestamp,
final long startTimestamp,
final long windowTime) {
return !isStartState(state) && windowTime > 0L && timestamp - startTimestamp >= windowTime;
}

private Collection<Map<String, List<T>>> doProcess(
Expand Down Expand Up @@ -646,6 +675,7 @@ private Collection<ComputationState> computeNextStates(
computationState.getPreviousBufferEntry(),
version,
computationState.getStartTimestamp(),
computationState.getPreviousTimestamp(),
computationState.getStartEventID());
}
}
Expand Down Expand Up @@ -677,6 +707,7 @@ private Collection<ComputationState> computeNextStates(
startTimestamp = computationState.getStartTimestamp();
startEventId = computationState.getStartEventID();
}
final long previousTimestamp = event.getTimestamp();

addComputationState(
sharedBufferAccessor,
Expand All @@ -685,6 +716,7 @@ private Collection<ComputationState> computeNextStates(
newEntry,
nextVersion,
startTimestamp,
previousTimestamp,
startEventId);

// check if newly created state is optional (have a PROCEED path to Final state)
Expand All @@ -698,6 +730,7 @@ private Collection<ComputationState> computeNextStates(
newEntry,
nextVersion,
startTimestamp,
previousTimestamp,
startEventId);
}
break;
Expand Down Expand Up @@ -733,6 +766,7 @@ private void addComputationState(
NodeId previousEntry,
DeweyNumber version,
long startTimestamp,
long previousTimestamp,
EventId startEventId)
throws Exception {
ComputationState computationState =
Expand All @@ -741,6 +775,7 @@ private void addComputationState(
previousEntry,
version,
startTimestamp,
previousTimestamp,
startEventId);
computationStates.add(computationState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,29 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
private TypeSerializer<NodeId> nodeIdSerializer;
private TypeSerializer<EventId> eventIdSerializer;

private final boolean supportsPreviousTimestamp;

public NFAStateSerializer() {
this.versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
this.nodeIdSerializer = new NodeId.NodeIdSerializer();
this(
DeweyNumber.DeweyNumberSerializer.INSTANCE,
new NodeId.NodeIdSerializer(),
EventId.EventIdSerializer.INSTANCE,
true);
}

NFAStateSerializer(
final TypeSerializer<DeweyNumber> versionSerializer,
final TypeSerializer<NodeId> nodeIdSerializer,
final TypeSerializer<EventId> eventIdSerializer) {
final TypeSerializer<EventId> eventIdSerializer,
final boolean supportsPreviousTimestamp) {
this.versionSerializer = checkNotNull(versionSerializer);
this.nodeIdSerializer = checkNotNull(nodeIdSerializer);
this.eventIdSerializer = checkNotNull(eventIdSerializer);
this.supportsPreviousTimestamp = supportsPreviousTimestamp;
}

public boolean isSupportsPreviousTimestamp() {
return supportsPreviousTimestamp;
}

@Override
Expand Down Expand Up @@ -191,6 +201,9 @@ private void serializeSingleComputationState(
nodeIdSerializer.serialize(computationState.getPreviousBufferEntry(), target);
versionSerializer.serialize(computationState.getVersion(), target);
target.writeLong(computationState.getStartTimestamp());
if (supportsPreviousTimestamp) {
target.writeLong(computationState.getPreviousTimestamp());
}
serializeStartEvent(computationState.getStartEventID(), target);
}

Expand All @@ -200,11 +213,15 @@ private ComputationState deserializeSingleComputationState(DataInputView source)
NodeId prevState = nodeIdSerializer.deserialize(source);
DeweyNumber version = versionSerializer.deserialize(source);
long startTimestamp = source.readLong();
long previousTimestamp = -1L;
if (supportsPreviousTimestamp) {
previousTimestamp = source.readLong();
}

EventId startEventId = deserializeStartEvent(source);

return ComputationState.createState(
stateName, prevState, version, startTimestamp, startEventId);
stateName, prevState, version, startTimestamp, previousTimestamp, startEventId);
}

private void copySingleComputationState(DataInputView source, DataOutputView target)
Expand All @@ -216,6 +233,10 @@ private void copySingleComputationState(DataInputView source, DataOutputView tar
versionSerializer.serialize(version, target);
long startTimestamp = source.readLong();
target.writeLong(startTimestamp);
if (supportsPreviousTimestamp) {
long previousTimestamp = source.readLong();
target.writeLong(previousTimestamp);
}

copyStartEvent(source, target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/** Snapshot class for {@link NFAStateSerializer}. */
public class NFAStateSerializerSnapshot
extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> {

private static final int CURRENT_VERSION = 1;
private static final int CURRENT_VERSION = 2;

private static final int FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP = 2;

private boolean supportsPreviousTimestamp = true;

/** Constructor for read instantiation. */
public NFAStateSerializerSnapshot() {
Expand All @@ -37,13 +45,40 @@ public NFAStateSerializerSnapshot() {
/** Constructor to create the snapshot for writing. */
public NFAStateSerializerSnapshot(NFAStateSerializer serializerInstance) {
super(serializerInstance);
supportsPreviousTimestamp = serializerInstance.isSupportsPreviousTimestamp();
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}

@Override
protected void readOuterSnapshot(
int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {
if (readOuterSnapshotVersion < FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP) {
supportsPreviousTimestamp = false;
} else {
supportsPreviousTimestamp = in.readBoolean();
}
}

@Override
protected void writeOuterSnapshot(DataOutputView out) throws IOException {
out.writeBoolean(supportsPreviousTimestamp);
}

@Override
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
NFAStateSerializer newSerializer) {
if (supportsPreviousTimestamp != newSerializer.isSupportsPreviousTimestamp()) {
return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
}

return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(NFAStateSerializer outerSerializer) {
TypeSerializer<DeweyNumber> versionSerializer = outerSerializer.getVersionSerializer();
Expand All @@ -67,6 +102,7 @@ protected NFAStateSerializer createOuterSerializerWithNestedSerializers(
@SuppressWarnings("unchecked")
TypeSerializer<EventId> eventIdSerializer = (TypeSerializer<EventId>) nestedSerializers[2];

return new NFAStateSerializer(versionSerializer, nodeIdSerializer, eventIdSerializer);
return new NFAStateSerializer(
versionSerializer, nodeIdSerializer, eventIdSerializer, supportsPreviousTimestamp);
}
}
Loading

0 comments on commit 4a82df0

Please sign in to comment.