[FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the number of the elements in the cache

Co-authored-by: Arvid Heise <[email protected]>
2 people authored and AHeise committed Dec 9, 2021
1 parent 2e9f9ad commit 4d142b3
Showing 19 changed files with 470 additions and 101 deletions.
8 changes: 8 additions & 0 deletions docs/content.zh/docs/libs/cep.md
@@ -1635,6 +1635,14 @@ public interface TimeContext {
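With `ProcessingTime`, this value equals the point in time at which the event entered the CEP operator (or, in `PatternProcessFunction`, the time at which the match was generated).
This means that the value is consistent across multiple calls to this method.

## Optional Configuration

Options to configure the cache capacity of the Flink CEP `SharedBuffer`. They can speed up processing in the CEP operator and limit the number of elements cached in memory.

<span class="label label-info">Note</span> Memory usage is limited only when `state.backend` is set to `rocksdb`, in which case elements that exceed the cache capacity are moved to the `rocksdb` state storage instead of the in-memory state storage. By contrast, when `state.backend` is not set to `rocksdb`, the cache degrades performance. Compared with the old cache implemented with `Map`, the state will contain more elements swapped out of the `guava-cache`, which adds some overhead to state handling during `copy on write`.

{{< generated/cep_cache_configuration >}}

## Examples

The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed `Events` stream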
10 changes: 10 additions & 0 deletions docs/content/docs/libs/cep.md
@@ -1508,6 +1508,16 @@ Call to `TimeContext#currentProcessingTime` always gives you the value of curren
For `TimeContext#timestamp()`, the returned value equals the assigned timestamp when using `EventTime`. With `ProcessingTime`, it equals the point in time at which the event entered the
CEP operator (or at which the match was generated, in the case of `PatternProcessFunction`). This means that the value is consistent across multiple calls to that method.

## Optional Configuration

Options to configure the cache capacity of the Flink CEP `SharedBuffer`.
They can speed up processing in the CEP operator and limit the number of elements cached in memory.

<span class="label label-info">Note</span> Memory usage is limited only when `state.backend` is set to `rocksdb`, in which case elements that exceed the cache capacity are moved to the RocksDB state storage instead of the in-memory state storage.
By contrast, when `state.backend` is not set to `rocksdb`, the cache degrades performance. Compared with the old cache implemented with `Map`, the state will contain more elements swapped out of the new Guava cache, which makes `copy on write` for the state more expensive.

{{< generated/cep_cache_configuration >}}
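
The spill-over behaviour described in the note can be illustrated with a small, Flink-free sketch. It is not the `SharedBuffer` implementation: the class name `SpillingCache` is hypothetical, a `LinkedHashMap` stands in for the Guava cache, and a plain `Map` stands in for the state backend. It only shows the idea that an element pushed out of a bounded cache is written to slower storage rather than dropped.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: a bounded in-memory cache whose evicted entries are
 * written to a backing store (a stand-in for keyed state) instead of being lost.
 */
class SpillingCache<K, V> {
    private final Map<K, V> backingStore; // stand-in for the state backend
    private final LinkedHashMap<K, V> cache;

    SpillingCache(int maxSlots, Map<K, V> backingStore) {
        this.backingStore = backingStore;
        // accessOrder = true keeps the least recently used entry first.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxSlots) {
                    // Spill the evicted element to the backing store.
                    backingStore.put(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    void put(K key, V value) {
        cache.put(key, value);
    }

    /** Looks in the cache first, then falls back to the backing store. */
    V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            value = backingStore.remove(key);
            if (value != null) {
                cache.put(key, value); // re-cache; may spill another entry
            }
        }
        return value;
    }

    int cachedSize() {
        return cache.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        SpillingCache<String, Integer> cache = new SpillingCache<>(2, store);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.put("c", 3); // exceeds the 2 slots, so "a" is spilled to the store
        System.out.println(cache.cachedSize() + " cached, spilled: " + store);
    }
}
```

With a heap-based backing store, as in this sketch, the spilled elements still occupy memory, which is in line with the note that the limit only reduces memory usage when the state lives in RocksDB.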

## Examples

The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data
30 changes: 30 additions & 0 deletions docs/layouts/shortcodes/generated/cep_cache_configuration.html
@@ -0,0 +1,30 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>pipeline.cep.sharedbuffer.cache.entry-slots</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The maximum number of elements that the entryCache of SharedBuffer can hold. It can speed up processing in the CEP operator and limits the capacity of the in-memory cache. Note: Memory usage is limited only when 'state.backend' is set to 'rocksdb', in which case elements exceeding the cache capacity are moved to the RocksDB state storage instead of the in-memory state storage.</td>
</tr>
<tr>
<td><h5>pipeline.cep.sharedbuffer.cache.event-slots</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The maximum number of elements that the eventsBufferCache of SharedBuffer can hold. It can speed up processing in the CEP operator and limits the capacity of the in-memory cache. Note: Memory usage is limited only when 'state.backend' is set to 'rocksdb', in which case elements exceeding the cache capacity are moved to the RocksDB state storage instead of the in-memory state storage.</td>
</tr>
<tr>
<td><h5>pipeline.cep.sharedbuffer.cache.statistics-interval</h5></td>
<td style="word-wrap: break-word;">30 min</td>
<td>Duration</td>
<td>The interval to log the information of cache state statistics in CEP operator.</td>
</tr>
</tbody>
</table>
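
As a rough illustration of how the keys and defaults in this table resolve, the following Flink-free sketch reads the three options from a plain `Map<String, String>`. The class `CepCacheSettings` and its `parseMinutes` helper are hypothetical and exist only for this example; the helper accepts just the `<n> min` form used by the default, whereas Flink's own duration parsing supports more units.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Flink-free sketch of resolving the three cache options. */
class CepCacheSettings {
    static final String ENTRY_SLOTS = "pipeline.cep.sharedbuffer.cache.entry-slots";
    static final String EVENT_SLOTS = "pipeline.cep.sharedbuffer.cache.event-slots";
    static final String STATS_INTERVAL =
            "pipeline.cep.sharedbuffer.cache.statistics-interval";

    final int entrySlots;
    final int eventSlots;
    final Duration statisticsInterval;

    CepCacheSettings(Map<String, String> raw) {
        // Defaults mirror the table: 1024, 1024, and 30 min.
        this.entrySlots = Integer.parseInt(raw.getOrDefault(ENTRY_SLOTS, "1024"));
        this.eventSlots = Integer.parseInt(raw.getOrDefault(EVENT_SLOTS, "1024"));
        this.statisticsInterval = parseMinutes(raw.getOrDefault(STATS_INTERVAL, "30 min"));
    }

    // Assumption: only the "<n> min" form is handled in this sketch.
    static Duration parseMinutes(String text) {
        String[] parts = text.trim().split("\\s+");
        if (parts.length != 2 || !parts[1].equals("min")) {
            throw new IllegalArgumentException("Expected '<n> min' but got: " + text);
        }
        return Duration.ofMinutes(Long.parseLong(parts[0]));
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put(ENTRY_SLOTS, "4096"); // override one option, keep the other defaults
        CepCacheSettings settings = new CepCacheSettings(raw);
        System.out.println(settings.entrySlots + " / " + settings.eventSlots
                + " / " + settings.statisticsInterval);
    }
}
```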
6 changes: 6 additions & 0 deletions flink-docs/pom.xml
@@ -126,6 +126,12 @@ under the License.
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
@@ -101,7 +101,9 @@ public class ConfigOptionsDocGenerator {
                 "org.apache.flink.connector.pulsar.common.config"),
         new OptionsClassLocation(
                 "flink-connectors/flink-connector-pulsar",
-                "org.apache.flink.connector.pulsar.source")
+                "org.apache.flink.connector.pulsar.source"),
+        new OptionsClassLocation(
+                "flink-libraries/flink-cep", "org.apache.flink.cep.configuration")
     };
 
     static final Set<String> EXCLUSIONS =
@@ -112,7 +114,8 @@ public class ConfigOptionsDocGenerator {
                             "org.apache.flink.configuration.ConfigOptions",
                             "org.apache.flink.streaming.api.environment.CheckpointConfig",
                             "org.apache.flink.contrib.streaming.state.PredefinedOptions",
-                            "org.apache.flink.python.PythonConfig"));
+                            "org.apache.flink.python.PythonConfig",
+                            "org.apache.flink.cep.configuration.SharedBufferCacheConfig"));
 
     static final String DEFAULT_PATH_PREFIX = "src/main/java";
 
@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cep.configuration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.util.TimeUtils;

import java.time.Duration;

/** CEP Cache Options. */
public class CEPCacheOptions {

    private CEPCacheOptions() {}

    // Shared tail of the descriptions of both slot options.
    private static final String COMMON_HINT =
            "It can speed up processing in the CEP operator and limits the capacity of the "
                    + "in-memory cache. Note: Memory usage is limited only when "
                    + "'state.backend' is set to 'rocksdb', in which case elements exceeding "
                    + "the cache capacity are moved to the RocksDB state storage instead of "
                    + "the in-memory state storage.";

    /** Maximum number of elements held by the eventsBufferCache of the SharedBuffer. */
    public static final ConfigOption<Integer> CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS =
            ConfigOptions.key("pipeline.cep.sharedbuffer.cache.event-slots")
                    .intType()
                    .defaultValue(1024)
                    .withDescription(
                            "The maximum number of elements that the eventsBufferCache of "
                                    + "SharedBuffer can hold. "
                                    + COMMON_HINT);

    /** Maximum number of elements held by the entryCache of the SharedBuffer. */
    public static final ConfigOption<Integer> CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS =
            ConfigOptions.key("pipeline.cep.sharedbuffer.cache.entry-slots")
                    .intType()
                    .defaultValue(1024)
                    .withDescription(
                            "The maximum number of elements that the entryCache of "
                                    + "SharedBuffer can hold. "
                                    + COMMON_HINT);

    /** Interval at which the CEP operator logs cache statistics. */
    public static final ConfigOption<Duration> CEP_CACHE_STATISTICS_INTERVAL =
            ConfigOptions.key("pipeline.cep.sharedbuffer.cache.statistics-interval")
                    .durationType()
                    .defaultValue(TimeUtils.parseDuration("30 min"))
                    .withDescription(
                            "The interval to log the information of cache state statistics in "
                                    + "CEP operator.");
}
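
The `statistics-interval` option only states that cache statistics are logged at a fixed interval; how the operator does this is not visible in this part of the commit. The following is a minimal, Flink-free sketch of one way such interval-based reporting could work. The class `CacheStatsReporter`, its counters, and the in-memory list used in place of a logger are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: hit/miss counters reported at most once per interval. */
class CacheStatsReporter {
    private final long intervalMillis;
    private final List<String> reports = new ArrayList<>(); // stand-in for a logger
    private long hits;
    private long misses;
    private long lastReportMillis;

    CacheStatsReporter(Duration interval, long nowMillis) {
        this.intervalMillis = interval.toMillis();
        this.lastReportMillis = nowMillis;
    }

    /** Records one cache lookup and emits a report if the interval has elapsed. */
    void record(boolean hit, long nowMillis) {
        if (hit) {
            hits++;
        } else {
            misses++;
        }
        if (nowMillis - lastReportMillis >= intervalMillis) {
            reports.add("hits=" + hits + ", misses=" + misses);
            lastReportMillis = nowMillis;
        }
    }

    List<String> reports() {
        return reports;
    }

    public static void main(String[] args) {
        CacheStatsReporter reporter = new CacheStatsReporter(Duration.ofMinutes(30), 0L);
        reporter.record(true, 1_000L);
        reporter.record(false, 2_000L);
        reporter.record(true, Duration.ofMinutes(30).toMillis()); // interval elapsed
        System.out.println(reporter.reports());
    }
}
```

The clock is passed in explicitly so that the sketch stays deterministic; a real operator would more likely rely on its own notion of processing time.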
@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cep.configuration;

import org.apache.flink.configuration.ReadableConfig;

import java.io.Serializable;
import java.time.Duration;

/** Immutable configuration of the {@code SharedBuffer} caches. */
public final class SharedBufferCacheConfig implements Serializable {
    private final int eventsBufferCacheSlots;
    private final int entryCacheSlots;
    private final Duration cacheStatisticsInterval;

    public int getEventsBufferCacheSlots() {
        return eventsBufferCacheSlots;
    }

    public int getEntryCacheSlots() {
        return entryCacheSlots;
    }

    public Duration getCacheStatisticsInterval() {
        return cacheStatisticsInterval;
    }

    /** Creates a configuration that uses the default value of every option. */
    public SharedBufferCacheConfig() {
        this.cacheStatisticsInterval = CEPCacheOptions.CEP_CACHE_STATISTICS_INTERVAL.defaultValue();
        this.entryCacheSlots = CEPCacheOptions.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS.defaultValue();
        this.eventsBufferCacheSlots =
                CEPCacheOptions.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS.defaultValue();
    }

    public SharedBufferCacheConfig(
            final int eventsBufferCacheSlots,
            final int entryCacheSlots,
            final Duration cacheStatisticsInterval) {
        this.cacheStatisticsInterval = cacheStatisticsInterval;
        this.entryCacheSlots = entryCacheSlots;
        this.eventsBufferCacheSlots = eventsBufferCacheSlots;
    }

    /** Reads the cache settings from the given configuration. */
    public static SharedBufferCacheConfig of(ReadableConfig readableConfig) {
        int eventsBufferCacheSlots =
                readableConfig.get(CEPCacheOptions.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS);
        int entryCacheSlots =
                readableConfig.get(CEPCacheOptions.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS);
        Duration cacheStatisticsInterval =
                readableConfig.get(CEPCacheOptions.CEP_CACHE_STATISTICS_INTERVAL);
        return new SharedBufferCacheConfig(
                eventsBufferCacheSlots, entryCacheSlots, cacheStatisticsInterval);
    }
}