Skip to content

Commit

Permalink
Redis streams input allows for automatic creation of streams (redpand…
Browse files Browse the repository at this point in the history
…a-data#855)

* Redis streams allows for automatic creation of stream (i.e. Redis MKSTREAM option).

Also fixes the error messages where group and stream names were flipped.

* redis_streams auto-create streams by default.
  • Loading branch information
Bittrance committed Aug 28, 2021
1 parent 5c14ca1 commit 31c3ced
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 4 deletions.
1 change: 1 addition & 0 deletions config/redis_streams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ input:
limit: 10
client_id: benthos_consumer
consumer_group: benthos_group
create_streams: true
start_from_oldest: true
commit_period: 1s
timeout: 1s
Expand Down
1 change: 1 addition & 0 deletions lib/input/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ input:
url: tcp:https://localhost:6379
streams:
- benthos_stream
create_streams: false
body_key: body
consumer_group: benthos_group
Expand Down
14 changes: 10 additions & 4 deletions lib/input/reader/redis_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type RedisStreamsConfig struct {
bredis.Config `json:",inline" yaml:",inline"`
BodyKey string `json:"body_key" yaml:"body_key"`
Streams []string `json:"streams" yaml:"streams"`
CreateStreams bool `json:"create_streams" yaml:"create_streams"`
ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
ClientID string `json:"client_id" yaml:"client_id"`
Limit int64 `json:"limit" yaml:"limit"`
Expand All @@ -42,6 +43,7 @@ func NewRedisStreamsConfig() RedisStreamsConfig {
Config: bredis.NewConfig(),
BodyKey: "body",
Streams: []string{"benthos_stream"},
CreateStreams: true,
ConsumerGroup: "benthos_group",
ClientID: "benthos_consumer",
Limit: 10,
Expand Down Expand Up @@ -219,10 +221,14 @@ func (r *RedisStreams) ConnectWithContext(ctx context.Context) error {
if r.conf.StartFromOldest {
offset = "0"
}
if err := client.XGroupCreate(s, r.conf.ConsumerGroup, offset).Err(); err != nil {
if err.Error() != "BUSYGROUP Consumer Group name already exists" {
return fmt.Errorf("failed to create group %v for stream %v: %v", s, r.conf.ConsumerGroup, err)
}
var err error
if r.conf.CreateStreams {
err = client.XGroupCreateMkStream(s, r.conf.ConsumerGroup, offset).Err()
} else {
err = client.XGroupCreate(s, r.conf.ConsumerGroup, offset).Err()
}
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return fmt.Errorf("failed to create group %v for stream %v: %v", r.conf.ConsumerGroup, s, err)
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/input/redis_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ as metadata fields.`,
docs.FieldCommon("limit", "The maximum number of messages to consume from a single request."),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldCommon("consumer_group", "An identifier for the consumer group of the stream."),
docs.FieldAdvanced("create_streams", "Create subscribed streams if they do not exist (MKSTREAM option)."),
docs.FieldAdvanced("start_from_oldest", "If an offset is not found for a stream, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset."),
docs.FieldAdvanced("commit_period", "The period of time between each commit of the current offset. Offsets are always committed during shutdown."),
docs.FieldAdvanced("timeout", "The length of time to poll for new messages before reattempting."),
Expand Down
9 changes: 9 additions & 0 deletions website/docs/components/inputs/redis_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ input:
limit: 10
client_id: benthos_consumer
consumer_group: benthos_group
create_streams: true
start_from_oldest: true
commit_period: 1s
timeout: 1s
Expand Down Expand Up @@ -294,6 +295,14 @@ An identifier for the consumer group of the stream.
Type: `string`
Default: `"benthos_group"`

### `create_streams`

Create subscribed streams if they do not exist (MKSTREAM option).


Type: `bool`
Default: `true`

### `start_from_oldest`

If an offset is not found for a stream, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset.
Expand Down

0 comments on commit 31c3ced

Please sign in to comment.