Skip to content

Commit

Permalink
add client id, username, mqtt topic to kafak filter for mqtt (#1069)
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Aug 30, 2023
1 parent e05b392 commit ac7d43d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
2 changes: 1 addition & 1 deletion doc/cookbook/mqtt-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ In this example, we use pipeline to process MQTT Connect packet (check username
Now, we support following filters for MQTTProxy:
- `TopicMapper`: map MQTT Publish packet multi-level topic into single topic and key-value headers.
- `MQTTClientAuth`: provide username and password checking for MQTT Connect packet.
- `KafkaMQTT`: send MQTT Publish message to Kafka backend.
- `KafkaMQTT`: send MQTT Publish message to Kafka backend. By default, `KafkaMQTT` filter will add `clientID`, `username`, `mqttTopic` to Kafka message headers.

# Topic Mapping
In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to a single topic with headers. For example:
Expand Down
11 changes: 9 additions & 2 deletions pkg/filters/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func (k *Kafka) Status() interface{} {
// Handle handles context
func (k *Kafka) Handle(ctx *context.Context) string {
var topic string
var headers map[string]string
var payload []byte
var ok bool

Expand All @@ -161,8 +160,9 @@ func (k *Kafka) Handle(ctx *context.Context) string {
return resultGetDataFailed
}
}
var headerFromData map[string]string
if k.headerKey != "" {
headers, ok = ctx.GetData(k.headerKey).(map[string]string)
headerFromData, ok = ctx.GetData(k.headerKey).(map[string]string)
if !ok {
return resultGetDataFailed
}
Expand All @@ -175,16 +175,23 @@ func (k *Kafka) Handle(ctx *context.Context) string {
}

req := ctx.GetInputRequest().(*mqttprot.Request)
headers := map[string]string{}
// set data from PublishPacket if data is missing
headers["clientID"] = req.Client().ClientID()
headers["username"] = req.Client().UserName()
if req.PacketType() == mqttprot.PublishType {
p := req.PublishPacket()
headers["mqttTopic"] = p.TopicName
if topic == "" {
topic = p.TopicName
}
if payload == nil {
payload = p.Payload
}
}
for k, v := range headerFromData {
headers[k] = v
}

if topic == "" {
topic = k.defaultTopic
Expand Down
21 changes: 16 additions & 5 deletions pkg/filters/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ func defaultFilterSpec(spec *Spec) filters.Spec {
return spec
}

func newContext(cid string, topic string, payload []byte) *context.Context {
func newContext(cid string, username string, topic string, payload []byte) *context.Context {
ctx := context.New(nil)

client := &mqttprot.MockClient{
MockClientID: cid,
MockUserName: username,
}
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
packet.TopicName = topic
Expand Down Expand Up @@ -132,13 +133,23 @@ func TestKafka(t *testing.T) {
done: make(chan struct{}),
}

mqttCtx := newContext("test", "a/b/c", []byte("text"))
mqttCtx := newContext("test", "user123", "a/b/c", []byte("text"))
kafka.Handle(mqttCtx)
msg := <-kafka.producer.(*mockAsyncProducer).ch

req := mqttCtx.GetInputRequest().(*mqttprot.Request)
assert.Equal(msg.Topic, req.PublishPacket().TopicName)
assert.Equal(0, len(msg.Headers))

assert.Equal(3, len(msg.Headers))
headerMap := map[string]string{
"clientID": "test",
"mqttTopic": "a/b/c",
"username": "user123",
}
for _, h := range msg.Headers {
assert.Equal(headerMap[string(h.Key)], string(h.Value))
}

value, err := msg.Value.Encode()
assert.Nil(err)
assert.Equal("text", string(value))
Expand All @@ -165,14 +176,14 @@ func TestKafkaWithKVMap(t *testing.T) {
kafka.setKV()
defer kafka.Close()

mqttCtx := newContext("test", "a/b/c", []byte("text"))
mqttCtx := newContext("test", "user123", "a/b/c", []byte("text"))
mqttCtx.SetData("topic", "123")
mqttCtx.SetData("headers", map[string]string{"1": "a"})

kafka.Handle(mqttCtx)
msg := <-kafka.producer.(*mockAsyncProducer).ch
assert.Equal("123", msg.Topic)
assert.Equal(1, len(msg.Headers))
assert.Equal(4, len(msg.Headers))
value, err := msg.Value.Encode()
assert.Nil(err)
assert.Equal("text", string(value))
Expand Down

0 comments on commit ac7d43d

Please sign in to comment.