Skip to content

Commit

Permalink
update mqtt-proxy doc (easegress-io#593)
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci authored and localvar committed Jun 13, 2022
1 parent e1cb633 commit 0ad16f5
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 51 deletions.
156 changes: 110 additions & 46 deletions doc/cookbook/mqtt-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- [MQTT Proxy](#mqtt-proxy)
- [Background](#background)
- [Design](#design)
- [Topic Mapping](#topic-mapping)
- [Match different topic mapping policy](#match-different-topic-mapping-policy)
- [Detail of single policy](#detail-of-single-policy)
- [References](#references)
Expand All @@ -11,27 +12,28 @@

# Background
- MQTT is a standard messaging protocol for IoT (Internet of Things) which is extremely lightweight and used by a wide variety of industries.
- By supporting MQTT Proxy in Easegress, MQTT clients can produce messages to Kafka backend directly.
- By supporting MQTT Proxy in Easegress, MQTT clients can produce messages to backend through publish packet pipeline.
- We also provide the HTTP endpoint to allow the backend to send messages to MQTT clients.

# Design
- `MQTTProxy` is now a `BusinessController` to Easegress.
- Use `github.com/eclipse/paho.mqtt.golang/packets` to parse MQTT packet. `paho.mqtt.golang` is a MQTT 3.1.1 go client introduced by Eclipse Foundation (who also introduced the most widely used MQTT broker mosquitto).
- As a MQTT proxy, we now support MQTT clients to `publish` messages to backend Kafka with a powerful topic mapper to map multi-level MQTT topics to Kafka topics with headers (Details in following).
- As a MQTT proxy, we support MQTT clients to `publish` messages to backend through publish packet pipeline.
- Just like `HTTPPipeline`, `Pipeline` in MQTTProxy can use filters to do things like user authentication or topic mapping (map MQTT multi-level topic into single topic and key-value headers).
- We also support MQTT clients to `subscribe` topics (wildcard is supported) and send messages back to the MQTT clients through the HTTP endpoint.

```
publish msg topic mapper
MQTT client ------------> Easegress MQTTProxy ------------> Kafka
publish msg publish pipeline
MQTT client ------------> Easegress MQTTProxy ----------------> Backend like Kafka
all published msg will go to Kafka, will not send to other MQTT clients.
all published msg will go to Backend, will not send to other MQTT clients.
subscribe msg
MQTT client <---------------- Easegress MQTT HTTP Endpoint <---- Backend
MQTT client <---------------- Easegress MQTT HTTP Endpoint <---- Backend Server
all msg send back to MQTT clients come from HTTP endpoint.
```
- We assume that IoT devices (use MQTT client) report their status to the backend (through Kafka), and backend process these messages and send instructions back to IoT devices.
- We assume that IoT devices (use MQTT client) report their status to the backend (through tools like Kafka), and backend process these messages and send instructions back to IoT devices.

# Example
Save following yaml to file `mqttproxy.yaml` and then run
Expand All @@ -42,33 +44,100 @@ egctl object create -f mqttproxy.yaml
kind: MQTTProxy
name: mqttproxy
port: 1883 # tcp port for mqtt clients to connect
backendType: Kafka
kafkaBroker:
backend: ["123.123.123.123:9092", "234.234.234.234:9092"]
useTLS: true
certificate:
- name: cert1
cert: balabala
key: keyForbalabala
- name: cert2
cert: foo
key: bar
auth:
# username and password for mqtt clients to connect broker (from MQTT protocol)
- userName: test
passBase64: dGVzdA==
- userName: admin
passBase64: YWRtaW4=
topicMapper:
# map MQTT multi-level topic to Kafka topic with headers
# detail described in following
# if topicMapper is None, no map will happen and Kafka topic will be the MQTT topic
- name: cert1
cert: balabala
key: keyForbalabala
- name: cert2
cert: foo
key: bar
rules:
- when:
packetType: Connect
pipeline: pipeline-mqtt-auth
- when:
packetType: Publish
pipeline: pipeline-mqtt-publish

---

name: pipeline-mqtt-auth
kind: Pipeline
protocol: MQTT
flow:
- filter: auth
filters:
- name: auth
kind: MQTTClientAuth
salt: salt
auth:
# username and password are both test
- username: test
saltedSha256Pass: 1bc1a361f17092bc7af4b2f82bf9194ea9ee2ca49eb2e53e39f555bc1eeaed74

---

name: pipeline-mqtt-publish
kind: Pipeline
protocol: MQTT
flow:
- filter: publish-kafka-backend
filters:
- name: publish-kafka-backend
kind: Kafka
backend: ["127.0.0.1:9092"]
topic:
default: kafka-topic
```
In this example, we use pipeline to process MQTT Connect packet (check username and password) and Publish packet (send to Kafka backend).

Now, we support following filters for MQTTProxy:
- `TopicMapper`: map MQTT Publish packet multi-level topic into single topic and key-value headers.
- `MQTTClientAuth`: provide username and password checking for MQTT Connect packet.
- `Kafka`: send MQTT Publish message to Kafka backend.

# Topic Mapping
In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to a single topic with headers. For example:
```
MQTT multi-level topics:
- beijing/car/123/log
- shanghai/tv/234/status
- nanjing/phone/456/error
with corresponding pattern:
- loc/device/ID/event
with Topic mapper, may produce Kafka topic:
- topic: iot_device, headers: {loc: beijing, device: car, ID: 123, event: log}
- topic: iot_device, headers: {loc: shanghai, device: tv, ID: 234, event: status}
- topic: iot_device, headers: {loc: nanjing, device: phone, ID: 456, event: error}
```

Topic mapping can make processing MQTT messages easier. In Easegress, we use filter `TopicMapper` to do topic mapping.

Here's a simple example:
```yaml
name: pipeline-mqtt-publish
kind: Pipeline
protocol: MQTT
flow:
- filter: topic-mapper
- filter: publish-kafka-backend
filters:
- name: topic-mapper
kind: TopicMapper
setKV: # setKV set topic and header map into MQTT Context
topic: kafka-topic
headers: kafka-headers
# matchIndex and route will decide which policy we use to do the mapping for MQTT topic
matchIndex: 0
route:
- name: gateway
- name: gateway
matchExpr: "gate*"
- name: direct
- name: direct
matchExpr: "dir*"
# policies define how to create topic and header map by using MQTT publish topic.
policies:
- name: direct
topicIndex: 1
Expand All @@ -93,21 +162,16 @@ topicMapper:
1: gatewayID
2: device
3: status
```
In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to Kafka topic with headers. For example:
```
MQTT multi-level topics:
- beijing/car/123/log
- shanghai/tv/234/status
- nanjing/phone/456/error
with corresponding pattern:
- loc/device/ID/event
with Topic mapper, may produce Kafka topic:
- topic: iot_device, headers: {loc: beijing, device: car, ID: 123, event: log}
- topic: iot_device, headers: {loc: shanghai, device: tv, ID: 234, event: status}
- topic: iot_device, headers: {loc: nanjing, device: phone, ID: 456, event: error}
- name: publish-kafka-backend
kind: Kafka
backend: ["my-cluster-kafka-bootstrap.kafka:9092"]
topic:
default: kafka-topic
mqtt:
# since we don't use original topic name in MQTT Publish packet.
# we need keys to get topic and header map from MQTT Context.
topicKey: kafka-topic
headerKey: kafka-headers
```

## Match different topic mapping policy
Expand Down Expand Up @@ -156,7 +220,7 @@ pattern1: gateway/gatewayID/device/status -> match policy gateway
pattern2: direct/device/status -> match policy direct
example1: "gateway/gate123/iphone/log"
Kafka
topic and header map:
topic: iot_phone
headers:
gateway: gateway
Expand All @@ -165,15 +229,15 @@ Kafka
status: log
example2: "direct/xiaomi/status"
Kafka
topic and header map:
topic: iot_phone
headers:
direct: direct
device: xiaomi
status: status
example3: "direct/tv/log"
Kafka
topic and header map:
topic: iot_other
headers:
direct: direct
Expand Down
6 changes: 2 additions & 4 deletions pkg/object/mqttproxy/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ func TestSession(t *testing.T) {
func TestSpec(t *testing.T) {
yamlStr := `
port: 1883
backendType: Kafka
auth:
- userName: test
passBase64: dGVzdA==
Expand All @@ -541,9 +540,8 @@ func TestSpec(t *testing.T) {
}

want := Spec{
Port: 1883,
BackendType: "Kafka",
UseTLS: true,
Port: 1883,
UseTLS: true,
Certificate: []Certificate{
{"cert1", "balabala", "keyForbalabala"},
{"cert2", "foo", "bar"},
Expand Down
1 change: 0 additions & 1 deletion pkg/object/mqttproxy/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type (
EGName string `yaml:"-"`
Name string `yaml:"-"`
Port uint16 `yaml:"port" jsonschema:"required"`
BackendType string `yaml:"backendType" jsonschema:"required"`
UseTLS bool `yaml:"useTLS" jsonschema:"omitempty"`
Certificate []Certificate `yaml:"certificate" jsonschema:"omitempty"`
TopicCacheSize int `yaml:"topicCacheSize" jsonschema:"omitempty"`
Expand Down

0 comments on commit 0ad16f5

Please sign in to comment.