Skip to content

Commit

Permalink
Initial implementation of a Cassandra output. (redpanda-data#543)
Browse files Browse the repository at this point in the history
* Initial Cassandra implementation

* Generated documentation

* Added documentation for fields

Co-authored-by: codingconcepts <[email protected]>
  • Loading branch information
codingconcepts and codingconcepts committed Nov 7, 2020
1 parent 523a32d commit 0234400
Show file tree
Hide file tree
Showing 8 changed files with 620 additions and 0 deletions.
60 changes: 60 additions & 0 deletions config/cassandra.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# This file was auto generated by benthos_config_gen.
http:
address: 0.0.0.0:4195
enabled: true
read_timeout: 5s
root_path: /benthos
debug_endpoints: false
input:
type: stdin
stdin:
delimiter: ""
max_buffer: 1000000
multipart: false
buffer:
type: none
none: {}
pipeline:
processors: []
threads: 1
output:
type: cassandra
cassandra:
async: true
backoff:
initial_interval: 1s
max_elapsed_time: 30s
max_interval: 5s
consistency: QUORUM
keyspace: benthos
max_retries: 3
nodes:
- localhost:9042
password_authenticator:
enabled: true
password: cassandra
username: cassandra
table: benthos
resources:
caches: {}
conditions: {}
inputs: {}
outputs: {}
processors: {}
rate_limits: {}
logger:
prefix: benthos
level: INFO
add_timestamp: true
json_format: true
static_fields:
'@service': benthos
metrics:
type: http_server
http_server:
path_mapping: ""
prefix: benthos
tracer:
type: none
none: {}
shutdown_timeout: 20s
12 changes: 12 additions & 0 deletions config/env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,18 @@ OUTPUT_CACHE_KEY = ${!count("items")}-${!ti
OUTPUT_CACHE_MAX_IN_FLIGHT = 1
OUTPUT_CACHE_TARGET
OUTPUT_CACHE_TTL
OUTPUT_CASSANDRA_ASYNC = true
OUTPUT_CASSANDRA_BACKOFF_INITIAL_INTERVAL = 1s
OUTPUT_CASSANDRA_BACKOFF_MAX_ELAPSED_TIME = 30s
OUTPUT_CASSANDRA_BACKOFF_MAX_INTERVAL = 5s
OUTPUT_CASSANDRA_CONSISTENCY = QUORUM
OUTPUT_CASSANDRA_KEYSPACE = benthos
OUTPUT_CASSANDRA_MAX_RETRIES = 3
OUTPUT_CASSANDRA_NODES = localhost:9042
OUTPUT_CASSANDRA_PASSWORD_AUTHENTICATOR_ENABLED = true
OUTPUT_CASSANDRA_PASSWORD_AUTHENTICATOR_PASSWORD = cassandra
OUTPUT_CASSANDRA_PASSWORD_AUTHENTICATOR_USERNAME = cassandra
OUTPUT_CASSANDRA_TABLE = benthos
OUTPUT_DYNAMIC_MAX_IN_FLIGHT = 1
OUTPUT_DYNAMIC_PREFIX
OUTPUT_DYNAMIC_TIMEOUT = 5s
Expand Down
16 changes: 16 additions & 0 deletions config/env/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,22 @@ output:
max_in_flight: ${OUTPUT_CACHE_MAX_IN_FLIGHT:1}
target: ${OUTPUT_CACHE_TARGET}
ttl: ${OUTPUT_CACHE_TTL}
cassandra:
async: ${OUTPUT_CASSANDRA_ASYNC:true}
backoff:
initial_interval: ${OUTPUT_CASSANDRA_BACKOFF_INITIAL_INTERVAL:1s}
max_elapsed_time: ${OUTPUT_CASSANDRA_BACKOFF_MAX_ELAPSED_TIME:30s}
max_interval: ${OUTPUT_CASSANDRA_BACKOFF_MAX_INTERVAL:5s}
consistency: ${OUTPUT_CASSANDRA_CONSISTENCY:QUORUM}
keyspace: ${OUTPUT_CASSANDRA_KEYSPACE:benthos}
max_retries: ${OUTPUT_CASSANDRA_MAX_RETRIES:3}
nodes:
- ${OUTPUT_CASSANDRA_NODES:localhost:9042}
password_authenticator:
enabled: ${OUTPUT_CASSANDRA_PASSWORD_AUTHENTICATOR_ENABLED:true}
password: ${OUTPUT_CASSANDRA_PASSWORD_AUTHENTICATOR_PASSWORD:cassandra}
username: ${OUTPUT_CASSANDRA_PASSWORD_AUTHENTICATOR_USERNAME:cassandra}
table: ${OUTPUT_CASSANDRA_TABLE:benthos}
dynamic:
max_in_flight: ${OUTPUT_DYNAMIC_MAX_IN_FLIGHT:1}
prefix: ${OUTPUT_DYNAMIC_PREFIX}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/fatih/color v1.10.0
github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql v0.0.0-20201024154641-5913df4d474e
github.com/gofrs/uuid v3.3.0+incompatible
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.2 // indirect
Expand Down
72 changes: 72 additions & 0 deletions lib/output/cassandra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package output

import (
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/output/writer"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/benthos/v3/lib/util/retries"
)

//------------------------------------------------------------------------------

func init() {
Constructors[TypeCassandra] = TypeSpec{
constructor: NewCassandra,
Summary: `
Send messages to a Cassandra database.`,
Description: `
This output will send messages to a Cassandra database using the INSERT JSON functionality
provided by the database (https://cassandra.apache.org/doc/latest/cql/json.html#insert-json).`,
FieldSpecs: docs.FieldSpecs{
docs.FieldAdvanced(
"nodes",
"A list of Cassandra nodes to connect to.",
[]string{"localhost:9042"}),
docs.FieldAdvanced(
"password_authenticator",
"An object containing the username and password.",
writer.PasswordAuthenticator{Enabled: true, Username: "cassandra", Password: "cassandra"}),
docs.FieldAdvanced(
"keyspace",
"The name of the Cassandra keyspace to use.",
"benthos"),
docs.FieldAdvanced(
"table",
"The name of the Cassandra table to use.",
"benthos"),
docs.FieldAdvanced(
"consistency",
"The consistency level to use.",
"ANY", "ONE", "TWO", "THREE", "QUORUM", "ALL", "LOCAL_QUORUM", "EACH_QUORUM", "LOCAL_ONE"),
docs.FieldAdvanced(
"async",
"A flag to determine whether inserts will be performed concurrently.",
true, false),
docs.FieldAdvanced(
"backoff",
"The mechanism used to provided retries at increasing intervals.",
retries.Backoff{InitialInterval: "1s", MaxInterval: "5s", MaxElapsedTime: "30s"}),
docs.FieldAdvanced(
"max_retries",
"The maximum number of retries to attempt.",
10),
},
}
}

//------------------------------------------------------------------------------

// NewCassandra creates a new Cassandra output type.
func NewCassandra(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
c, err := writer.NewCassandra(conf.Cassandra, log, stats)
if err != nil {
return nil, err
}
return NewWriter(
TypeCassandra, c, log, stats,
)
}

//------------------------------------------------------------------------------
3 changes: 3 additions & 0 deletions lib/output/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
TypeBlobStorage = "blob_storage"
TypeBroker = "broker"
TypeCache = "cache"
TypeCassandra = "cassandra"
TypeDrop = "drop"
TypeDropOnError = "drop_on_error"
TypeDynamic = "dynamic"
Expand Down Expand Up @@ -125,6 +126,7 @@ type Config struct {
BlobStorage writer.AzureBlobStorageConfig `json:"blob_storage" yaml:"blob_storage"`
Broker BrokerConfig `json:"broker" yaml:"broker"`
Cache writer.CacheConfig `json:"cache" yaml:"cache"`
Cassandra writer.CassandraConfig `json:"cassandra" yaml:"cassandra"`
Drop writer.DropConfig `json:"drop" yaml:"drop"`
DropOnError DropOnErrorConfig `json:"drop_on_error" yaml:"drop_on_error"`
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
Expand Down Expand Up @@ -180,6 +182,7 @@ func NewConfig() Config {
BlobStorage: writer.NewAzureBlobStorageConfig(),
Broker: NewBrokerConfig(),
Cache: writer.NewCacheConfig(),
Cassandra: writer.NewCassandraConfig(),
Drop: writer.NewDropConfig(),
DropOnError: NewDropOnErrorConfig(),
Dynamic: NewDynamicConfig(),
Expand Down
Loading

0 comments on commit 0234400

Please sign in to comment.