Skip to content

Commit

Permalink
[FLINK-29403][cep] Streamline SimpleCondition usage
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Oct 4, 2022
1 parent 306c3c4 commit 990a7da
Show file tree
Hide file tree
Showing 23 changed files with 754 additions and 6,034 deletions.
142 changes: 39 additions & 103 deletions docs/content.zh/docs/libs/cep.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,13 @@ FlinkCEP 不是二进制发布包的一部分。在集群上执行如何链接
```java
DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(SimpleCondition.of(event -> event.getId() == 42))
.next("middle")
.subtype(SubEvent.class)
.where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0))
.followedBy("end")
.where(SimpleCondition.of(event -> event.getName().equals("end")));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

Expand Down Expand Up @@ -314,12 +299,7 @@ middle.oneOrMore()
{{< tabs "20073e8b-3772-4faf-894c-e1bf2cbff15e" >}}
{{< tab "Java" >}}
```java
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand All @@ -334,12 +314,8 @@ start.where(event => event.getName.startsWith("foo"))
{{< tabs "5011129d-6c43-4fb7-84ae-3000d2296f28" >}}
{{< tab "Java" >}}
```java
start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
return ...; // 一些判断条件
}
});
start.subtype(SubEvent.class)
.where(SimpleCondition.of(value -> ... /*一些判断条件*/));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand All @@ -355,17 +331,9 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* 一些判断条件 */)
{{< tabs "ba6bc50b-c9f6-4534-aff8-b2957ada791b" >}}
{{< tab "Java" >}}
```java
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // 一些判断条件
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // 一些判断条件
}
});
pattern
.where(SimpleCondition.of(value -> ... /*一些判断条件*/))
.or(SimpleCondition.of(value -> ... /*一些判断条件*/));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand Down Expand Up @@ -708,17 +676,11 @@ next.within(Time.seconds(10))
{{< tab "Java" >}}
```java
Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).notFollowedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
}).within(Time.seconds(10));
.next("middle")
.where(SimpleCondition.of(value -> value.getName().equals("a")))
.notFollowedBy("end")
.where(SimpleCondition.of(value -> value.getName().equals("b")))
.within(Time.seconds(10));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand Down Expand Up @@ -766,24 +728,14 @@ Pattern.begin("start").where(_.getName().equals("a"))

<p>例如,一个如下的模式:</p>
```java
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
Pattern.<Event>begin("start")
.where(SimpleCondition.of(value -> value.getName().equals("c")))
.followedBy("middle")
.where(SimpleCondition.of(value -> value.getName().equals("a")))
.oneOrMore()
.consecutive()
.followedBy("end1")
.where(SimpleCondition.of(value -> value.getName().equals("b")));
```
<p>输入:C D A1 A2 A3 D A4 B,会产生下面的输出:</p>

Expand All @@ -800,24 +752,14 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {

<p>例如,一个如下的模式:</p>
```java
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
Pattern.<Event>begin("start")
.where(SimpleCondition.of(value -> value.getName().equals("c")))
.followedBy("middle")
.where(SimpleCondition.of(value -> value.getName().equals("a")))
.oneOrMore()
.allowCombinations()
.followedBy("end1")
.where(SimpleCondition.of(value -> value.getName().equals("b")));
```
<p>输入:C D A1 A2 A3 D A4 B,会产生如下的输出:</p>

Expand Down Expand Up @@ -1633,17 +1575,11 @@ DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
.next("middle")
.where(SimpleCondition.of(value -> value.getName().equals("error")))
.followedBy("end")
.where(SimpleCondition.of(value -> value.getName().equals("critical")))
.within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

Expand Down
141 changes: 38 additions & 103 deletions docs/content/docs/libs/cep.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,13 @@ because FlinkCEP uses them for comparing and matching events.
```java
DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(SimpleCondition.of(event -> event.getId() == 42))
.next("middle")
.subtype(SubEvent.class)
.where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0))
.followedBy("end")
.where(SimpleCondition.of(event -> event.getName().equals("end")));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

Expand Down Expand Up @@ -317,12 +302,7 @@ whether to accept an event or not, based *only* on properties of the event itsel
{{< tabs "3a34bfc1-691f-41e7-88ee-c76ca6430e4c" >}}
{{< tab "Java" >}}
```java
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand All @@ -338,12 +318,8 @@ via the `pattern.subtype(subClass)` method.
{{< tabs "be703e92-5424-4a03-a358-abc84f0f2e65" >}}
{{< tab "Java" >}}
```java
start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
return ...; // some condition
}
});
start.subtype(SubEvent.class)
.where(SimpleCondition.of(value -> ... /*some condition*/));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand All @@ -358,17 +334,8 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
{{< tabs "101511a2-3555-43c8-9c49-6c7ce24695f1" >}}
{{< tab "Java" >}}
```java
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // some condition
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ...; // or condition
}
});
pattern.where(SimpleCondition.of(value -> ... /*some condition*/))
.or(SimpleCondition.of(value -> ... /*some condition*/));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand Down Expand Up @@ -725,17 +692,11 @@ E.g. a pattern like:
{{< tab "Java" >}}
```java
Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).notFollowedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
}).within(Time.seconds(10));
.next("middle")
.where(SimpleCondition.of(value -> value.getName().equals("a")))
.notFollowedBy("end")
.where(SimpleCondition.of(value -> value.getName().equals("b")))
.within(Time.seconds(10));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand Down Expand Up @@ -776,24 +737,14 @@ E.g. a pattern like:
{{< tabs consecutive >}}
{{< tab "Java" >}}
```java
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
Pattern.<Event>begin("start")
.where(SimpleCondition.of(value -> value.getName().equals("c")))
.followedBy("middle")
.where(SimpleCondition.of(value -> value.getName().equals("a")))
.oneOrMore()
.consecutive()
.followedBy("end1")
.where(SimpleCondition.of(value -> value.getName().equals("b")));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand Down Expand Up @@ -821,24 +772,14 @@ E.g. a pattern like:
{{< tabs allowcombinations >}}
{{< tab "Java" >}}
```java
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
Pattern.<Event>begin("start")
.where(SimpleCondition.of(value -> value.getName().equals("c")))
.followedBy("middle")
.where(SimpleCondition.of(value -> value.getName().equals("a")))
.oneOrMore()
.allowCombinations()
.followedBy("end1")
.where(SimpleCondition.of(value -> value.getName().equals("b")));
```
{{< /tab >}}
{{< tab "Scala" >}}
Expand Down Expand Up @@ -1568,17 +1509,11 @@ DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
.next("middle")
.where(SimpleCondition.of(value -> value.getName().equals("error")))
.followedBy("end")
.where(SimpleCondition.of(value -> value.getName().equals("critical")))
.within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

Expand Down
Loading

0 comments on commit 990a7da

Please sign in to comment.