Skip to content

Commit

Permalink
fix(audit): es password decode and es v7 support
Browse files Browse the repository at this point in the history
Signed-off-by: forrestchen <[email protected]>
  • Loading branch information
ChenLingPeng authored and tke-robot committed Jul 15, 2020
1 parent fa260c4 commit ce92354
Showing 1 changed file with 102 additions and 16 deletions.
118 changes: 102 additions & 16 deletions pkg/audit/storage/es/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package es

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -36,6 +38,7 @@ type es struct {
ReserveDays int
username string
password string
v7 bool
}

func NewStorage(conf *config.ElasticSearchStorage) (storage.AuditStorage, error) {
Expand All @@ -44,7 +47,13 @@ func NewStorage(conf *config.ElasticSearchStorage) (storage.AuditStorage, error)
Indices: conf.Indices,
ReserveDays: conf.ReserveDays,
username: conf.Username,
password: conf.Password,
}
if conf.Password != "" {
password, err := base64.StdEncoding.DecodeString(conf.Password)
if err != nil {
return nil, fmt.Errorf("decode password failed: %v", err)
}
cli.password = string(password)
}
if cli.Indices == "" {
cli.Indices = defaultIndices
Expand All @@ -62,15 +71,45 @@ func NewStorage(conf *config.ElasticSearchStorage) (storage.AuditStorage, error)
}

func (s *es) init() error {
if err := s.detectVersion(); err != nil {
return err
}
if !s.indicesTypeExist() {
return s.indicesTypeCreate()
}
return nil
}

func (s *es) detectVersion() error {
type version struct {
Number string `json:"number"`
}
type info struct {
Version version `json:"version"`
}
vinfo := info{}
_, body, errs := gorequest.New().Get(s.Addr).SetBasicAuth(s.username, s.password).End()
if len(errs) > 0 {
return fmt.Errorf("detect es version failed: %v", errs)
}
if err := json.Unmarshal([]byte(body), &vinfo); err != nil {
return fmt.Errorf("detect es version failed: %v", err)
}
if strings.HasPrefix(vinfo.Version.Number, "7.") {
s.v7 = true
} else if !strings.HasPrefix(vinfo.Version.Number, "6.") {
return fmt.Errorf("un-supported version %v", vinfo.Version.Number)
}
return nil
}

func (s *es) indicesTypeExist() bool {
req := gorequest.New()
resp, _, err := req.Get(fmt.Sprintf("%s/%s/_mapping/%s", s.Addr, s.Indices, typ)).SetBasicAuth(s.username, s.password).End()
url := fmt.Sprintf("%s/%s/_mapping/%s", s.Addr, s.Indices, typ)
if s.v7 {
url = fmt.Sprintf("%s/%s/_mapping", s.Addr, s.Indices)
}
resp, _, err := req.Get(url).SetBasicAuth(s.username, s.password).End()
if len(err) != 0 {
return false
} else if resp.StatusCode != 200 {
Expand Down Expand Up @@ -105,12 +144,21 @@ func (s *es) indicesTypeCreate() error {
"type": "text",
}
}
reqBody := map[string]interface{}{
"mappings": map[string]interface{}{
typ: map[string]interface{}{
var reqBody map[string]interface{}
if s.v7 {
reqBody = map[string]interface{}{
"mappings": map[string]interface{}{
"properties": properties,
},
},
}
} else {
reqBody = map[string]interface{}{
"mappings": map[string]interface{}{
typ: map[string]interface{}{
"properties": properties,
},
},
}
}

resp, body, err := req.SendStruct(reqBody).End()
Expand Down Expand Up @@ -168,35 +216,63 @@ func (s *es) Query(param *storage.QueryParameter) ([]*types.Event, int, error) {
"bool": {"filter": terms},
}
}
req := gorequest.New().Get(fmt.Sprintf("%s/%s/%s/_search", s.Addr, s.Indices, typ)).SetBasicAuth(s.username, s.password)
url := fmt.Sprintf("%s/%s/%s/_search", s.Addr, s.Indices, typ)
if s.v7 {
url = fmt.Sprintf("%s/%s/_search", s.Addr, s.Indices)
}
req := gorequest.New().Get(url).SetBasicAuth(s.username, s.password)
req.Header["content-type"] = "application/json"
resp, body, errs := req.SendStruct(query).End()
if len(errs) > 0 {
return nil, 0, fmt.Errorf("failed search documents: %v", errs)
} else if resp.StatusCode >= 300 {
return nil, 0, fmt.Errorf("failed search document: %s", body)
}
res := Result{}
err := json.Unmarshal([]byte(body), &res)
var res interface{}
if s.v7 {
res = &ResultV7{}
} else {
res = &Result{}
}
err := json.Unmarshal([]byte(body), res)
if err != nil {
return nil, 0, err
}
events := make([]*types.Event, 0)
for _, ev := range res.Hits.Hits {
if s.v7 {
for _, ev := range res.(*ResultV7).Hits.Hits {
events = append(events, ev.Event)
}
return events, res.(*ResultV7).Hits.Total.Value, nil
}
for _, ev := range res.(*Result).Hits.Hits {
events = append(events, ev.Event)
}
return events, res.Hits.Total, nil
return events, res.(*Result).Hits.Total, nil
}

type Result struct {
Hits Hits `json:"hits"`
}

type ResultV7 struct {
Hits HitsV7 `json:"hits"`
}

type Hits struct {
Total int `json:"total"`
Hits []*Document `json:"hits"`
}

type HitsV7 struct {
Total TotalV7 `json:"total"`
Hits []*Document `json:"hits"`
}

type TotalV7 struct {
Value int `json:"value"`
}

type Document struct {
Event *types.Event `json:"_source"`
}
Expand Down Expand Up @@ -229,7 +305,11 @@ func (s *es) FieldValues() map[string][]string {
}

func (s *es) batchSave(events []*types.Event) error {
req := gorequest.New().Post(fmt.Sprintf("%s/%s/%s/_bulk", s.Addr, s.Indices, typ)).SetBasicAuth(s.username, s.password)
url := fmt.Sprintf("%s/%s/%s/_bulk", s.Addr, s.Indices, typ)
if s.v7 {
url = fmt.Sprintf("%s/%s/_bulk", s.Addr, s.Indices)
}
req := gorequest.New().Post(url).SetBasicAuth(s.username, s.password)
req.Header["content-type"] = "application/x-ndjson"
req.BounceToRawString = true
buf := bytes.NewBuffer(nil)
Expand All @@ -251,7 +331,11 @@ func (s *es) batchSave(events []*types.Event) error {

func (s *es) cleanup() {
log.Infof("trigger es audit event cleanup")
req := gorequest.New().Post(fmt.Sprintf("%s/%s/%s/_delete_by_query", s.Addr, s.Indices, typ)).SetBasicAuth(s.username, s.password)
url := fmt.Sprintf("%s/%s/%s/_delete_by_query", s.Addr, s.Indices, typ)
if s.v7 {
url = fmt.Sprintf("%s/%s/_delete_by_query", s.Addr, s.Indices)
}
req := gorequest.New().Post(url).SetBasicAuth(s.username, s.password)
req.Header["content-type"] = "application/json"
t := time.Now().Unix()*1000 - int64(s.ReserveDays*24*60*60*1000)
query := fmt.Sprintf(`{"query":{"bool":{"filter":{"range":{"requestReceivedTimestamp":{"lte":%d}}}}}}`, t)
Expand All @@ -269,7 +353,11 @@ func (s *es) updateFieldEnumCache() {
wg.Add(1)
go func(field string) {
defer wg.Done()
req := gorequest.New().Get(fmt.Sprintf("%s/%s/%s/_search", s.Addr, s.Indices, typ)).SetBasicAuth(s.username, s.password)
url := fmt.Sprintf("%s/%s/%s/_search", s.Addr, s.Indices, typ)
if s.v7 {
url = fmt.Sprintf("%s/%s/_search", s.Addr, s.Indices)
}
req := gorequest.New().Get(url).SetBasicAuth(s.username, s.password)
req.Header["content-type"] = "application/json"
_, body, errs := req.SendString(fmt.Sprintf(`{"size":0,"aggs":{"distinct_colors":{"terms":{"field":"%s","size":1000}}}}`, field)).End()
result := struct {
Expand Down Expand Up @@ -297,8 +385,6 @@ func (s *es) updateFieldEnumCache() {
l.Lock()
tmpMap[field] = values
l.Unlock()
} else {
fmt.Printf("filed %s return %v\n", field, body)
}
}(field)
}
Expand Down

0 comments on commit ce92354

Please sign in to comment.