Skip to content

Commit

Permalink
Add manual partitioning to Kakfa output component (redpanda-data#857)
Browse files Browse the repository at this point in the history
  • Loading branch information
cscorley authored Aug 28, 2021
1 parent 31c3ced commit fec8251
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 9 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ docker run --rm \
benthos -c /config.yaml
```

There are a [few examples here][compose-examples] that show you some ways of setting up Benthos containers using `docker-compose`.

### ZMQ4 Support

Benthos supports ZMQ4 for both data input and output. To add this you need to install libzmq4 and use the compile time flag when building Benthos:
Expand All @@ -181,7 +179,6 @@ Contributions are welcome, please [read the guidelines](CONTRIBUTING.md), come a
[tracers]: https://www.benthos.dev/docs/components/tracers/about
[metrics-config]: config/metrics
[config-interp]: https://www.benthos.dev/docs/configuration/interpolation
[compose-examples]: resources/docker/compose_examples
[streams-api]: https://www.benthos.dev/docs/guides/streams_mode/streams_api
[streams-mode]: https://www.benthos.dev/docs/guides/streams_mode/about
[general-docs]: https://www.benthos.dev/docs/about
Expand Down
1 change: 1 addition & 0 deletions config/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ output:
client_id: benthos_kafka_output
key: ""
partitioner: fnv1a_hash
partition: ""
compression: none
static_headers: {}
metadata:
Expand Down
3 changes: 2 additions & 1 deletion lib/output/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ However, this also means that manual intervention will eventually be required in
docs.FieldCommon("topic", "The topic to publish messages to.").IsInterpolated(),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldCommon("key", "The key to publish messages with.").IsInterpolated(),
docs.FieldCommon("partitioner", "The partitioning algorithm to use.").HasOptions("fnv1a_hash", "murmur2_hash", "random", "round_robin"),
docs.FieldCommon("partitioner", "The partitioning algorithm to use.").HasOptions("fnv1a_hash", "murmur2_hash", "random", "round_robin", "manual"),
docs.FieldCommon("partition", "The manually-specified partition to publish messages to. Must be able to parse as a 32-bit integer.").IsInterpolated(),
docs.FieldCommon("compression", "The compression algorithm to use.").HasOptions("none", "snappy", "lz4", "gzip"),
docs.FieldString("static_headers", "An optional map of static headers that should be added to messages in addition to metadata.", map[string]string{"first-static-header": "value-1", "second-static-header": "value-2"}).Map(),
docs.FieldCommon("metadata", "Specify criteria for which metadata values are sent with messages as headers.").WithChildren(output.MetadataFields()...),
Expand Down
50 changes: 46 additions & 4 deletions lib/output/writer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -32,6 +33,7 @@ type KafkaConfig struct {
ClientID string `json:"client_id" yaml:"client_id"`
Key string `json:"key" yaml:"key"`
Partitioner string `json:"partitioner" yaml:"partitioner"`
Partition string `json:"partition" yaml:"partition"`
Topic string `json:"topic" yaml:"topic"`
Compression string `json:"compression" yaml:"compression"`
MaxMsgBytes int `json:"max_msg_bytes" yaml:"max_msg_bytes"`
Expand Down Expand Up @@ -65,6 +67,7 @@ func NewKafkaConfig() KafkaConfig {
Key: "",
RoundRobinPartitions: false,
Partitioner: "fnv1a_hash",
Partition: "",
Topic: "benthos_stream",
Compression: "none",
MaxMsgBytes: 1000000,
Expand Down Expand Up @@ -99,8 +102,9 @@ type Kafka struct {
version sarama.KafkaVersion
conf KafkaConfig

key *field.Expression
topic *field.Expression
key *field.Expression
topic *field.Expression
partition *field.Expression

producer sarama.SyncProducer
compression sarama.CompressionCodec
Expand All @@ -124,6 +128,13 @@ func NewKafka(conf KafkaConfig, mgr types.Manager, log log.Modular, stats metric
conf.Partitioner = "round_robin"
log.Warnln("The field 'round_robin_partitions' is deprecated, please use the 'partitioner' field (set to 'round_robin') instead.")
}

if conf.Partition == "" && conf.Partitioner == "manual" {
return nil, fmt.Errorf("partition field required for 'manual' partitioner")
} else if len(conf.Partition) > 0 && conf.Partitioner != "manual" {
return nil, fmt.Errorf("partition field can only be specified for 'manual' partitioner")
}

partitioner, err := strToPartitioner(conf.Partitioner)
if err != nil {
return nil, err
Expand All @@ -150,6 +161,9 @@ func NewKafka(conf KafkaConfig, mgr types.Manager, log log.Modular, stats metric
if k.topic, err = bloblang.NewField(conf.Topic); err != nil {
return nil, fmt.Errorf("failed to parse topic expression: %v", err)
}
if k.partition, err = bloblang.NewField(conf.Partition); err != nil {
return nil, fmt.Errorf("failed to parse parition expression: %v", err)
}
if k.backoffCtor, err = conf.Config.GetCtor(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -214,6 +228,8 @@ func strToPartitioner(str string) (sarama.PartitionerConstructor, error) {
return sarama.NewRandomPartitioner, nil
case "round_robin":
return sarama.NewRoundRobinPartitioner, nil
case "manual":
return sarama.NewManualPartitioner, nil
default:
}
return nil, fmt.Errorf("partitioner not recognised: %v", str)
Expand Down Expand Up @@ -329,7 +345,8 @@ func (k *Kafka) WriteWithContext(ctx context.Context, msg types.Message) error {

userDefinedHeaders := k.buildUserDefinedHeaders(k.staticHeaders)
msgs := []*sarama.ProducerMessage{}
msg.Iter(func(i int, p types.Part) error {

err := msg.Iter(func(i int, p types.Part) error {
key := k.key.Bytes(i, msg)
nextMsg := &sarama.ProducerMessage{
Topic: k.topic.String(i, msg),
Expand All @@ -340,11 +357,36 @@ func (k *Kafka) WriteWithContext(ctx context.Context, msg types.Message) error {
if len(key) > 0 {
nextMsg.Key = sarama.ByteEncoder(key)
}

// Only parse and set the partition if we are configured for manual
// partitioner. Although samara will (currently) ignore the partition
// field when not using a manual partitioner, we should only set it when
// we explicitly want that.
if k.conf.Partitioner == "manual" {
partitionString := k.partition.String(i, msg)
if partitionString == "" {
return fmt.Errorf("partition expression failed to produce a value")
}

partitionInt, err := strconv.Atoi(partitionString)
if err != nil {
return fmt.Errorf("failed to parse valid integer from partition expression: %w", err)
}
if partitionInt < 0 {
return fmt.Errorf("invalid partition parsed from expression, must be >= 0, got %v", partitionInt)
}
// samara requires a 32-bit integer for the partition field
nextMsg.Partition = int32(partitionInt)
}
msgs = append(msgs, nextMsg)
return nil
})

err := producer.SendMessages(msgs)
if err != nil {
return err
}

err = producer.SendMessages(msgs)
for err != nil {
if pErrs, ok := err.(sarama.ProducerErrors); !k.conf.RetryAsBatch && ok {
if len(pErrs) == 0 {
Expand Down
35 changes: 35 additions & 0 deletions lib/output/writer/tests/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package tests

import (
"testing"

"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/output/writer"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/stretchr/testify/require"

_ "github.com/Jeffail/benthos/v3/public/components/all"
)

func TestKafkaConfigurationManualPartitioner(t *testing.T) {
conf := writer.NewKafkaConfig()
require.NoError(t, createKafkaWriter(conf), "Expected no error with default configuration")

conf.Partitioner = "manual"
require.Error(t, createKafkaWriter(conf), "Expected error with manual partitioner set and partition unset")

conf.Partition = "test"
require.NoError(t, createKafkaWriter(conf), "Expected no error with manual partitioner set and partition set")

conf.Partitioner = "random"
require.Error(t, createKafkaWriter(conf), "Expected error with non-manual partitioner set and partition set")

conf.Partition = ""
require.NoError(t, createKafkaWriter(conf), "Expected no error with non-manual partitioner set and partition unset")
}

func createKafkaWriter(conf writer.KafkaConfig) error {
_, err := writer.NewKafka(conf, types.NoopMgr(), log.Noop(), metrics.Noop())
return err
}
40 changes: 40 additions & 0 deletions lib/test/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,46 @@ input:
testOptVarThree("false"),
)
})

templateManualPartitioner := `
output:
kafka:
addresses: [ localhost:$PORT ]
topic: topic-$ID
max_in_flight: $MAX_IN_FLIGHT
retry_as_batch: $VAR3
metadata:
exclude_prefixes: [ $OUTPUT_META_EXCLUDE_PREFIX ]
batching:
count: $OUTPUT_BATCH_COUNT
partitioner: manual
partition: '${! random_int() % 4 }'
input:
kafka:
addresses: [ localhost:$PORT ]
topics: [ topic-$ID$VAR1 ]
consumer_group: "$VAR4"
checkpoint_limit: $VAR2
start_from_oldest: true
batching:
count: $INPUT_BATCH_COUNT
`

t.Run("manual_partitioner", func(t *testing.T) {
t.Parallel()
suite.Run(
t, templateManualPartitioner,
testOptPreTest(func(t testing.TB, env *testEnvironment) {
env.configVars.var4 = "group" + env.configVars.id
require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, env.configVars.id, 4))
}),
testOptPort(kafkaPortStr),
testOptVarTwo("1"),
testOptVarThree("false"),
)
})

})

func createKafkaTopic(address, id string, partitions int32) error {
Expand Down
13 changes: 12 additions & 1 deletion website/docs/components/outputs/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ output:
client_id: benthos_kafka_output
key: ""
partitioner: fnv1a_hash
partition: ""
compression: none
static_headers: {}
metadata:
Expand Down Expand Up @@ -77,6 +78,7 @@ output:
client_id: benthos_kafka_output
key: ""
partitioner: fnv1a_hash
partition: ""
compression: none
static_headers: {}
metadata:
Expand Down Expand Up @@ -375,7 +377,16 @@ The partitioning algorithm to use.

Type: `string`
Default: `"fnv1a_hash"`
Options: `fnv1a_hash`, `murmur2_hash`, `random`, `round_robin`.
Options: `fnv1a_hash`, `murmur2_hash`, `random`, `round_robin`, `manual`.

### `partition`

The manually-specified partition to publish messages to. Must be able to parse as a 32-bit integer.
This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries).


Type: `string`
Default: `""`

### `compression`

Expand Down

0 comments on commit fec8251

Please sign in to comment.