Skip to content

Commit

Permalink
custom data management (for multiple tenant Kafka) (#456)
Browse files Browse the repository at this point in the history
* add custom data

* add client command for custom data

* rename id to key

* fix bugs

* make names consistent
  • Loading branch information
localvar committed Jan 13, 2022
1 parent 7548804 commit 9b29b1c
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 77 deletions.
21 changes: 3 additions & 18 deletions cmd/client/command/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"io"
"net/http"
"os"

yamljsontool "github.com/ghodss/yaml"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -64,6 +63,9 @@ const (
wasmCodeURL = apiURL + "/wasm/code"
wasmDataURL = apiURL + "/wasm/data/%s/%s"

customDataKindURL = apiURL + "/customdata/%s"
customDataURL = apiURL + "/customdata/%s/%s"

// MeshTenantsURL is the mesh tenant prefix.
MeshTenantsURL = apiURL + "/mesh/tenants"

Expand Down Expand Up @@ -162,20 +164,3 @@ func printBody(body []byte) {

fmt.Printf("%s", output)
}

func buildVisitorFromFileOrStdin(specFile string, cmd *cobra.Command) SpecVisitor {
var buff []byte
var err error
if specFile != "" {
buff, err = os.ReadFile(specFile)
if err != nil {
ExitWithErrorf("%s failed: %v", cmd.Short, err)
}
} else {
buff, err = io.ReadAll(os.Stdin)
if err != nil {
ExitWithErrorf("%s failed: %v", cmd.Short, err)
}
}
return NewSpecVisitor(string(buff))
}
105 changes: 105 additions & 0 deletions cmd/client/command/customdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package command

import (
"errors"
"net/http"

"github.com/spf13/cobra"
)

// CustomDataCmd defines custom data command.
func CustomDataCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "custom-data",
Short: "View and change custom data",
}

cmd.AddCommand(listCustomDataCmd())
cmd.AddCommand(getCustomDataCmd())
cmd.AddCommand(updateCustomDataCmd())

return cmd
}

func getCustomDataCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "get",
Short: "Get an custom data",
Example: "egctl custom-data get <kind> <id>",
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 2 {
return errors.New("requires custom data kind and id to be retrieved")
}
return nil
},

Run: func(cmd *cobra.Command, args []string) {
handleRequest(http.MethodGet, makeURL(customDataURL, args[0], args[1]), nil, cmd)
},
}

return cmd
}

func listCustomDataCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List all custom data of a kind",
Example: "egctl custom-data list <kind>",
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("requires custom data kind to be retrieved")
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
handleRequest(http.MethodGet, makeURL(customDataKindURL, args[0]), nil, cmd)
},
}

return cmd
}

func updateCustomDataCmd() *cobra.Command {
var specFile string
cmd := &cobra.Command{
Use: "update",
Short: "Batch update custom data from a yaml file or stdin",
Example: "egctl custom-data update <kind> -f <change request file>",
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("requires custom data kind to be retrieved")
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
visitor := buildYAMLVisitor(specFile, cmd)
visitor.Visit(func(yamlDoc []byte) error {
handleRequest(http.MethodPost, makeURL(customDataKindURL, args[0]), yamlDoc, cmd)
return nil
})
visitor.Close()
},
}

cmd.Flags().StringVarP(&specFile, "file", "f", "", "A yaml file specifying the change request.")

return cmd
}
12 changes: 8 additions & 4 deletions cmd/client/command/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ func createObjectCmd() *cobra.Command {
Use: "create",
Short: "Create an object from a yaml file or stdin",
Run: func(cmd *cobra.Command, args []string) {
visitor := buildVisitorFromFileOrStdin(specFile, cmd)
visitor.Visit(func(s *spec) {
visitor := buildSpecVisitor(specFile, cmd)
visitor.Visit(func(s *spec) error {
handleRequest(http.MethodPost, makeURL(objectsURL), []byte(s.doc), cmd)
return nil
})
visitor.Close()
},
}

Expand All @@ -79,10 +81,12 @@ func updateObjectCmd() *cobra.Command {
Use: "update",
Short: "Update an object from a yaml file or stdin",
Run: func(cmd *cobra.Command, args []string) {
visitor := buildVisitorFromFileOrStdin(specFile, cmd)
visitor.Visit(func(s *spec) {
visitor := buildSpecVisitor(specFile, cmd)
visitor.Visit(func(s *spec) error {
handleRequest(http.MethodPut, makeURL(objectURL, s.Name), []byte(s.doc), cmd)
return nil
})
visitor.Close()
},
}

Expand Down
142 changes: 90 additions & 52 deletions cmd/client/command/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,85 +19,123 @@ package command

import (
"bufio"
"fmt"
"io"
"strings"
"os"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/yaml"
)

// SpecVisitorFunc executes visition logic
type SpecVisitorFunc func(*spec)

// SpecVisitor walk through the document via SpecVisitorFunc
type SpecVisitor interface {
Visit(SpecVisitorFunc)
// YAMLVisitor walk through multiple YAML documents
type YAMLVisitor interface {
Visit(func(yamlDoc []byte) error) error
Close()
}

type spec struct {
Kind string
Name string
doc string
type yamlVisitor struct {
reader io.Reader
}

type specVisitor struct {
io.Reader
// Visit implements YAMLVisitor
func (v *yamlVisitor) Visit(fn func(yamlDoc []byte) error) error {
r := yaml.NewYAMLReader(bufio.NewReader(v.reader))

for {
data, err := r.Read()
if len(data) == 0 {
if err == io.EOF {
return nil
}
if err != nil {
return err
}
continue
}
if err = fn(data); err != nil {
return err
}
}
}

// NewSpecVisitor returns a spec visitor.
func NewSpecVisitor(src string) SpecVisitor {
return &specVisitor{
Reader: strings.NewReader(src),
// Close closes the yamlVisitor
func (v *yamlVisitor) Close() {
if closer, ok := v.reader.(io.Closer); ok {
closer.Close()
}
}

type yamlDecoder struct {
reader *yaml.YAMLReader
doc string
type spec struct {
Kind string
Name string
doc string
}

func newYAMLDecoder(r io.Reader) *yamlDecoder {
return &yamlDecoder{
reader: yaml.NewYAMLReader(bufio.NewReader(r)),
}
// SpecVisitor walk through multiple specs
type SpecVisitor interface {
Visit(func(*spec) error) error
Close()
}

// Decode reads a YAML document into bytes and tries to yaml.Unmarshal it.
func (d *yamlDecoder) Decode(into interface{}) error {
bytes, err := d.reader.Read()
if err != nil && err != io.EOF {
return err
}
d.doc = string(bytes)
if len(bytes) != 0 {
err = yaml.Unmarshal(bytes, into)
}
return err
type specVisitor struct {
v YAMLVisitor
}

// Visit implements SpecVisitor
func (v *specVisitor) Visit(fn SpecVisitorFunc) {
d := newYAMLDecoder(v.Reader)
var validSpecs []spec
for {
var s spec
if err := d.Decode(&s); err != nil {
if err == io.EOF {
break
} else {
ExitWithErrorf("error parsing %s: %v", d.doc, err)
}
func (v *specVisitor) Visit(fn func(*spec) error) error {
var specs []spec

err := v.v.Visit(func(yamlDoc []byte) error {
s := spec{}
doc := string(yamlDoc)

err := yaml.Unmarshal(yamlDoc, &s)
if err != nil {
return fmt.Errorf("error parsing %s: %v", doc, err)
}

if s.Name == "" {
ExitWithErrorf("name is empty: %s", d.doc)
return fmt.Errorf("name is empty: %s", doc)
}

if s.Kind == "" {
ExitWithErrorf("kind is empty: %s", d.doc)
return fmt.Errorf("kind is empty: %s", doc)
}
s.doc = d.doc
//TODO can validate spec's Kind here
validSpecs = append(validSpecs, s)

s.doc = doc
specs = append(specs, s)
return nil
})

if err != nil {
ExitWithError(err)
}
for _, s := range validSpecs {

for _, s := range specs {
fn(&s)
}

return nil
}

// Close closes the specVisitor
func (v *specVisitor) Close() {
v.v.Close()
}

func buildYAMLVisitor(yamlFile string, cmd *cobra.Command) YAMLVisitor {
var r io.ReadCloser
if yamlFile == "" {
r = io.NopCloser(os.Stdin)
} else if f, err := os.Open(yamlFile); err != nil {
ExitWithErrorf("%s failed: %v", cmd.Short, err)
} else {
r = f
}
return &yamlVisitor{reader: r}
}

func buildSpecVisitor(yamlFile string, cmd *cobra.Command) SpecVisitor {
v := buildYAMLVisitor(yamlFile, cmd)
return &specVisitor{v: v}
}
6 changes: 4 additions & 2 deletions cmd/client/command/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ func wasmApplyDataCmd() *cobra.Command {
},

Run: func(cmd *cobra.Command, args []string) {
visitor := buildVisitorFromFileOrStdin(specFile, cmd)
visitor.Visit(func(s *spec) {
visitor := buildSpecVisitor(specFile, cmd)
visitor.Visit(func(s *spec) error {
handleRequest(http.MethodPost, makeURL(objectsURL), []byte(s.doc), cmd)
return nil
})
visitor.Close()
},
}
cmd.Flags().StringVarP(&specFile, "file", "f", "", "A yaml file specifying the object.")
Expand Down
1 change: 1 addition & 0 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func main() {
command.ObjectCmd(),
command.MemberCmd(),
command.WasmCmd(),
command.CustomDataCmd(),
completionCmd,
)

Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *Server) registerAPIs() {
group.Entries = append(group.Entries, s.metadataAPIEntries()...)
group.Entries = append(group.Entries, s.healthAPIEntries()...)
group.Entries = append(group.Entries, s.aboutAPIEntries()...)
group.Entries = append(group.Entries, s.customDataAPIEntries()...)

for _, fn := range appendAddonAPIs {
fn(s, group)
Expand Down

0 comments on commit 9b29b1c

Please sign in to comment.