Skip to content

Commit

Permalink
Add Workflow step
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Mar 29, 2024
1 parent 2ccd462 commit adaa601
Show file tree
Hide file tree
Showing 13 changed files with 605 additions and 40 deletions.
42 changes: 25 additions & 17 deletions analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const workflowAssetDir = "statics/workflows"

// Status analyzer object
//
// Status describes the status of an analyzer
// # Status describes the status of an analyzer
//
// swagger:model AnalyzerStatus
// easyjson:json
Expand Down Expand Up @@ -354,18 +354,6 @@ func NewServerFromConfig() (*Server, error) {

tableClient := flow.NewWSTableClient(hub.PodServer())

// declare all extension available through API and filtering
tr := hub.GremlinTraversalParser()
tr.AddTraversalExtension(ge.NewMetricsTraversalExtension())
tr.AddTraversalExtension(ge.NewRawPacketsTraversalExtension())
tr.AddTraversalExtension(ge.NewFlowTraversalExtension(tableClient, s.flowStorage))
tr.AddTraversalExtension(ge.NewSocketsTraversalExtension())
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewAscendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())

// new flow subscriber endpoints
flowSubscriberWSServer := ws.NewStructServer(config.NewWSServer(hub.HTTPServer(), "/ws/subscriber/flow", apiAuthBackend))
flowSubscriberEndpoint := server.NewFlowSubscriberEndpoint(flowSubscriberWSServer)
Expand All @@ -381,6 +369,18 @@ func NewServerFromConfig() (*Server, error) {
edgeRuleAPIHandler := api.RegisterEdgeRuleAPI(apiServer, g, apiAuthBackend)
s.topologyManager = usertopology.NewTopologyManager(etcdClient, nodeRuleAPIHandler, edgeRuleAPIHandler, g)

_ = gapi.RegisterAlertAPI(apiServer, apiAuthBackend)

tr := hub.GremlinTraversalParser()
jsre, err := api.NewRuntime(g, tr, apiServer, opts.Assets)
if err != nil {
return nil, err
}

if _, err := api.RegisterWorkflowAPI(apiServer, apiAuthBackend, jsre); err != nil {
return nil, err
}

s.onDemandClient = ondemand.NewOnDemandFlowProbeClient(g, captureAPIHandler, hub.PodServer(), hub.SubscriberServer(), etcdClient)

s.flowServer, err = server.NewFlowServer(hub.HTTPServer(), g, s.flowStorage, flowSubscriberEndpoint, probeBundle, clusterAuthBackend)
Expand All @@ -392,16 +392,24 @@ func NewServerFromConfig() (*Server, error) {
api.RegisterPcapAPI(httpServer, s.flowStorage, apiAuthBackend)
api.RegisterConfigAPI(httpServer, apiAuthBackend)

if err := s.loadStaticWorkflows(); err != nil {
return nil, err
}

if config.GetBool("analyzer.ssh_enabled") {
if err := dede.RegisterHandler("terminal", "/dede", httpServer.Router); err != nil {
return nil, err
}
}

// declare all extension available through API and filtering
tr.AddTraversalExtension(ge.NewMetricsTraversalExtension())
tr.AddTraversalExtension(ge.NewRawPacketsTraversalExtension())
tr.AddTraversalExtension(ge.NewFlowTraversalExtension(tableClient, s.flowStorage))
tr.AddTraversalExtension(ge.NewSocketsTraversalExtension())
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewAscendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())
tr.AddTraversalExtension(ge.NewWorkflowTraversalExtension(apiServer.GetHandler("workflow"), jsre))

return s, nil
}

Expand Down
147 changes: 147 additions & 0 deletions api/server/jsre.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (C) 2019 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy ofthe License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specificlanguage governing permissions and
* limitations under the License.
*
*/

package server

import (
"encoding/json"
"fmt"
"strings"

"github.com/robertkrimen/otto"
api "github.com/skydive-project/skydive/graffiti/api/server"
"github.com/skydive-project/skydive/graffiti/api/types"
"github.com/skydive-project/skydive/graffiti/assets"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/graph/traversal"
"github.com/skydive-project/skydive/graffiti/js"
)

// NewRuntime returns a new JavaScript runtime where accesses to resources are done
// directly using the handlers, not through HTTP requests.
func NewRuntime(g *graph.Graph, tr *traversal.GremlinTraversalParser, server *api.Server, assets assets.Assets) (*js.Runtime, error) {
runtime, err := js.NewRuntime(assets)
if err != nil {
return nil, err
}
runtime.Start()

queryGremlin := func(query string) otto.Value {
ts, err := tr.Parse(strings.NewReader(query))
if err != nil {
return runtime.MakeCustomError("ParseError", err.Error())
}

result, err := ts.Exec(g, false)
if err != nil {
return runtime.MakeCustomError("ExecuteError", err.Error())
}

source, err := result.MarshalJSON()
if err != nil {
return runtime.MakeCustomError("MarshalError", err.Error())
}

r, _ := runtime.ToValue(string(source))
return r
}

runtime.Set("Gremlin", func(call otto.FunctionCall) otto.Value {
if len(call.ArgumentList) < 1 || !call.Argument(0).IsString() {
return runtime.MakeCustomError("MissingQueryArgument", "Gremlin requires a string parameter")
}

query := call.Argument(0).String()

return queryGremlin(query)
})

runtime.Set("request", func(call otto.FunctionCall) otto.Value {
if len(call.ArgumentList) < 3 || !call.Argument(0).IsString() || !call.Argument(1).IsString() || !call.Argument(2).IsString() {
return runtime.MakeCustomError("WrongArguments", "Import requires 3 string parameters")
}

url := call.Argument(0).String()
method := call.Argument(1).String()
data := []byte(call.Argument(2).String())

subs := strings.Split(url, "/") // filepath.Base(url)
if len(subs) < 3 {
return runtime.MakeCustomError("WrongArgument", fmt.Sprintf("Malformed URL %s", url))
}
resource := subs[2]

// For topology query, we directly call the Gremlin engine
if resource == "topology" {
query := types.TopologyParams{}
if err := json.Unmarshal(data, &query); err != nil {
return runtime.MakeCustomError("WrongArgument", fmt.Sprintf("Invalid query %s", string(data)))
}

return queryGremlin(query.GremlinQuery)
}

// This a CRUD call
handler := server.GetHandler(resource)

var err error
var content interface{}

switch method {
case "POST":
res := handler.New()
if err := json.Unmarshal([]byte(data), res); err != nil {
return runtime.MakeCustomError("UnmarshalError", err.Error())
}
if err := handler.Create(res, nil); err != nil {
return runtime.MakeCustomError("CreateError", err.Error())
}
b, _ := json.Marshal(res)
content = string(b)

case "DELETE":
if len(subs) < 4 {
return runtime.MakeCustomError("WrongArgument", "No ID specified")
}
handler.Delete(subs[3])

case "GET":
if len(subs) < 4 {
resources := handler.Index()
b, _ := json.Marshal(resources)
content = string(b)
} else {
id := subs[3]
obj, found := handler.Get(id)
if !found {
return runtime.MakeCustomError("NotFound", fmt.Sprintf("%s %s could not be found", resource, id))
}
b, _ := json.Marshal(obj)
content = string(b)
}
}

value, err := otto.ToValue(content)
if err != nil {
return runtime.MakeCustomError("WrongValue", err.Error())
}

return value
})

return runtime, nil
}
Loading

0 comments on commit adaa601

Please sign in to comment.