Skip to content

Commit

Permalink
Fix cassandra type conversions for args_mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Sep 6, 2021
1 parent a485927 commit 864c2c0
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 98 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
- New experimental `gcp_bigquery` output.
- Go API: It's now possible to parse a config spec directly with `ParseYAML`.
- Bloblang methods and functions now support named parameters.
- Field `args_mapping` added to the `cassandra` output.

## 3.54.0 - 2021-09-01

Expand Down
1 change: 0 additions & 1 deletion config/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ output:
password: ""
disable_initial_host_lookup: false
query: ""
args: []
args_mapping: ""
consistency: QUORUM
max_retries: 3
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ require (
github.com/fatih/color v1.10.0
github.com/go-redis/redis/v7 v7.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql v0.0.0-20201024154641-5913df4d474e
github.com/gocql/gocql v0.0.0-20210817081954-bc256bbb90de
github.com/gofrs/uuid v3.3.0+incompatible
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.3
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.6
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/V
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gocql/gocql v0.0.0-20201024154641-5913df4d474e h1:p5NB/+xroUR8OnumV9/cbCav+mmSjrGi2uwYtXNFJG4=
github.com/gocql/gocql v0.0.0-20201024154641-5913df4d474e/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20210817081954-bc256bbb90de h1:J7nwGjNyfkF+hTmpFPp/nvQrfQ2i19/7yWNb8c4YNnI=
github.com/gocql/gocql v0.0.0-20210817081954-bc256bbb90de/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84=
Expand Down Expand Up @@ -380,6 +382,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down
157 changes: 100 additions & 57 deletions lib/output/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Jeffail/benthos/v3/internal/bloblang"
"github.com/Jeffail/benthos/v3/internal/bloblang/field"
"github.com/Jeffail/benthos/v3/internal/bloblang/mapping"
"github.com/Jeffail/benthos/v3/internal/bloblang/query"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/message/batch"
Expand Down Expand Up @@ -54,23 +55,6 @@ When populating timestamp columns the value must either be a string in ISO 8601
Title: "Basic Inserts",
Summary: "If we were to create a table with some basic columns with `CREATE TABLE foo.bar (id int primary key, content text, created_at timestamp);`, and were processing JSON documents of the form `{\"id\":\"342354354\",\"content\":\"hello world\",\"timestamp\":1605219406}`, we could populate our table with the following config:",
Config: `
output:
cassandra:
addresses:
- localhost:9042
query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
args:
- ${! json("id") }
- ${! json("content") }
- ${! json("timestamp").format_timestamp() }
batching:
count: 500
`,
},
{
Title: "Basic Inserts with bloblang mapping",
Summary: "If we were to create a table with some basic columns with `CREATE TABLE foo.bar (id int primary key, content text, created_at timestamp);`, and were processing JSON documents of the form `{\"id\":\"342354354\",\"content\":\"hello world\",\"timestamp\":1605219406}`, we could populate our table with the following config:",
Config: `
output:
cassandra:
addresses:
Expand All @@ -80,11 +64,12 @@ output:
root = [
this.id,
this.content,
this.timestamp.format_timestamp()
this.timestamp
]
batching:
count: 500
`},
`,
},
{
Title: "Insert JSON Documents",
Summary: "The following example inserts JSON documents into the table `footable` of the keyspace `foospace` using INSERT JSON (https://cassandra.apache.org/doc/latest/cql/json.html#insert-json).",
Expand All @@ -94,8 +79,7 @@ output:
addresses:
- localhost:9042
query: 'INSERT INTO foospace.footable JSON ?'
args:
- ${! content() }
args_mapping: 'root = [ this ]'
batching:
count: 500
`,
Expand Down Expand Up @@ -123,13 +107,13 @@ output:
"If enabled the driver will not attempt to get host info from the system.peers table. This can speed up queries but will mean that data_centre, rack and token information will not be available.",
),
docs.FieldCommon("query", "A query to execute for each message."),
docs.FieldString(
docs.FieldDeprecated(
"args",
"A list of arguments for the query to be resolved for each message.",
).IsInterpolated().Array(),
).IsInterpolated().Array().HasType(docs.FieldTypeString),
docs.FieldBloblang(
"args_mapping",
"A [Bloblang mapping](/docs/guides/bloblang/about) that can be used to provide arguments to Cassandra queries."),
"A [Bloblang mapping](/docs/guides/bloblang/about) that can be used to provide arguments to Cassandra queries. The result of the query must be an array containing a matching number of elements to the query arguments.").AtVersion("3.55.0"),
docs.FieldAdvanced(
"consistency",
"The consistency level to use.",
Expand Down Expand Up @@ -220,11 +204,6 @@ type cassandraWriter struct {
}

func newCassandraWriter(conf CassandraConfig, log log.Modular, stats metrics.Type) (*cassandraWriter, error) {
// Allow only args or args_mapping for now.
if len(conf.Args) > 0 && conf.ArgsMapping != "" {
return nil, fmt.Errorf("can only specify one of [args, args_mapping]")
}

c := cassandraWriter{
log: log,
stats: stats,
Expand Down Expand Up @@ -252,18 +231,22 @@ func newCassandraWriter(conf CassandraConfig, log log.Modular, stats metrics.Typ
}

func (c *cassandraWriter) parseArgs() error {
switch {
// If we've been given "args", parse them.
case len(c.conf.Args) > 0:
// Allow only args or args_mapping for now.
if len(c.conf.Args) > 0 && c.conf.ArgsMapping != "" {
return fmt.Errorf("can only specify one of [args, args_mapping]")
}

if len(c.conf.Args) > 0 {
for i, v := range c.conf.Args {
expr, err := bloblang.NewField(v)
if err != nil {
return fmt.Errorf("failed to parse arg %v expression: %v", i, err)
}
c.args = append(c.args, expr)
}
// If we've been given "args_mapping", parse them.
case c.conf.ArgsMapping != "":
}

if c.conf.ArgsMapping != "" {
var err error
if c.argsMapping, err = bloblang.NewMapping("", c.conf.ArgsMapping); err != nil {
return fmt.Errorf("parsing args_mapping: %w", err)
Expand Down Expand Up @@ -332,8 +315,6 @@ func (c *cassandraWriter) WriteWithContext(ctx context.Context, msg types.Messag
return c.writeBatch(session, msg)
}

type stringValue string

func (c *cassandraWriter) writeRow(session *gocql.Session, msg types.Message) error {
t0 := time.Now()

Expand All @@ -354,14 +335,16 @@ func (c *cassandraWriter) writeBatch(session *gocql.Session, msg types.Message)
batch := session.NewBatch(gocql.UnloggedBatch)
t0 := time.Now()

msg.Iter(func(i int, p types.Part) error {
if err := msg.Iter(func(i int, p types.Part) error {
values, err := c.mapArgs(msg, i)
if err != nil {
return fmt.Errorf("parsing args for part: %d: %w", i, err)
}
batch.Query(c.conf.Query, values...)
return nil
})
}); err != nil {
return err
}

err := session.ExecuteBatch(batch)
if err != nil {
Expand All @@ -372,32 +355,39 @@ func (c *cassandraWriter) writeBatch(session *gocql.Session, msg types.Message)
}

func (c *cassandraWriter) mapArgs(msg types.Message, index int) ([]interface{}, error) {
// If we've been given the "args" field, extract values from there.
if len(c.conf.Args) > 0 {
values := make([]interface{}, 0, len(c.args))
for _, arg := range c.args {
values = append(values, stringValue(arg.String(0, msg)))
if c.argsMapping != nil {
// We've got an "args_mapping" field, extract values from there.
part, err := c.argsMapping.MapPart(index, msg)
if err != nil {
return nil, fmt.Errorf("executing bloblang mapping: %w", err)
}
return values, nil
}

// We've got an "args_mapping" field, extract values from there.
part, err := c.argsMapping.MapPart(index, msg)
if err != nil {
return nil, fmt.Errorf("executing bloblang mapping: %w", err)
}
jraw, err := part.JSON()
if err != nil {
return nil, fmt.Errorf("parsing bloblang mapping result as json: %w", err)
}

jraw, err := part.JSON()
if err != nil {
return nil, fmt.Errorf("parsing bloblang mapping result as json: %w", err)
j, ok := jraw.([]interface{})
if !ok {
return nil, fmt.Errorf("expected bloblang mapping result to be []interface{} but was %T", jraw)
}

for i, v := range j {
j[i] = genericValue{v: v}
}
return j, nil
}

j, ok := jraw.([]interface{})
if !ok {
return nil, fmt.Errorf("expected bloblang mapping result to be []interface{} but was %T", jraw)
// If we've been given the "args" field, extract values from there.
if len(c.args) > 0 {
values := make([]interface{}, 0, len(c.args))
for _, arg := range c.args {
values = append(values, stringValue(arg.String(index, msg)))
}
return values, nil
}

return j, nil
return nil, nil
}

// CloseAsync shuts down the Cassandra output and stops processing messages.
Expand Down Expand Up @@ -470,6 +460,8 @@ func formatCassandraInt32(x int32) []byte {
return []byte{byte(x >> 24), byte(x >> 16), byte(x >> 8), byte(x)}
}

type stringValue string

// All of our argument values are string types due to interpolation. However,
// gocql performs type checking and unfortunately does not like timestamp and
// some other values as strings:
Expand Down Expand Up @@ -519,3 +511,54 @@ func (s stringValue) MarshalCQL(info gocql.TypeInfo) ([]byte, error) {
}
return gocql.Marshal(info, string(s))
}

type genericValue struct {
v interface{}
}

// We get typed values out of mappings. However, gocql performs type checking
// and unfortunately does not like timestamp and some other values as strings:
// https://github.com/gocql/gocql/blob/5913df4d474e0b2492a129d17bbb3c04537a15cd/marshal.go#L1160
// it's also very strict on numerical types, so we need to do some magic here.
func (g genericValue) MarshalCQL(info gocql.TypeInfo) ([]byte, error) {
switch info.Type() {
case gocql.TypeTimestamp:
t, err := query.IGetTimestamp(g.v)
if err != nil {
return nil, err
}
if t.IsZero() {
return []byte{}, nil
}
x := t.UTC().Unix()*1e3 + int64(t.UTC().Nanosecond()/1e6)
return formatCassandraInt64(x), nil
case gocql.TypeTime:
x, err := query.IGetInt(g.v)
if err != nil {
return nil, err
}
return formatCassandraInt64(x), nil
case gocql.TypeBoolean:
b, err := query.IGetBool(g.v)
if err != nil {
return nil, err
}
if b {
return []byte{1}, nil
}
return []byte{0}, nil
case gocql.TypeFloat:
f, err := query.IGetNumber(g.v)
if err != nil {
return nil, err
}
return formatCassandraInt32(int32(math.Float32bits(float32(f)))), nil
case gocql.TypeDouble:
f, err := query.IGetNumber(g.v)
if err != nil {
return nil, err
}
return formatCassandraInt64(int64(math.Float64bits(f))), nil
}
return gocql.Marshal(info, query.IToString(g.v))
}
Loading

0 comments on commit 864c2c0

Please sign in to comment.