Skip to content

Commit

Permalink
[FLINK-6904] [cep] Support for quantifier range to CEP's pattern API
Browse files Browse the repository at this point in the history
This closes apache#4121
  • Loading branch information
dianfu authored and dawidwys committed Jun 20, 2017
1 parent 6cf6cb8 commit 8835da9
Show file tree
Hide file tree
Showing 6 changed files with 657 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,19 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
this
}

/**
* Specifies that the pattern can occur between from and to times.
*
* @param from number of times matching event must appear at least
* @param to number of times matching event must appear at most
* @return The same pattern with the number of times range applied
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def times(from: Int, to: Int): Pattern[T, F] = {
jPattern.times(from, to)
this
}

/**
* Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
* this option allows more flexibility to the matching events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.cep.pattern.MalformedPatternException;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.Quantifier.Times;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.NotCondition;
Expand Down Expand Up @@ -372,33 +373,23 @@ private void addStopStateToLooping(final State<T> loopingState) {
* @param times number of times the state should be copied
* @return the first state of the "complex" state, next state should point to it
*/
private State<T> createTimesState(final State<T> sinkState, int times) {
State<T> lastSink = copyWithoutTransitiveNots(sinkState);
for (int i = 0; i < times - 1; i++) {
lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
private State<T> createTimesState(final State<T> sinkState, Times times) {
State<T> lastSink = sinkState;
final IterativeCondition<T> innerIgnoreCondition = getInnerIgnoreCondition(currentPattern);
for (int i = times.getFrom(); i < times.getTo(); i++) {
lastSink = createSingletonState(lastSink, sinkState, innerIgnoreCondition, true);
addStopStateToLooping(lastSink);
}

final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);

// we created the intermediate states in the loop, now we create the start of the loop.
if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
return createSingletonState(lastSink, ignoreCondition, false);
}

final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
singletonState.addTake(lastSink, currentCondition);
singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());

if (ignoreCondition != null) {
State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
ignoreState.addTake(lastSink, currentCondition);
ignoreState.addIgnore(ignoreCondition);
singletonState.addIgnore(ignoreState, ignoreCondition);
addStopStates(ignoreState);
for (int i = 0; i < times.getFrom() - 1; i++) {
lastSink = createSingletonState(lastSink, null, innerIgnoreCondition, false);
addStopStateToLooping(lastSink);
}
return singletonState;
// we created the intermediate states in the loop, now we create the start of the loop.
return createSingletonState(
lastSink,
sinkState,
getIgnoreCondition(currentPattern),
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
}

/**
Expand All @@ -412,6 +403,7 @@ private State<T> createTimesState(final State<T> sinkState, int times) {
@SuppressWarnings("unchecked")
private State<T> createSingletonState(final State<T> sinkState) {
return createSingletonState(
sinkState,
sinkState,
getIgnoreCondition(currentPattern),
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
Expand All @@ -424,10 +416,15 @@ private State<T> createSingletonState(final State<T> sinkState) {
*
* @param ignoreCondition condition that should be applied to IGNORE transition
* @param sinkState state that the state being converted should point to
* @param proceedState state that the state being converted should proceed to
* @param isOptional whether the state being converted is optional
* @return the created state
*/
@SuppressWarnings("unchecked")
private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
private State<T> createSingletonState(final State<T> sinkState,
final State<T> proceedState,
final IterativeCondition<T> ignoreCondition,
final boolean isOptional) {
final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();

Expand All @@ -438,7 +435,7 @@ private State<T> createSingletonState(final State<T> sinkState, final IterativeC

if (isOptional) {
// if no element accepted the previous nots are still valid.
singletonState.addProceed(sinkState, trueFunction);
singletonState.addProceed(proceedState, trueFunction);
}

if (ignoreCondition != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
import org.apache.flink.cep.pattern.Quantifier.Times;
import org.apache.flink.cep.pattern.conditions.AndCondition;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.OrCondition;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class Pattern<T, F extends T> {
* Applicable to a {@code times} pattern, and holds
* the number of times it has to appear.
*/
private int times;
private Times times;

protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
this.name = name;
Expand All @@ -84,7 +85,7 @@ protected Pattern(
return previous;
}

public int getTimes() {
public Times getTimes() {
return times;
}

Expand Down Expand Up @@ -318,7 +319,27 @@ public Pattern<T, F> times(int times) {
checkIfQuantifierApplied();
Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
this.times = times;
this.times = Times.of(times);
return this;
}

/**
* Specifies that the pattern can occur between from and to times.
*
* @param from number of times matching event must appear at least
* @param to number of times matching event must appear at most
* @return The same pattern with the number of times range applied
*
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
public Pattern<T, F> times(int from, int to) {
checkIfNoNotPattern();
checkIfQuantifierApplied();
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
if (from == 0) {
this.quantifier.optional();
}
this.times = Times.of(from, to);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.cep.pattern;

import org.apache.flink.util.Preconditions;

import java.util.EnumSet;
import java.util.Objects;

Expand Down Expand Up @@ -143,4 +145,35 @@ public enum ConsumingStrategy {
NOT_NEXT
}

/**
* Describe the times this {@link Pattern} can occur.
*/
public static class Times {
private final int from;
private final int to;

private Times(int from, int to) {
Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0.");
this.from = from;
this.to = to;
}

public int getFrom() {
return from;
}

public int getTo() {
return to;
}

public static Times of(int from, int to) {
return new Times(from, to);
}

public static Times of(int times) {
return new Times(times, times);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,6 @@ public boolean filter(Event value) throws Exception {
Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
));
}

/////////////////////////////// Consecutive ////////////////////////////////////////

private static class ConsecutiveData {
Expand Down
Loading

0 comments on commit 8835da9

Please sign in to comment.