Skip to content

Latest commit

 

History

History
234 lines (206 loc) · 7.34 KB

mqtt_proxy.md

File metadata and controls

234 lines (206 loc) · 7.34 KB

MQTT Proxy

Background

  • MQTT is a standard messaging protocol for IoT (Internet of Things) which is extremely lightweight and used by a wide variety of industries.
  • By supporting MQTT Proxy in Easegress, MQTT clients can produce messages to Kafka backend directly.
  • We also provide the HTTP endpoint to allow the backend to send messages to MQTT clients.

Design

  • MQTTProxy is now a BusinessController to Easegress.
  • Use github.com/eclipse/paho.mqtt.golang/packets to parse MQTT packet. paho.mqtt.golang is a MQTT 3.1.1 go client introduced by Eclipse Foundation (who also introduced the most widely used MQTT broker mosquitto).
  • As a MQTT proxy, we now support MQTT clients to publish messages to backend Kafka with a powerful topic mapper to map multi-level MQTT topics to Kafka topics with headers (Details in following).
  • We also support MQTT clients to subscribe topics (wildcard is supported) and send messages back to the MQTT clients through the HTTP endpoint.
             publish msg                       topic mapper
MQTT client ------------> Easegress MQTTProxy ------------> Kafka

all published msg will go to Kafka, will not send to other MQTT clients. 
             
             subscribe msg                                 
MQTT client <---------------- Easegress MQTT HTTP Endpoint <---- Backend

all msg send back to MQTT clients come from HTTP endpoint. 
  • We assume that IoT devices (use MQTT client) report their status to the backend (through Kafka), and backend process these messages and send instructions back to IoT devices.

Example

Save following yaml to file mqttproxy.yaml and then run

egctl object create -f mqttproxy.yaml
kind: MQTTProxy
name: mqttproxy
port: 1883  # tcp port for mqtt clients to connect 
backendType: Kafka
kafkaBroker:
  backend: ["123.123.123.123:9092", "234.234.234.234:9092"]
useTLS: true
certificate:
  - name: cert1
    cert: balabala
    key: keyForbalabala
  - name: cert2
    cert: foo
    key: bar
auth:
  # username and password for mqtt clients to connect broker (from MQTT protocol) 
  - userName: test
    passBase64: dGVzdA==
  - userName: admin
    passBase64: YWRtaW4=
topicMapper:
  # map MQTT multi-level topic to Kafka topic with headers
  # detail described in following 
  # if topicMapper is None, no map will happen and Kafka topic will be the MQTT topic 
  matchIndex: 0
  route: 
    - name: gateway
      matchExpr: "gate*"
    - name: direct
      matchExpr: "dir*"
  policies:
    - name: direct
      topicIndex: 1
      route: 
      - topic: iot_phone
        exprs: ["iphone", "xiaomi", "oppo", "pixel"]
      - topic: iot_other
        exprs: [".*"]  
      headers: 
      0: direct
      1: device
      2: status
    - name: gateway
      topicIndex: 3
      route: 
      - topic: iot_phone
        exprs: ["iphone", "xiaomi", "oppo", "pixel"]
      - topic: iot_other
        exprs: [".*"]  
      headers: 
      0: gateway
      1: gatewayID
      2: device
      3: status

In MQTT, there are multi-levels in a topic. Topic mapping is used to map MQTT topic to Kafka topic with headers. For example:

MQTT multi-level topics:
- beijing/car/123/log
- shanghai/tv/234/status
- nanjing/phone/456/error

with corresponding pattern: 
- loc/device/ID/event

with Topic mapper, may produce Kafka topic: 
- topic: iot_device, headers: {loc: beijing, device: car, ID: 123, event: log} 
- topic: iot_device, headers: {loc: shanghai, device: tv, ID: 234, event: status} 
- topic: iot_device, headers: {loc: nanjing, device: phone, ID: 456, event: error}

Match different topic mapping policy

Consider there may be multiple schemas for your MQTT topic, so we first provide a router to route your MQTT topic to different mapping policies and then do the map in that policy.

For example,

schema1: gateway/gatewayID/device/status
schema2: direct/device/status 

...
    # matchIndex is the MQTT topic level used to match the route policy
    matchIndex: 0 
    route: 
    - name: gateway
        matchExpr: "gate*"
    - name: direct
        matchExpr: "dir*"
...

means that we use MQTT topic level 0 to match matchExpr to find a corresponding policy. In this case, gateway/gate123/iphone/log will match policy gateway, direct/iphone/log will match policy direct.

Detail of single policy

  policies:
    - name: direct
      topicIndex: 1
      route: 
      - topic: iot_phone
        exprs: ["iphone", "xiaomi", "oppo", "pixel"]
      - topic: iot_other
        exprs: [".*"]  
      headers: 
      0: direct
      1: device
      2: status

topicIndex is the MQTT topic level used to produce Kafka topic (Regular expressions supported), in this case, direct/iphone/... will produce Kafka topic iot_phone, but direct/car/... will produce Kafka topic iot_other. headers used to produce Kafka headers.

More example about topic mapper:

use yaml above:
MQTT topic: 
pattern1: gateway/gatewayID/device/status -> match policy gateway
pattern2: direct/device/status -> match policy direct

example1: "gateway/gate123/iphone/log"
Kafka 
    topic: iot_phone
    headers: 
        gateway: gateway
        gatewayID: gate123
        device: iphone
        status: log

example2: "direct/xiaomi/status"
Kafka 
    topic: iot_phone
    headers: 
        direct: direct
        device: xiaomi
        status: status

example3: "direct/tv/log"
Kafka 
    topic: iot_other
    headers: 
        direct: direct
        device: tv
        status: log

Empty topicMapper means there is no map between the MQTT topic and Kafka topic.

HTTP endpoint

We support the backend to send messages back to MQTT clients through the HTTP endpoint.

API for http endpoint:

  • Host: Easegress IP, for example http:https://127.0.0.1
  • Port: Easegress API address, by default, :2381
  • Path: apis/v1/mqttproxy/{name}/topics/publish, where name is the name of MQTT proxy
  • Method: POST
  • Body:
{
  "topic": "yourTopicName", 
  "qos": 1,
  "payload": "dataPayload",
  "base64": false
}

Note: Currently, the QoS only support 0 and 1

To send binary data, you can encode your binary data base64 and send base64 flag to true. Your client will receive the original binary data, we will do the decode.

  • Status code:
    • 200: Success
    • 400: StatusBadRequest, may wrong http method, or wrong data (qos send to illegal number) etc.

The HTTP endpoint schema also works for multi-node deployment. Say you have 3 Easegress instances called eg-0, eg-1, eg-2, and your MQTT client connects to eg-0, if you send messages to eg-1, your client will receive the message too.

We also support wildcard subscriptions. For example,

POST http:https://127.0.0.1:2381/apis/v1/mqttproxy/mqttproxy/topics/publish
{
  "topic": "Beijing/Phone/Update", 
  "qos": 1, // currently only support 0 and 1
  "payload": "time to update",
  "base64": false
}

the clients subscribe following topics will receive the message:
"Beijing/Phone/Update"
"Beijing/+/Update"
"Beijing/Phone/+"
"+/Phone/Update"
"Beijing/+/+"
"Beijing/#"
"+/+/+"

References

  1. https://github.com/eclipse/paho.mqtt.golang
  2. http:https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html