From ac7d43dd914ac8c6c18adadc98df422f01f72d60 Mon Sep 17 00:00:00 2001 From: su chen Date: Wed, 30 Aug 2023 13:26:45 +0800 Subject: [PATCH] add client id, username, mqtt topic to kafak filter for mqtt (#1069) --- doc/cookbook/mqtt-proxy.md | 2 +- pkg/filters/kafka/kafka.go | 11 +++++++++-- pkg/filters/kafka/kafka_test.go | 21 ++++++++++++++++----- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/doc/cookbook/mqtt-proxy.md b/doc/cookbook/mqtt-proxy.md index e855e60b3c..ac7cb3fe06 100644 --- a/doc/cookbook/mqtt-proxy.md +++ b/doc/cookbook/mqtt-proxy.md @@ -111,7 +111,7 @@ In this example, we use pipeline to process MQTT Connect packet (check username Now, we support following filters for MQTTProxy: - `TopicMapper`: map MQTT Publish packet multi-level topic into single topic and key-value headers. - `MQTTClientAuth`: provide username and password checking for MQTT Connect packet. -- `KafkaMQTT`: send MQTT Publish message to Kafka backend. +- `KafkaMQTT`: send MQTT Publish message to Kafka backend. By default, `KafkaMQTT` filter will add `clientID`, `username`, `mqttTopic` to Kafka message headers. # Topic Mapping In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to a single topic with headers. For example: diff --git a/pkg/filters/kafka/kafka.go b/pkg/filters/kafka/kafka.go index d701f5c41f..1598e9492e 100644 --- a/pkg/filters/kafka/kafka.go +++ b/pkg/filters/kafka/kafka.go @@ -150,7 +150,6 @@ func (k *Kafka) Status() interface{} { // Handle handles context func (k *Kafka) Handle(ctx *context.Context) string { var topic string - var headers map[string]string var payload []byte var ok bool @@ -161,8 +160,9 @@ func (k *Kafka) Handle(ctx *context.Context) string { return resultGetDataFailed } } + var headerFromData map[string]string if k.headerKey != "" { - headers, ok = ctx.GetData(k.headerKey).(map[string]string) + headerFromData, ok = ctx.GetData(k.headerKey).(map[string]string) if !ok { return resultGetDataFailed } @@ -175,9 +175,13 @@ func (k *Kafka) Handle(ctx *context.Context) string { } req := ctx.GetInputRequest().(*mqttprot.Request) + headers := map[string]string{} // set data from PublishPacket if data is missing + headers["clientID"] = req.Client().ClientID() + headers["username"] = req.Client().UserName() if req.PacketType() == mqttprot.PublishType { p := req.PublishPacket() + headers["mqttTopic"] = p.TopicName if topic == "" { topic = p.TopicName } @@ -185,6 +189,9 @@ func (k *Kafka) Handle(ctx *context.Context) string { payload = p.Payload } } + for k, v := range headerFromData { + headers[k] = v + } if topic == "" { topic = k.defaultTopic diff --git a/pkg/filters/kafka/kafka_test.go b/pkg/filters/kafka/kafka_test.go index d98ed85b0f..9d8e82844a 100644 --- a/pkg/filters/kafka/kafka_test.go +++ b/pkg/filters/kafka/kafka_test.go @@ -98,11 +98,12 @@ func defaultFilterSpec(spec *Spec) filters.Spec { return spec } -func newContext(cid string, topic string, payload []byte) *context.Context { +func newContext(cid string, username string, topic string, payload []byte) *context.Context { ctx := context.New(nil) client := &mqttprot.MockClient{ MockClientID: cid, + MockUserName: username, } packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) packet.TopicName = topic @@ -132,13 +133,23 @@ func TestKafka(t *testing.T) { done: make(chan struct{}), } - mqttCtx := newContext("test", "a/b/c", []byte("text")) + mqttCtx := newContext("test", "user123", "a/b/c", []byte("text")) kafka.Handle(mqttCtx) msg := <-kafka.producer.(*mockAsyncProducer).ch req := mqttCtx.GetInputRequest().(*mqttprot.Request) assert.Equal(msg.Topic, req.PublishPacket().TopicName) - assert.Equal(0, len(msg.Headers)) + + assert.Equal(3, len(msg.Headers)) + headerMap := map[string]string{ + "clientID": "test", + "mqttTopic": "a/b/c", + "username": "user123", + } + for _, h := range msg.Headers { + assert.Equal(headerMap[string(h.Key)], string(h.Value)) + } + value, err := msg.Value.Encode() assert.Nil(err) assert.Equal("text", string(value)) @@ -165,14 +176,14 @@ func TestKafkaWithKVMap(t *testing.T) { kafka.setKV() defer kafka.Close() - mqttCtx := newContext("test", "a/b/c", []byte("text")) + mqttCtx := newContext("test", "user123", "a/b/c", []byte("text")) mqttCtx.SetData("topic", "123") mqttCtx.SetData("headers", map[string]string{"1": "a"}) kafka.Handle(mqttCtx) msg := <-kafka.producer.(*mockAsyncProducer).ch assert.Equal("123", msg.Topic) - assert.Equal(1, len(msg.Headers)) + assert.Equal(4, len(msg.Headers)) value, err := msg.Value.Encode() assert.Nil(err) assert.Equal("text", string(value))