Skip to content

yangl/pulsar-msg-filter

Repository files navigation

pulsar-msg-filter

message filter for Apache Pulsar, both support server-side and client-side.


简体中文 | English

pulsar-msg-filter-plugin 是一个基于PIP 105: Support pluggable entry filter in DispatcherApache Pulsar 实现的 服务端 消息过滤插件。

pulsar-msg-filter-interceptor 是一个基于 Pulsar ConsumerInterceptor 实现的 客户端 消息过滤拦截器。

特性介绍

  1. 高性能、小巧
  2. 支持常见条件表达式,几乎满足各种业务过滤场景
[server-side] pulsar-msg-filter-plugin 使用说明
  1. 下载pulsar-msg-filter-plugin-VERSION.nar插件并保存至指定目录,如/app/conf/plugin

  2. 修改pulsar broker.conf配置(version >= 2.10),插件名称pulsar-msg-filter

    # Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
    # You can use this class to decide which entries can be sent to consumers.
    # Multiple classes need to be separated by commas.
    entryFilterNames=pulsar-msg-filter
    
    # The directory for all the entry filter implementations
    entryFiltersDirectory=/app/plugin
    # Location of unpacked NAR file
    narExtractionDirectory=/app/nar
  3. 重启broker,查看日志,如果看到如下日志:

    Successfully loaded entry filter for name `pulsar-msg-filter`

    则说明配置成功

  4. 验证(option)

    1. 发送方构建Producer实例时关闭 batch 操作 .enableBatching(false)

      Producer<String> producer = client.newProducer(Schema.STRING)
          .topic("test-topic-1")
          .enableBatching(false)
          .create();
       
      producer.newMessage()
          .property("k1","7")
          .property("k2", "vvvv")
          .property("k3", "true")
          .value("hi, this msg from `pulsar-msg-filter-plugin`")
          .send();
    2. 消费方使用admin配置订阅消费组过滤表达式,其key固定为 pulsar-msg-filter-expression

      注:复杂表达式记得添加 "" 防止被转义
      pulsar-admin topics update-subscription-properties --property --property pulsar-msg-filter-expression="double(k1)<6 || (k2=='vvvv' && k3=='true')" --subscription 订阅组名称 主题
      
      pulsar-admin topics get-subscription-properties --subscription 订阅组名称 主题
      如上配置修改后立马生效,无需在创建Consumer时再设置subscriptionProperties
        Consumer consumer = client.newConsumer()
          .subscriptionName("订阅组名称")
          .topic("主题")
          .subscribe();
      说明: pulsar-msg-filter-plugin插件(服务端)依赖消息的MessageMetadata,故需关闭发送端的batch操作,否则无效(.enableBatching(false)),如无法关闭可配合pulsar-msg-filter-interceptor 一起使用。
[client-side] pulsar-msg-filter-interceptor 使用说明
  1. 添加 pulsar-msg-filter-interceptor 依赖

     <dependency>
         <groupId>io.github.yangl</groupId>
         <artifactId>pulsar-msg-filter-interceptor</artifactId>
         <version>VERSION</version>
     </dependency>
  2. 创建Consumer实例时配置 MsgFilterConsumerInterceptor 过滤器

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .subscriptionName("订阅组名称")
            .topic("主题")
            .intercept(MsgFilterConsumerInterceptor.<String>builder().build())
            .subscribe();
    说明: 如果创建client时使用的是 pulsar:https:// 开头的地址,需额外使用https://设置 .webServiceUrl(YOUR_HTTP_SERVICE_URL) 参数。
    .intercept(MsgFilterConsumerInterceptor.<String>builder().webServiceUrl(YOUR_HTTP_SERVICE_URL).build())
注意事项
  • 由于pulsar message header的key&value全部为String类型,在使用表达式的时候注意将其类型转换至目标类型
  • AviatorScript的false判断个人建议直接使用字符串的 == true/false比较,AviatorScript只有nil false为false,其他全部为true
  • 过滤引擎使用AviatorScript (感谢晓丹),其内置函数详见其 函数库列表

License

pulsar-msg-filter is licensed under the AGPLv3 License.

Links