Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom data management (for multiple tenant Kafka) #456

Merged
merged 5 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions cmd/client/command/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"io"
"net/http"
"os"

yamljsontool "github.com/ghodss/yaml"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -64,6 +63,9 @@ const (
wasmCodeURL = apiURL + "/wasm/code"
wasmDataURL = apiURL + "/wasm/data/%s/%s"

customDataKindURL = apiURL + "/customdata/%s"
customDataURL = apiURL + "/customdata/%s/%s"

// MeshTenantsURL is the mesh tenant prefix.
MeshTenantsURL = apiURL + "/mesh/tenants"

Expand Down Expand Up @@ -162,20 +164,3 @@ func printBody(body []byte) {

fmt.Printf("%s", output)
}

func buildVisitorFromFileOrStdin(specFile string, cmd *cobra.Command) SpecVisitor {
var buff []byte
var err error
if specFile != "" {
buff, err = os.ReadFile(specFile)
if err != nil {
ExitWithErrorf("%s failed: %v", cmd.Short, err)
}
} else {
buff, err = io.ReadAll(os.Stdin)
if err != nil {
ExitWithErrorf("%s failed: %v", cmd.Short, err)
}
}
return NewSpecVisitor(string(buff))
}
105 changes: 105 additions & 0 deletions cmd/client/command/customdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package command

import (
"errors"
"net/http"

"github.com/spf13/cobra"
)

// CustomDataCmd defines custom data command.
func CustomDataCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "custom-data",
Short: "View and change custom data",
}

cmd.AddCommand(listCustomDataCmd())
cmd.AddCommand(getCustomDataCmd())
cmd.AddCommand(updateCustomDataCmd())

return cmd
}

func getCustomDataCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "get",
Short: "Get an custom data",
Example: "egctl custom-data get <kind> <id>",
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 2 {
return errors.New("requires custom data kind and id to be retrieved")
}
return nil
},

Run: func(cmd *cobra.Command, args []string) {
handleRequest(http.MethodGet, makeURL(customDataURL, args[0], args[1]), nil, cmd)
},
}

return cmd
}

func listCustomDataCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List all custom data of a kind",
Example: "egctl custom-data list <kind>",
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("requires custom data kind to be retrieved")
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
handleRequest(http.MethodGet, makeURL(customDataKindURL, args[0]), nil, cmd)
},
}

return cmd
}

func updateCustomDataCmd() *cobra.Command {
var specFile string
cmd := &cobra.Command{
Use: "update",
Short: "Batch update custom data from a yaml file or stdin",
Example: "egctl custom-data update <kind> -f <change request file>",
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("requires custom data kind to be retrieved")
}
return nil
},
Run: func(cmd *cobra.Command, args []string) {
visitor := buildYAMLVisitor(specFile, cmd)
visitor.Visit(func(yamlDoc []byte) error {
handleRequest(http.MethodPost, makeURL(customDataKindURL, args[0]), yamlDoc, cmd)
return nil
})
visitor.Close()
},
}

cmd.Flags().StringVarP(&specFile, "file", "f", "", "A yaml file specifying the change request.")

return cmd
}
12 changes: 8 additions & 4 deletions cmd/client/command/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ func createObjectCmd() *cobra.Command {
Use: "create",
Short: "Create an object from a yaml file or stdin",
Run: func(cmd *cobra.Command, args []string) {
visitor := buildVisitorFromFileOrStdin(specFile, cmd)
visitor.Visit(func(s *spec) {
visitor := buildSpecVisitor(specFile, cmd)
visitor.Visit(func(s *spec) error {
handleRequest(http.MethodPost, makeURL(objectsURL), []byte(s.doc), cmd)
return nil
})
visitor.Close()
},
}

Expand All @@ -79,10 +81,12 @@ func updateObjectCmd() *cobra.Command {
Use: "update",
Short: "Update an object from a yaml file or stdin",
Run: func(cmd *cobra.Command, args []string) {
visitor := buildVisitorFromFileOrStdin(specFile, cmd)
visitor.Visit(func(s *spec) {
visitor := buildSpecVisitor(specFile, cmd)
visitor.Visit(func(s *spec) error {
handleRequest(http.MethodPut, makeURL(objectURL, s.Name), []byte(s.doc), cmd)
return nil
})
visitor.Close()
},
}

Expand Down
142 changes: 90 additions & 52 deletions cmd/client/command/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,85 +19,123 @@ package command

import (
"bufio"
"fmt"
"io"
"strings"
"os"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/yaml"
)

// SpecVisitorFunc executes visition logic
type SpecVisitorFunc func(*spec)

// SpecVisitor walk through the document via SpecVisitorFunc
type SpecVisitor interface {
Visit(SpecVisitorFunc)
// YAMLVisitor walk through multiple YAML documents
type YAMLVisitor interface {
Visit(func(yamlDoc []byte) error) error
Close()
}

type spec struct {
Kind string
Name string
doc string
type yamlVisitor struct {
reader io.Reader
}

type specVisitor struct {
io.Reader
// Visit implements YAMLVisitor
func (v *yamlVisitor) Visit(fn func(yamlDoc []byte) error) error {
r := yaml.NewYAMLReader(bufio.NewReader(v.reader))

for {
data, err := r.Read()
if len(data) == 0 {
if err == io.EOF {
return nil
}
if err != nil {
return err
}
continue
}
if err = fn(data); err != nil {
return err
}
}
}

// NewSpecVisitor returns a spec visitor.
func NewSpecVisitor(src string) SpecVisitor {
return &specVisitor{
Reader: strings.NewReader(src),
// Close closes the yamlVisitor
func (v *yamlVisitor) Close() {
if closer, ok := v.reader.(io.Closer); ok {
closer.Close()
}
}

type yamlDecoder struct {
reader *yaml.YAMLReader
doc string
type spec struct {
Kind string
Name string
doc string
}

func newYAMLDecoder(r io.Reader) *yamlDecoder {
return &yamlDecoder{
reader: yaml.NewYAMLReader(bufio.NewReader(r)),
}
// SpecVisitor walk through multiple specs
type SpecVisitor interface {
Visit(func(*spec) error) error
Close()
}

// Decode reads a YAML document into bytes and tries to yaml.Unmarshal it.
func (d *yamlDecoder) Decode(into interface{}) error {
bytes, err := d.reader.Read()
if err != nil && err != io.EOF {
return err
}
d.doc = string(bytes)
if len(bytes) != 0 {
err = yaml.Unmarshal(bytes, into)
}
return err
type specVisitor struct {
v YAMLVisitor
}

// Visit implements SpecVisitor
func (v *specVisitor) Visit(fn SpecVisitorFunc) {
d := newYAMLDecoder(v.Reader)
var validSpecs []spec
for {
var s spec
if err := d.Decode(&s); err != nil {
if err == io.EOF {
break
} else {
ExitWithErrorf("error parsing %s: %v", d.doc, err)
}
func (v *specVisitor) Visit(fn func(*spec) error) error {
var specs []spec

err := v.v.Visit(func(yamlDoc []byte) error {
s := spec{}
doc := string(yamlDoc)

err := yaml.Unmarshal(yamlDoc, &s)
if err != nil {
return fmt.Errorf("error parsing %s: %v", doc, err)
}

if s.Name == "" {
ExitWithErrorf("name is empty: %s", d.doc)
return fmt.Errorf("name is empty: %s", doc)
}

if s.Kind == "" {
ExitWithErrorf("kind is empty: %s", d.doc)
return fmt.Errorf("kind is empty: %s", doc)
}
s.doc = d.doc
//TODO can validate spec's Kind here
validSpecs = append(validSpecs, s)

s.doc = doc
specs = append(specs, s)
return nil
})

if err != nil {
ExitWithError(err)
}
for _, s := range validSpecs {

for _, s := range specs {
fn(&s)
}

return nil
}

// Close closes the specVisitor
func (v *specVisitor) Close() {
v.v.Close()
}

func buildYAMLVisitor(yamlFile string, cmd *cobra.Command) YAMLVisitor {
var r io.ReadCloser
if yamlFile == "" {
r = io.NopCloser(os.Stdin)
} else if f, err := os.Open(yamlFile); err != nil {
ExitWithErrorf("%s failed: %v", cmd.Short, err)
} else {
r = f
}
return &yamlVisitor{reader: r}
}

func buildSpecVisitor(yamlFile string, cmd *cobra.Command) SpecVisitor {
v := buildYAMLVisitor(yamlFile, cmd)
return &specVisitor{v: v}
}
6 changes: 4 additions & 2 deletions cmd/client/command/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ func wasmApplyDataCmd() *cobra.Command {
},

Run: func(cmd *cobra.Command, args []string) {
visitor := buildVisitorFromFileOrStdin(specFile, cmd)
visitor.Visit(func(s *spec) {
visitor := buildSpecVisitor(specFile, cmd)
visitor.Visit(func(s *spec) error {
handleRequest(http.MethodPost, makeURL(objectsURL), []byte(s.doc), cmd)
return nil
})
visitor.Close()
},
}
cmd.Flags().StringVarP(&specFile, "file", "f", "", "A yaml file specifying the object.")
Expand Down
1 change: 1 addition & 0 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func main() {
command.ObjectCmd(),
command.MemberCmd(),
command.WasmCmd(),
command.CustomDataCmd(),
completionCmd,
)

Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (s *Server) registerAPIs() {
group.Entries = append(group.Entries, s.metadataAPIEntries()...)
group.Entries = append(group.Entries, s.healthAPIEntries()...)
group.Entries = append(group.Entries, s.aboutAPIEntries()...)
group.Entries = append(group.Entries, s.customDataAPIEntries()...)

for _, fn := range appendAddonAPIs {
fn(s, group)
Expand Down
Loading