Skip to content

Commit

Permalink
Kafka Franz - add aws msk iam (redpanda-data#1295)
Browse files Browse the repository at this point in the history
* Expose aws session code to outside of package

* Update expected fields for kafka franz input and output

* Update sasl code

* Update docs

* Clean up aws session code

* Leave comments to fix linting error
  • Loading branch information
ekeric13 authored Jun 19, 2022
1 parent 4ec70ec commit 7050410
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 12 deletions.
4 changes: 2 additions & 2 deletions internal/impl/aws/cache_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Strong read consistency can be enabled using the ` + "`consistent_read`" + ` con
Field(service.NewBackOffField("retries", false, retriesDefaults).
Advanced())

for _, f := range sessionFields() {
for _, f := range SessionFields() {
spec = spec.Field(f)
}
return spec
Expand Down Expand Up @@ -111,7 +111,7 @@ func newDynamodbCacheFromConfig(conf *service.ParsedConfig) (*dynamodbCache, err
}
ttlKey = &ttlKeyTmp
}
sess, err := getSession(conf)
sess, err := GetSession(conf)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/aws/cache_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func s3CacheConfig() *service.ConfigSpec {
Field(service.NewBackOffField("retries", false, retriesDefaults).
Advanced())

for _, f := range sessionFields() {
for _, f := range SessionFields() {
spec = spec.Field(f)
}
return spec
Expand Down Expand Up @@ -75,7 +75,7 @@ func newS3CacheFromConfig(conf *service.ParsedConfig) (*s3Cache, error) {
return nil, err
}

sess, err := getSession(conf, func(c *aws.Config) {
sess, err := GetSession(conf, func(c *aws.Config) {
c.S3ForcePathStyle = aws.Bool(forcePathStyleURLs)
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/aws/processor_dynamodb_partiql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ pipeline:
`,
)

for _, f := range sessionFields() {
for _, f := range SessionFields() {
config = config.Field(f)
}

err := service.RegisterBatchProcessor(
"aws_dynamodb_partiql", config,
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) {
sess, err := getSession(conf)
sess, err := GetSession(conf)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/aws/processor_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pipeline:
Default("").
Advanced())

for _, f := range sessionFields() {
for _, f := range SessionFields() {
config = config.Field(f)
}

Expand All @@ -97,7 +97,7 @@ pipeline:
err := service.RegisterBatchProcessor(
"aws_lambda", config,
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) {
sess, err := getSession(conf)
sess, err := GetSession(conf)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/impl/aws/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/benthosdev/benthos/v4/public/service"
)

func sessionFields() []*service.ConfigField {
// SessionFields defines a re-usable set of config fields for an AWS session
func SessionFields() []*service.ConfigField {
return []*service.ConfigField{
service.NewStringField("region").
Description("The AWS region to target.").
Expand Down Expand Up @@ -48,7 +49,8 @@ func sessionFields() []*service.ConfigField {
}
}

func getSession(parsedConf *service.ParsedConfig, opts ...func(*aws.Config)) (*session.Session, error) {
// GetSession attempts to create an AWS session based on the parsedConfig
func GetSession(parsedConf *service.ParsedConfig, opts ...func(*aws.Config)) (*session.Session, error) {
awsConf := aws.NewConfig()

if region, _ := parsedConf.FieldString("region"); region != "" {
Expand Down
5 changes: 4 additions & 1 deletion internal/impl/kafka/input_kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/twmb/franz-go/pkg/sasl"

"github.com/benthosdev/benthos/v4/internal/checkpoint"
sess "github.com/benthosdev/benthos/v4/internal/impl/aws"
"github.com/benthosdev/benthos/v4/internal/shutdown"
"github.com/benthosdev/benthos/v4/public/service"
)
Expand Down Expand Up @@ -72,7 +73,9 @@ This input adds the following metadata fields to each message:
Default(true).
Advanced()).
Field(service.NewTLSToggledField("tls")).
Field(saslField)
Field(saslField).
Field(sess.SessionFields()[2])

}

func init() {
Expand Down
4 changes: 3 additions & 1 deletion internal/impl/kafka/output_kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl"

sess "github.com/benthosdev/benthos/v4/internal/impl/aws"
"github.com/benthosdev/benthos/v4/internal/shutdown"
"github.com/benthosdev/benthos/v4/public/service"
)
Expand Down Expand Up @@ -69,7 +70,8 @@ This output is new and experimental, and the existing ` + "`kafka`" + ` input is
Optional().
Advanced()).
Field(service.NewTLSToggledField("tls")).
Field(saslField)
Field(saslField).
Field(sess.SessionFields()[2])
}

func init() {
Expand Down
26 changes: 26 additions & 0 deletions internal/impl/kafka/sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (

"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/cache"
sess "github.com/benthosdev/benthos/v4/internal/impl/aws"
ksasl "github.com/benthosdev/benthos/v4/internal/impl/kafka/sasl"
"github.com/benthosdev/benthos/v4/public/service"

"github.com/twmb/franz-go/pkg/sasl"
kaws "github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
Expand All @@ -24,6 +26,7 @@ var saslField = service.NewObjectListField("sasl",
"OAUTHBEARER": "OAuth Bearer based authentication.",
"SCRAM-SHA-256": "SCRAM based authentication as specified in RFC5802.",
"SCRAM-SHA-512": "SCRAM based authentication as specified in RFC5802.",
"AWS_MSK_IAM": "AWS IAM based authentication as specified by the 'aws-msk-iam-auth' java library.",
}).
Description("The SASL mechanism to use."),
service.NewStringField("username").
Expand Down Expand Up @@ -74,6 +77,8 @@ func saslMechanismsFromConfig(c *service.ParsedConfig) ([]sasl.Mechanism, error)
mechanisms[i], err = scram256SaslFromConfig(mConf)
case "SCRAM-SHA-512":
mechanisms[i], err = scram512SaslFromConfig(mConf)
case "AWS_MSK_IAM":
mechanisms[i], err = awsMskSaslFromConfig(c)
default:
err = fmt.Errorf("unknown mechanism: %v", mechStr)
}
Expand Down Expand Up @@ -159,6 +164,27 @@ func scram512SaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) {
}), nil
}

func awsMskSaslFromConfig(c *service.ParsedConfig) (sasl.Mechanism, error) {

awsSession, err := sess.GetSession(c)
if err != nil {
return nil, err
}

creds := awsSession.Config.Credentials
return kaws.ManagedStreamingIAM(func(ctx context.Context) (kaws.Auth, error) {
val, err := creds.GetWithContext(ctx)
if err != nil {
return kaws.Auth{}, err
}
return kaws.Auth{
AccessKey: val.AccessKeyID,
SecretKey: val.SecretAccessKey,
SessionToken: val.SessionToken,
}, nil
}), nil
}

//------------------------------------------------------------------------------

// SASL specific error types.
Expand Down
73 changes: 73 additions & 0 deletions website/docs/components/inputs/kafka_franz.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ input:
root_cas_file: ""
client_certs: []
sasl: []
credentials:
profile: ""
id: ""
secret: ""
token: ""
from_ec2_role: false
role: ""
role_external_id: ""
```
</TabItem>
Expand Down Expand Up @@ -299,6 +307,7 @@ Type: `string`

| Option | Summary |
|---|---|
| `AWS_MSK_IAM` | AWS IAM based authentication as specified by the 'aws-msk-iam-auth' java library. |
| `OAUTHBEARER` | OAuth Bearer based authentication. |
| `PLAIN` | Plain text authentication. |
| `SCRAM-SHA-256` | SCRAM based authentication as specified in RFC5802. |
Expand Down Expand Up @@ -336,4 +345,68 @@ Key/value pairs to add to OAUTHBEARER authentication requests.

Type: `object`

### `credentials`

Optional manual configuration of AWS credentials to use. More information can be found [in this document](/docs/guides/cloud/aws).


Type: `object`

### `credentials.profile`

A profile from `~/.aws/credentials` to use.


Type: `string`
Default: `""`

### `credentials.id`

The ID of credentials to use.


Type: `string`
Default: `""`

### `credentials.secret`

The secret for the credentials being used.


Type: `string`
Default: `""`

### `credentials.token`

The token for the credentials being used, required when using short term credentials.


Type: `string`
Default: `""`

### `credentials.from_ec2_role`

Use the credentials of a host EC2 machine configured to assume [an IAM role associated with the instance](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html).


Type: `bool`
Default: `false`
Requires version 4.2.0 or newer

### `credentials.role`

A role ARN to assume.


Type: `string`
Default: `""`

### `credentials.role_external_id`

An external ID to provide when assuming a role.


Type: `string`
Default: `""`


73 changes: 73 additions & 0 deletions website/docs/components/outputs/kafka_franz.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ output:
root_cas_file: ""
client_certs: []
sasl: []
credentials:
profile: ""
id: ""
secret: ""
token: ""
from_ec2_role: false
role: ""
role_external_id: ""
```
</TabItem>
Expand Down Expand Up @@ -469,6 +477,7 @@ Type: `string`

| Option | Summary |
|---|---|
| `AWS_MSK_IAM` | AWS IAM based authentication as specified by the 'aws-msk-iam-auth' java library. |
| `OAUTHBEARER` | OAuth Bearer based authentication. |
| `PLAIN` | Plain text authentication. |
| `SCRAM-SHA-256` | SCRAM based authentication as specified in RFC5802. |
Expand Down Expand Up @@ -506,4 +515,68 @@ Key/value pairs to add to OAUTHBEARER authentication requests.

Type: `object`

### `credentials`

Optional manual configuration of AWS credentials to use. More information can be found [in this document](/docs/guides/cloud/aws).


Type: `object`

### `credentials.profile`

A profile from `~/.aws/credentials` to use.


Type: `string`
Default: `""`

### `credentials.id`

The ID of credentials to use.


Type: `string`
Default: `""`

### `credentials.secret`

The secret for the credentials being used.


Type: `string`
Default: `""`

### `credentials.token`

The token for the credentials being used, required when using short term credentials.


Type: `string`
Default: `""`

### `credentials.from_ec2_role`

Use the credentials of a host EC2 machine configured to assume [an IAM role associated with the instance](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html).


Type: `bool`
Default: `false`
Requires version 4.2.0 or newer

### `credentials.role`

A role ARN to assume.


Type: `string`
Default: `""`

### `credentials.role_external_id`

An external ID to provide when assuming a role.


Type: `string`
Default: `""`


0 comments on commit 7050410

Please sign in to comment.