Skip to content

Commit

Permalink
Add MQTT configuration for retained, keepalive and last will (redpand…
Browse files Browse the repository at this point in the history
…a-data#850)

* MQTT output can set retained flag.

* Support last will and keepalive on MQTT output.

* Support last will and keepalive on MQTT input.

* fixup! Support last will and keepalive on MQTT output.

* fixup! Support last will and keepalive on MQTT input.

* Address lint warnings on support for MQTT last will.

* Meeting simongottschlag comments on PR redpanda-data#850.
  • Loading branch information
bittrance authored Aug 25, 2021
1 parent 1f95a9d commit 04d6cd0
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 18 deletions.
13 changes: 13 additions & 0 deletions config/mqtt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ input:
client_id: benthos_input
qos: 1
clean_session: true
will:
qos: 0
retained: false
topic: ""
payload: ""
user: ""
password: ""
keepalive: 30
tls:
enabled: false
skip_cert_verify: false
Expand All @@ -38,8 +44,15 @@ output:
topic: benthos_topic
client_id: benthos_output
qos: 1
retained: false
will:
qos: 0
retained: false
topic: ""
payload: ""
user: ""
password: ""
keepalive: 30
tls:
enabled: false
skip_cert_verify: false
Expand Down
2 changes: 2 additions & 0 deletions internal/mqttconf/package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package mqtt contains supporting utils shared between MQTT reader and writer.
package mqttconf
44 changes: 44 additions & 0 deletions internal/mqttconf/will.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package mqttconf

import (
"errors"

"github.com/Jeffail/benthos/v3/internal/docs"
)

// Will holds configuration for the last will message that the broker emits,
// should benthos exit abnormally.
type Will struct {
QoS uint8 `json:"qos" yaml:"qos"`
Retained bool `json:"retained" yaml:"retained"`
Topic string `json:"topic" yaml:"topic"`
Payload string `json:"payload" yaml:"payload"`
}

// EmptyWill returns an empty will, meaning last will message should not be registered.
func EmptyWill() Will {
return Will{}
}

// Validate the Will configuration and return nil or error accordingly.
func (w *Will) Validate() error {
if w.Topic == "" {
if w.Payload != "" || w.QoS > 0 || w.Retained {
return errors.New("include topic to register a last will")
}
}

return nil
}

// WillFieldSpec defines a last will message registration.
func WillFieldSpec() docs.FieldSpec {
return docs.FieldAdvanced(
"will", "Set last will message in case of Benthos failure",
).WithChildren(
docs.FieldCommon("qos", "Set QoS for last will message.").HasOptions("0", "1", "2"),
docs.FieldCommon("retained", "Set retained for last will message."),
docs.FieldCommon("topic", "Set topic for last will message."),
docs.FieldCommon("payload", "Set payload for last will message."),
)
}
3 changes: 3 additions & 0 deletions lib/input/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package input

import (
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/mqttconf"
"github.com/Jeffail/benthos/v3/lib/input/reader"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
Expand Down Expand Up @@ -37,8 +38,10 @@ You can access these metadata fields using
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldAdvanced("qos", "The level of delivery guarantee to enforce.").HasOptions("0", "1", "2"),
docs.FieldAdvanced("clean_session", "Set whether the connection is non-persistent."),
mqttconf.WillFieldSpec(),
docs.FieldAdvanced("user", "A username to assume for the connection."),
docs.FieldAdvanced("password", "A password to provide for the connection."),
docs.FieldAdvanced("keepalive", "Max seconds of inactivity before a keepalive message is sent."),
tls.FieldSpec().AtVersion("3.45.0"),
docs.FieldDeprecated("stale_connection_timeout"),
},
Expand Down
34 changes: 25 additions & 9 deletions lib/input/reader/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/Jeffail/benthos/v3/internal/mqttconf"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/message"
"github.com/Jeffail/benthos/v3/lib/metrics"
Expand All @@ -20,15 +21,17 @@ import (

// MQTTConfig contains configuration fields for the MQTT input type.
type MQTTConfig struct {
URLs []string `json:"urls" yaml:"urls"`
QoS uint8 `json:"qos" yaml:"qos"`
Topics []string `json:"topics" yaml:"topics"`
ClientID string `json:"client_id" yaml:"client_id"`
CleanSession bool `json:"clean_session" yaml:"clean_session"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
StaleConnectionTimeout string `json:"stale_connection_timeout" yaml:"stale_connection_timeout"`
TLS tls.Config `json:"tls" yaml:"tls"`
URLs []string `json:"urls" yaml:"urls"`
QoS uint8 `json:"qos" yaml:"qos"`
Topics []string `json:"topics" yaml:"topics"`
ClientID string `json:"client_id" yaml:"client_id"`
Will mqttconf.Will `json:"will" yaml:"will"`
CleanSession bool `json:"clean_session" yaml:"clean_session"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
StaleConnectionTimeout string `json:"stale_connection_timeout" yaml:"stale_connection_timeout"`
KeepAlive int64 `json:"keepalive" yaml:"keepalive"`
TLS tls.Config `json:"tls" yaml:"tls"`
}

// NewMQTTConfig creates a new MQTTConfig with default values.
Expand All @@ -38,10 +41,12 @@ func NewMQTTConfig() MQTTConfig {
QoS: 1,
Topics: []string{"benthos_topic"},
ClientID: "benthos_input",
Will: mqttconf.EmptyWill(),
CleanSession: true,
User: "",
Password: "",
StaleConnectionTimeout: "",
KeepAlive: 30,
TLS: tls.NewConfig(),
}
}
Expand Down Expand Up @@ -77,13 +82,19 @@ func NewMQTT(
log: log,
}

var err error
if len(conf.StaleConnectionTimeout) > 0 {
var err error
if m.staleConnectionTimeout, err = time.ParseDuration(conf.StaleConnectionTimeout); err != nil {
return nil, fmt.Errorf("unable to parse stale connection timeout duration string: %w", err)
}
}

err = m.conf.Will.Validate()
if err != nil {
return nil, err
}

for _, u := range conf.URLs {
for _, splitURL := range strings.Split(u, ",") {
if len(splitURL) > 0 {
Expand Down Expand Up @@ -129,6 +140,7 @@ func (m *MQTT) ConnectWithContext(ctx context.Context) error {
SetAutoReconnect(false).
SetClientID(m.conf.ClientID).
SetCleanSession(m.conf.CleanSession).
SetKeepAlive(time.Duration(m.conf.KeepAlive)).
SetConnectionLostHandler(func(client mqtt.Client, reason error) {
client.Disconnect(0)
closeMsgChan()
Expand Down Expand Up @@ -158,6 +170,10 @@ func (m *MQTT) ConnectWithContext(ctx context.Context) error {
}
})

if m.conf.Will.Topic != "" {
conf = conf.SetWill(m.conf.Will.Topic, m.conf.Will.Payload, m.conf.Will.QoS, m.conf.Will.Retained)
}

if m.conf.TLS.Enabled {
tlsConf, err := m.conf.TLS.Get()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions lib/output/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package output

import (
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/mqttconf"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/output/writer"
Expand All @@ -26,8 +27,11 @@ messages these interpolations are performed per message part.`,
docs.FieldCommon("topic", "The topic to publish messages to."),
docs.FieldCommon("client_id", "An identifier for the client."),
docs.FieldCommon("qos", "The QoS value to set for each message.").HasOptions("0", "1", "2"),
docs.FieldBool("retained", "Set message as retained on the topic."),
mqttconf.WillFieldSpec(),
docs.FieldAdvanced("user", "A username to connect with."),
docs.FieldAdvanced("password", "A password to connect with."),
docs.FieldAdvanced("keepalive", "Max seconds of inactivity before a keepalive message is sent."),
tls.FieldSpec().AtVersion("3.45.0"),
docs.FieldCommon("max_in_flight", "The maximum number of messages to have in flight at a given time. Increase this to improve throughput."),
},
Expand Down
34 changes: 25 additions & 9 deletions lib/output/writer/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/Jeffail/benthos/v3/internal/bloblang"
"github.com/Jeffail/benthos/v3/internal/bloblang/field"
"github.com/Jeffail/benthos/v3/internal/mqttconf"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
Expand All @@ -20,14 +21,17 @@ import (

// MQTTConfig contains configuration fields for the MQTT output type.
type MQTTConfig struct {
URLs []string `json:"urls" yaml:"urls"`
QoS uint8 `json:"qos" yaml:"qos"`
Topic string `json:"topic" yaml:"topic"`
ClientID string `json:"client_id" yaml:"client_id"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
TLS tls.Config `json:"tls" yaml:"tls"`
URLs []string `json:"urls" yaml:"urls"`
QoS uint8 `json:"qos" yaml:"qos"`
Retained bool `json:"retained" yaml:"retained"`
Topic string `json:"topic" yaml:"topic"`
ClientID string `json:"client_id" yaml:"client_id"`
Will mqttconf.Will `json:"will" yaml:"will"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
KeepAlive int64 `json:"keepalive" yaml:"keepalive"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
TLS tls.Config `json:"tls" yaml:"tls"`
}

// NewMQTTConfig creates a new MQTTConfig with default values.
Expand All @@ -37,9 +41,11 @@ func NewMQTTConfig() MQTTConfig {
QoS: 1,
Topic: "benthos_topic",
ClientID: "benthos_output",
Will: mqttconf.EmptyWill(),
User: "",
Password: "",
MaxInFlight: 1,
KeepAlive: 30,
TLS: tls.NewConfig(),
}
}
Expand Down Expand Up @@ -76,6 +82,11 @@ func NewMQTT(
return nil, fmt.Errorf("failed to parse topic expression: %v", err)
}

err = m.conf.Will.Validate()
if err != nil {
return nil, err
}

for _, u := range conf.URLs {
for _, splitURL := range strings.Split(u, ",") {
if len(splitURL) > 0 {
Expand Down Expand Up @@ -111,12 +122,17 @@ func (m *MQTT) Connect() error {
}).
SetConnectTimeout(time.Second).
SetWriteTimeout(time.Second).
SetKeepAlive(time.Duration(m.conf.KeepAlive)).
SetClientID(m.conf.ClientID)

for _, u := range m.urls {
conf = conf.AddBroker(u)
}

if m.conf.Will.Topic != "" {
conf = conf.SetWill(m.conf.Will.Topic, m.conf.Will.Payload, m.conf.Will.QoS, m.conf.Will.Retained)
}

if m.conf.TLS.Enabled {
tlsConf, err := m.conf.TLS.Get()
if err != nil {
Expand Down Expand Up @@ -163,7 +179,7 @@ func (m *MQTT) Write(msg types.Message) error {
}

return IterateBatchedSend(msg, func(i int, p types.Part) error {
mtok := client.Publish(m.topic.String(i, msg), m.conf.QoS, false, p.Get())
mtok := client.Publish(m.topic.String(i, msg), m.conf.QoS, m.conf.Retained, p.Get())
mtok.Wait()
sendErr := mtok.Error()
if sendErr == mqtt.ErrNotConnected {
Expand Down
54 changes: 54 additions & 0 deletions website/docs/components/inputs/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,14 @@ input:
client_id: benthos_input
qos: 1
clean_session: true
will:
qos: 0
retained: false
topic: ""
payload: ""
user: ""
password: ""
keepalive: 30
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -125,6 +131,46 @@ Set whether the connection is non-persistent.
Type: `bool`
Default: `true`

### `will`

Set last will message in case of Benthos failure


Type: `object`

### `will.qos`

Set QoS for last will message.


Type: `int`
Default: `0`
Options: `0`, `1`, `2`.

### `will.retained`

Set retained for last will message.


Type: `bool`
Default: `false`

### `will.topic`

Set topic for last will message.


Type: `string`
Default: `""`

### `will.payload`

Set payload for last will message.


Type: `string`
Default: `""`

### `user`

A username to assume for the connection.
Expand All @@ -141,6 +187,14 @@ A password to provide for the connection.
Type: `string`
Default: `""`

### `keepalive`

Max seconds of inactivity before a keepalive message is sent.


Type: `int`
Default: `30`

### `tls`

Custom TLS settings can be used to override system defaults.
Expand Down
Loading

0 comments on commit 04d6cd0

Please sign in to comment.