Skip to content

Commit

Permalink
Add dynamic_client_id_suffix to mqtt components
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Oct 7, 2021
1 parent aecc5d4 commit b3f1e37
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

- Go API: New config field types `StringMap`, `IntList`, and `IntMap`.
- The `http_client` input, output and processor now include the response body in request error logs for more context.
- Field `dynamic_client_id_suffix` added to the `mqtt` input and output.

## 3.56.0 - 2021-09-22

Expand Down
3 changes: 3 additions & 0 deletions lib/input/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ You can access these metadata fields using
docs.FieldCommon("urls", "A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.").Array(),
docs.FieldCommon("topics", "A list of topics to consume from.").Array(),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldString("dynamic_client_id_suffix", "Append a dynamically generated suffix to the specified `client_id` on each run of the pipeline. This can be useful when clustering Benthos producers.").Optional().Advanced().HasAnnotatedOptions(
"nanoid", "append a nanoid of length 21 characters",
),
docs.FieldAdvanced("qos", "The level of delivery guarantee to enforce.").HasOptions("0", "1", "2"),
docs.FieldAdvanced("clean_session", "Set whether the connection is non-persistent."),
mqttconf.WillFieldSpec(),
Expand Down
14 changes: 14 additions & 0 deletions lib/input/reader/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/util/tls"
mqtt "github.com/eclipse/paho.mqtt.golang"
gonanoid "github.com/matoous/go-nanoid/v2"
)

//------------------------------------------------------------------------------
Expand All @@ -25,6 +26,7 @@ type MQTTConfig struct {
QoS uint8 `json:"qos" yaml:"qos"`
Topics []string `json:"topics" yaml:"topics"`
ClientID string `json:"client_id" yaml:"client_id"`
DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
Will mqttconf.Will `json:"will" yaml:"will"`
CleanSession bool `json:"clean_session" yaml:"clean_session"`
User string `json:"user" yaml:"user"`
Expand Down Expand Up @@ -89,6 +91,18 @@ func NewMQTT(
}
}

switch m.conf.DynamicClientIDSuffix {
case "nanoid":
nid, err := gonanoid.New()
if err != nil {
return nil, fmt.Errorf("failed to generate nanoid: %w", err)
}
m.conf.ClientID += nid
case "":
default:
return nil, fmt.Errorf("unknown dynamic_client_id_suffix: %v", m.conf.DynamicClientIDSuffix)
}

if err := m.conf.Will.Validate(); err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion lib/output/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ messages these interpolations are performed per message part.`,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("urls", "A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.", []string{"tcp:https://localhost:1883"}).Array(),
docs.FieldCommon("topic", "The topic to publish messages to."),
docs.FieldCommon("client_id", "An identifier for the client."),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldString("dynamic_client_id_suffix", "Append a dynamically generated suffix to the specified `client_id` on each run of the pipeline. This can be useful when clustering Benthos producers.").Optional().Advanced().HasAnnotatedOptions(
"nanoid", "append a nanoid of length 21 characters",
),
docs.FieldCommon("qos", "The QoS value to set for each message.").HasOptions("0", "1", "2"),
docs.FieldBool("retained", "Set message as retained on the topic."),
mqttconf.WillFieldSpec(),
Expand Down
36 changes: 25 additions & 11 deletions lib/output/writer/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,25 @@ import (
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/util/tls"
mqtt "github.com/eclipse/paho.mqtt.golang"
gonanoid "github.com/matoous/go-nanoid/v2"
)

//------------------------------------------------------------------------------

// MQTTConfig contains configuration fields for the MQTT output type.
type MQTTConfig struct {
URLs []string `json:"urls" yaml:"urls"`
QoS uint8 `json:"qos" yaml:"qos"`
Retained bool `json:"retained" yaml:"retained"`
Topic string `json:"topic" yaml:"topic"`
ClientID string `json:"client_id" yaml:"client_id"`
Will mqttconf.Will `json:"will" yaml:"will"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
KeepAlive int64 `json:"keepalive" yaml:"keepalive"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
TLS tls.Config `json:"tls" yaml:"tls"`
URLs []string `json:"urls" yaml:"urls"`
QoS uint8 `json:"qos" yaml:"qos"`
Retained bool `json:"retained" yaml:"retained"`
Topic string `json:"topic" yaml:"topic"`
ClientID string `json:"client_id" yaml:"client_id"`
DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
Will mqttconf.Will `json:"will" yaml:"will"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
KeepAlive int64 `json:"keepalive" yaml:"keepalive"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
TLS tls.Config `json:"tls" yaml:"tls"`
}

// NewMQTTConfig creates a new MQTTConfig with default values.
Expand Down Expand Up @@ -94,6 +96,18 @@ func NewMQTTV2(
return nil, fmt.Errorf("failed to parse topic expression: %v", err)
}

switch m.conf.DynamicClientIDSuffix {
case "nanoid":
nid, err := gonanoid.New()
if err != nil {
return nil, fmt.Errorf("failed to generate nanoid: %w", err)
}
m.conf.ClientID += nid
case "":
default:
return nil, fmt.Errorf("unknown dynamic_client_id_suffix: %v", m.conf.DynamicClientIDSuffix)
}

if err := m.conf.Will.Validate(); err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions lib/test/integration/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ output:
qos: 1
topic: topic-$ID
client_id: client-output-$ID
dynamic_client_id_suffix: "$VAR1"
max_in_flight: $MAX_IN_FLIGHT
input:
mqtt:
urls: [ tcp:https://localhost:$PORT ]
topics: [ topic-$ID ]
client_id: client-input-$ID
dynamic_client_id_suffix: "$VAR1"
clean_session: false
`
suite := integrationTests(
Expand All @@ -78,4 +80,15 @@ input:
testOptMaxInFlight(10),
)
})
t.Run("with generated suffix", func(t *testing.T) {
t.Parallel()
suite.Run(
t, template,
testOptSleepAfterInput(100*time.Millisecond),
testOptSleepAfterOutput(100*time.Millisecond),
testOptPort(resource.GetPort("1883/tcp")),
testOptMaxInFlight(10),
testOptVarOne("nanoid"),
)
})
})
14 changes: 14 additions & 0 deletions website/docs/components/inputs/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ input:
topics:
- benthos_topic
client_id: benthos_input
dynamic_client_id_suffix: ""
qos: 1
clean_session: true
will:
Expand Down Expand Up @@ -115,6 +116,19 @@ An identifier for the client connection.
Type: `string`
Default: `"benthos_input"`

### `dynamic_client_id_suffix`

Append a dynamically generated suffix to the specified `client_id` on each run of the pipeline. This can be useful when clustering Benthos producers.


Type: `string`
Default: `""`

| Option | Summary |
|---|---|
| `nanoid` | append a nanoid of length 21 characters |


### `qos`

The level of delivery guarantee to enforce.
Expand Down
16 changes: 15 additions & 1 deletion website/docs/components/outputs/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ output:
- tcp:https://localhost:1883
topic: benthos_topic
client_id: benthos_output
dynamic_client_id_suffix: ""
qos: 1
retained: false
will:
Expand Down Expand Up @@ -113,12 +114,25 @@ Default: `"benthos_topic"`

### `client_id`

An identifier for the client.
An identifier for the client connection.


Type: `string`
Default: `"benthos_output"`

### `dynamic_client_id_suffix`

Append a dynamically generated suffix to the specified `client_id` on each run of the pipeline. This can be useful when clustering Benthos producers.


Type: `string`
Default: `""`

| Option | Summary |
|---|---|
| `nanoid` | append a nanoid of length 21 characters |


### `qos`

The QoS value to set for each message.
Expand Down

0 comments on commit b3f1e37

Please sign in to comment.