- MQTT is a standard messaging protocol for IoT (Internet of Things) which is extremely lightweight and used by a wide variety of industries.
- By supporting MQTT Proxy in Easegress, MQTT clients can produce messages to Kafka backend directly.
- We also provide the HTTP endpoint to allow the backend to send messages to MQTT clients.
MQTTProxy
is now aBusinessController
to Easegress.- Use
github.com/eclipse/paho.mqtt.golang/packets
to parse MQTT packet.paho.mqtt.golang
is a MQTT 3.1.1 go client introduced by Eclipse Foundation (who also introduced the most widely used MQTT broker mosquitto). - As a MQTT proxy, we now support MQTT clients to
publish
messages to backend Kafka with a powerful topic mapper to map multi-level MQTT topics to Kafka topics with headers (Details in following). - We also support MQTT clients to
subscribe
topics (wildcard is supported) and send messages back to the MQTT clients through the HTTP endpoint.
publish msg topic mapper
MQTT client ------------> Easegress MQTTProxy ------------> Kafka
all published msg will go to Kafka, will not send to other MQTT clients.
subscribe msg
MQTT client <---------------- Easegress MQTT HTTP Endpoint <---- Backend
all msg send back to MQTT clients come from HTTP endpoint.
- We assume that IoT devices (use MQTT client) report their status to the backend (through Kafka), and backend process these messages and send instructions back to IoT devices.
Save following yaml to file mqttproxy.yaml
and then run
egctl object create -f mqttproxy.yaml
kind: MQTTProxy
name: mqttproxy
port: 1883 # tcp port for mqtt clients to connect
backendType: Kafka
kafkaBroker:
backend: ["123.123.123.123:9092", "234.234.234.234:9092"]
useTLS: true
certificate:
- name: cert1
cert: balabala
key: keyForbalabala
- name: cert2
cert: foo
key: bar
auth:
# username and password for mqtt clients to connect broker (from MQTT protocol)
- userName: test
passBase64: dGVzdA==
- userName: admin
passBase64: YWRtaW4=
topicMapper:
# map MQTT multi-level topic to Kafka topic with headers
# detail described in following
# if topicMapper is None, no map will happen and Kafka topic will be the MQTT topic
matchIndex: 0
route:
- name: gateway
matchExpr: "gate*"
- name: direct
matchExpr: "dir*"
policies:
- name: direct
topicIndex: 1
route:
- topic: iot_phone
exprs: ["iphone", "xiaomi", "oppo", "pixel"]
- topic: iot_other
exprs: [".*"]
headers:
0: direct
1: device
2: status
- name: gateway
topicIndex: 3
route:
- topic: iot_phone
exprs: ["iphone", "xiaomi", "oppo", "pixel"]
- topic: iot_other
exprs: [".*"]
headers:
0: gateway
1: gatewayID
2: device
3: status
In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to Kafka topic with headers. For example:
MQTT multi-level topics:
- beijing/car/123/log
- shanghai/tv/234/status
- nanjing/phone/456/error
with corresponding pattern:
- loc/device/ID/event
with Topic mapper, may produce Kafka topic:
- topic: iot_device, headers: {loc: beijing, device: car, ID: 123, event: log}
- topic: iot_device, headers: {loc: shanghai, device: tv, ID: 234, event: status}
- topic: iot_device, headers: {loc: nanjing, device: phone, ID: 456, event: error}
Consider there may be multiple schemas for your MQTT topic, so we first provide a router to route your MQTT topic to different mapping policies and then do the map in that policy.
For example,
schema1: gateway/gatewayID/device/status
schema2: direct/device/status
...
# matchIndex is the MQTT topic level used to match the route policy
matchIndex: 0
route:
- name: gateway
matchExpr: "gate*"
- name: direct
matchExpr: "dir*"
...
means that we use MQTT topic level 0 to match matchExpr
to find a corresponding policy. In this case, gateway/gate123/iphone/log
will match policy gateway
, direct/iphone/log
will match policy direct
.
policies:
- name: direct
topicIndex: 1
route:
- topic: iot_phone
exprs: ["iphone", "xiaomi", "oppo", "pixel"]
- topic: iot_other
exprs: [".*"]
headers:
0: direct
1: device
2: status
topicIndex
is the MQTT topic level used to produce Kafka topic (Regular expressions supported), in this case, direct/iphone/...
will produce Kafka topic iot_phone
, but direct/car/...
will produce Kafka topic iot_other
. headers
used to produce Kafka headers.
More example about topic mapper:
use yaml above:
MQTT topic:
pattern1: gateway/gatewayID/device/status -> match policy gateway
pattern2: direct/device/status -> match policy direct
example1: "gateway/gate123/iphone/log"
Kafka
topic: iot_phone
headers:
gateway: gateway
gatewayID: gate123
device: iphone
status: log
example2: "direct/xiaomi/status"
Kafka
topic: iot_phone
headers:
direct: direct
device: xiaomi
status: status
example3: "direct/tv/log"
Kafka
topic: iot_other
headers:
direct: direct
device: tv
status: log
Empty topicMapper
means there is no map between the MQTT topic and Kafka topic.
We support the backend to send messages back to MQTT clients through the HTTP endpoint.
API for http endpoint:
- Host: Easegress IP, for example
http:https://127.0.0.1
- Port: Easegress API address, by default,
:2381
- Path:
apis/v1/mqttproxy/{name}/topics/publish
, where name is the name of MQTT proxy - Method: POST
- Body:
{
"topic": "yourTopicName",
"qos": 1,
"payload": "dataPayload",
"base64": false
}
Note: Currently, the QoS only support
0
and1
To send binary data, you can encode your binary data base64 and send base64
flag to true
. Your client will receive the original binary data, we will do the decode.
- Status code:
- 200: Success
- 400: StatusBadRequest, may wrong http method, or wrong data (qos send to illegal number) etc.
The HTTP endpoint schema also works for multi-node deployment. Say you have 3 Easegress instances called eg-0
, eg-1
, eg-2
, and your MQTT client connects to eg-0
, if you send messages to eg-1
, your client will receive the message too.
We also support wildcard subscriptions. For example,
POST http:https://127.0.0.1:2381/apis/v1/mqttproxy/mqttproxy/topics/publish
{
"topic": "Beijing/Phone/Update",
"qos": 1, // currently only support 0 and 1
"payload": "time to update",
"base64": false
}
the clients subscribe following topics will receive the message:
"Beijing/Phone/Update"
"Beijing/+/Update"
"Beijing/Phone/+"
"+/Phone/Update"
"Beijing/+/+"
"Beijing/#"
"+/+/+"