Skip to content

Commit

Permalink
Add support for Kafka Topic User ACL management (#1056)
Browse files Browse the repository at this point in the history
* Kafka User Support

* fix tests

* update comments and refactor

* fix docs

* Update digitalocean/database/resource_database_user.go

Co-authored-by: Andrew Starr-Bochicchio <[email protected]>

---------

Co-authored-by: Andrew Starr-Bochicchio <[email protected]>
  • Loading branch information
dweinshenker and andrewsomething committed Nov 3, 2023
1 parent 57f343b commit 74ec657
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 2 deletions.
16 changes: 16 additions & 0 deletions digitalocean/database/datasource_database_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ func DataSourceDigitalOceanDatabaseUser() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},
"settings": {
Type: schema.TypeList,
Computed: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"acl": {
Type: schema.TypeList,
Optional: true,
Elem: userACLSchema(),
},
},
},
},
},
}
}
Expand All @@ -62,5 +75,8 @@ func dataSourceDigitalOceanDatabaseUserRead(ctx context.Context, d *schema.Resou
d.Set("mysql_auth_plugin", user.MySQLSettings.AuthPlugin)
}

if err := d.Set("settings", flattenUserSettings(user.Settings)); err != nil {
return diag.Errorf("Error setting user settings: %#v", err)
}
return nil
}
110 changes: 108 additions & 2 deletions digitalocean/database/resource_database_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,19 @@ func ResourceDigitalOceanDatabaseUser() *schema.Resource {
return old == godo.SQLAuthPluginCachingSHA2 && new == ""
},
},

// Computed Properties
"settings": {
Type: schema.TypeList,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"acl": {
Type: schema.TypeList,
Optional: true,
Elem: userACLSchema(),
},
},
},
},
"role": {
Type: schema.TypeString,
Computed: true,
Expand All @@ -67,6 +78,34 @@ func ResourceDigitalOceanDatabaseUser() *schema.Resource {
}
}

func userACLSchema() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
"id": {
Type: schema.TypeString,
Computed: true,
},
"topic": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.NoZeroValues,
},
"permission": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
"admin",
"consume",
"produce",
"produceconsume",
}, false),
},
},
}
}

func resourceDigitalOceanDatabaseUserCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*config.CombinedConfig).GodoClient()
clusterID := d.Get("cluster_id").(string)
Expand All @@ -81,6 +120,10 @@ func resourceDigitalOceanDatabaseUserCreate(ctx context.Context, d *schema.Resou
}
}

if v, ok := d.GetOk("settings"); ok {
opts.Settings = expandUserSettings(v.([]interface{}))
}

// Prevent parallel creation of users for same cluster.
key := fmt.Sprintf("digitalocean_database_cluster/%s/users", clusterID)
mutexKV.Lock(key)
Expand All @@ -95,6 +138,11 @@ func resourceDigitalOceanDatabaseUserCreate(ctx context.Context, d *schema.Resou
d.SetId(makeDatabaseUserID(clusterID, user.Name))
log.Printf("[INFO] Database User Name: %s", user.Name)

// set userSettings only on CreateUser, due to CreateUser responses including `settings` but GetUser responses not including `settings`
if err := d.Set("settings", flattenUserSettings(user.Settings)); err != nil {
return diag.Errorf("Error setting user settings: %#v", err)
}

setDatabaseUserAttributes(d, user)

return nil
Expand Down Expand Up @@ -203,3 +251,61 @@ func resourceDigitalOceanDatabaseUserImport(d *schema.ResourceData, meta interfa
func makeDatabaseUserID(clusterID string, name string) string {
return fmt.Sprintf("%s/user/%s", clusterID, name)
}

func expandUserSettings(raw []interface{}) *godo.DatabaseUserSettings {
if len(raw) == 0 || raw[0] == nil {
return &godo.DatabaseUserSettings{}
}
userSettingsConfig := raw[0].(map[string]interface{})

userSettings := &godo.DatabaseUserSettings{
ACL: expandUserACLs(userSettingsConfig["acl"].([]interface{})),
}
return userSettings
}

func expandUserACLs(rawACLs []interface{}) []*godo.KafkaACL {
acls := make([]*godo.KafkaACL, 0, len(rawACLs))
for _, rawACL := range rawACLs {
a := rawACL.(map[string]interface{})
acl := &godo.KafkaACL{
Topic: a["topic"].(string),
Permission: a["permission"].(string),
}
acls = append(acls, acl)
}
return acls
}

func flattenUserSettings(settings *godo.DatabaseUserSettings) []map[string]interface{} {
result := make([]map[string]interface{}, 0, 1)
if settings != nil {
r := make(map[string]interface{})
r["acl"] = flattenUserACLs(settings.ACL)
result = append(result, r)
}
return result
}

func flattenUserACLs(acls []*godo.KafkaACL) []map[string]interface{} {
result := make([]map[string]interface{}, len(acls))
for i, acl := range acls {
item := make(map[string]interface{})
item["id"] = acl.ID
item["topic"] = acl.Topic
item["permission"] = normalizePermission(acl.Permission)
result[i] = item
}
return result
}

func normalizePermission(p string) string {
pLower := strings.ToLower(p)
switch pLower {
case "admin", "produce", "consume":
return pLower
case "produceconsume", "produce_consume", "readwrite", "read_write":
return "produceconsume"
}
return ""
}
84 changes: 84 additions & 0 deletions digitalocean/database/resource_database_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,57 @@ func TestAccDigitalOceanDatabaseUser_MySQLAuth(t *testing.T) {
})
}

func TestAccDigitalOceanDatabaseUser_KafkaACLs(t *testing.T) {
var databaseUser godo.DatabaseUser
databaseClusterName := acceptance.RandomTestName()
databaseUserName := acceptance.RandomTestName()

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acceptance.TestAccPreCheck(t) },
ProviderFactories: acceptance.TestAccProviderFactories,
CheckDestroy: testAccCheckDigitalOceanDatabaseUserDestroy,
Steps: []resource.TestStep{
{
Config: fmt.Sprintf(testAccCheckDigitalOceanDatabaseUserConfigKafkaACL, databaseClusterName, databaseUserName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDigitalOceanDatabaseUserExists("digitalocean_database_user.foobar_user", &databaseUser),
testAccCheckDigitalOceanDatabaseUserAttributes(&databaseUser, databaseUserName),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "name", databaseUserName),
resource.TestCheckResourceAttrSet(
"digitalocean_database_user.foobar_user", "role"),
resource.TestCheckResourceAttrSet(
"digitalocean_database_user.foobar_user", "password"),
resource.TestCheckResourceAttrSet(
"digitalocean_database_user.foobar_user", "settings.0.acl.0.id"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.0.topic", "topic-1"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.0.permission", "admin"),
resource.TestCheckResourceAttrSet(
"digitalocean_database_user.foobar_user", "settings.0.acl.1.id"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.1.topic", "topic-2"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.1.permission", "produceconsume"),
resource.TestCheckResourceAttrSet(
"digitalocean_database_user.foobar_user", "settings.0.acl.2.id"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.2.topic", "topic-*"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.2.permission", "produce"),
resource.TestCheckResourceAttrSet(
"digitalocean_database_user.foobar_user", "settings.0.acl.3.id"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.3.topic", "topic-*"),
resource.TestCheckResourceAttr(
"digitalocean_database_user.foobar_user", "settings.0.acl.3.permission", "consume"),
),
},
},
})
}

func testAccCheckDigitalOceanDatabaseUserDestroy(s *terraform.State) error {
client := acceptance.TestAccProvider.Meta().(*config.CombinedConfig).GodoClient()

Expand Down Expand Up @@ -361,6 +412,39 @@ resource "digitalocean_database_user" "foobar_user" {
mysql_auth_plugin = "mysql_native_password"
}`

const testAccCheckDigitalOceanDatabaseUserConfigKafkaACL = `
resource "digitalocean_database_cluster" "foobar" {
name = "%s"
engine = "kafka"
version = "3.5"
size = "db-s-1vcpu-2gb"
region = "nyc1"
node_count = 3
}
resource "digitalocean_database_user" "foobar_user" {
cluster_id = digitalocean_database_cluster.foobar.id
name = "%s"
settings {
acl {
topic = "topic-1"
permission = "admin"
}
acl {
topic = "topic-2"
permission = "produceconsume"
}
acl {
topic = "topic-*"
permission = "produce"
}
acl {
topic = "topic-*"
permission = "consume"
}
}
}`

const testAccCheckDigitalOceanDatabaseUserConfigMySQLAuthUpdate = `
resource "digitalocean_database_cluster" "foobar" {
name = "%s"
Expand Down
50 changes: 50 additions & 0 deletions docs/resources/database_user.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,60 @@ resource "digitalocean_database_user" "user-example" {
}
```

### Create a new user for a Kafka database cluster
```hcl
resource "digitalocean_database_cluster" "kafka-example" {
name = "example-kafka-cluster"
engine = "kafka"
version = "3.5"
size = "db-s-1vcpu-2gb"
region = "nyc1"
node_count = 3
}
resource "digitalocean_database_kafka_topic" "foobar_topic" {
cluster_id = digitalocean_database_cluster.foobar.id
name = "topic-1"
}
resource "digitalocean_database_user" "foobar_user" {
cluster_id = digitalocean_database_cluster.foobar.id
name = "example-user"
settings {
acl {
topic = "topic-1"
permission = "produce"
}
acl {
topic = "topic-2"
permission = "produceconsume"
}
acl {
topic = "topic-*"
permission = "consume"
}
}
}
```

## Argument Reference

The following arguments are supported:

* `cluster_id` - (Required) The ID of the original source database cluster.
* `name` - (Required) The name for the database user.
* `mysql_auth_plugin` - (Optional) The authentication method to use for connections to the MySQL user account. The valid values are `mysql_native_password` or `caching_sha2_password` (this is the default).
* `settings` - (Optional) Contains optional settings for the user.
The `settings` block is documented below.

`settings` supports the following:

* `acl` - (Optional) A set of ACLs (Access Control Lists) specifying permission on topics with a Kafka cluster. The properties of an individual ACL are described below:

An individual ACL includes the following:

* `topic` - (Required) A regex for matching the topic(s) that this ACL should apply to.
* `permission` - (Required) The permission level applied to the ACL. This includes "admin", "consume", "produce", and "produceconsume". "admin" allows for producing and consuming as well as add/delete/update permission for topics. "consume" allows only for reading topic messages. "produce" allows only for writing topic messages. "produceconsume" allows for both reading and writing topic messages.

## Attributes Reference

Expand All @@ -66,6 +113,9 @@ In addition to the above arguments, the following attributes are exported:
* `role` - Role for the database user. The value will be either "primary" or "normal".
* `password` - Password for the database user.

For individual ACLs for Kafka topics, the following attributes are exported:
* `id` - An identifier for the ACL, this will be automatically assigned when you create an ACL entry

## Import

Database user can be imported using the `id` of the source database cluster
Expand Down

0 comments on commit 74ec657

Please sign in to comment.