Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement jsonrpc2 agent behind feature flag #1954

Merged
merged 11 commits into from
Mar 15, 2017
Next Next commit
put updated agent behind flag
  • Loading branch information
bradrydzewski committed Mar 5, 2017
commit 623be6d830c9f080bf898e80d298c27c1cd98d42
50 changes: 48 additions & 2 deletions drone/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package agent

import (
"fmt"
"math"
"os"
"os/signal"
"strings"
Expand Down Expand Up @@ -60,7 +62,7 @@ var AgentCmd = cli.Command{
Value: "amd64",
},
cli.StringFlag{
EnvVar: "DRONE_SERVER",
EnvVar: "DRONE_SERVER,DRONE_ENDPOINT",
Name: "drone-server",
Usage: "drone server address",
Value: "ws:https://localhost:8000/ws/broker",
Expand Down Expand Up @@ -138,11 +140,55 @@ var AgentCmd = cli.Command{
Name: "extension",
Usage: "custom plugin extension endpoint",
},

//
//
//

cli.BoolFlag{
EnvVar: "DRONE_CANARY",
Name: "canary",
Usage: "enable experimental features at your own risk",
},

// cli.StringFlag{
// Name: "endpoint",
// EnvVar: "DRONE_ENDPOINT,DRONE_SERVER",
// Value: "ws:https://localhost:9999/ws/rpc",
// },
// cli.DurationFlag{
// Name: "backoff",
// EnvVar: "DRONE_BACKOFF",
// Value: time.Second * 15,
// },
cli.IntFlag{
Name: "retry-limit",
EnvVar: "DRONE_RETRY_LIMIT",
Value: math.MaxInt32,
},
cli.IntFlag{
Name: "max-procs",
EnvVar: "DRONE_MAX_PROCS",
Value: 1,
},
cli.StringFlag{
Name: "platform",
EnvVar: "DRONE_PLATFORM",
Value: "linux/amd64",
},
},
}

func start(c *cli.Context) {

if c.Bool("canary") {
if err := loop(c); err != nil {
fmt.Println(err)
os.Exit(1)
}
return
}

log := redlog.New(os.Stderr)
log.SetLevel(0)
logger.SetLogger(log)
Expand Down Expand Up @@ -187,7 +233,7 @@ func start(c *cli.Context) {
client.Ack(m.Ack)
}()

r := pipeline{
r := pipelinet{
drone: client,
docker: docker,
config: config{
Expand Down
4 changes: 2 additions & 2 deletions drone/agent/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type config struct {
extension []string
}

type pipeline struct {
type pipelinet struct {
drone *stomp.Client
docker dockerclient.Client
config config
}

func (r *pipeline) run(w *model.Work) {
func (r *pipelinet) run(w *model.Work) {

// defer func() {
// // r.drone.Ack(id, opts)
Expand Down
191 changes: 191 additions & 0 deletions drone/agent/exp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package agent

import (
"context"
"io"
"log"
"net/url"
"sync"
"time"

"github.com/cncd/pipeline/pipeline"
"github.com/cncd/pipeline/pipeline/backend"
"github.com/cncd/pipeline/pipeline/backend/docker"
"github.com/cncd/pipeline/pipeline/interrupt"
"github.com/cncd/pipeline/pipeline/multipart"
"github.com/cncd/pipeline/pipeline/rpc"

"github.com/codegangsta/cli"
"github.com/tevino/abool"
)

func loop(c *cli.Context) error {
endpoint, err := url.Parse(
c.String("drone-server"),
)
if err != nil {
return err
}

client, err := rpc.NewClient(
endpoint.String(),
rpc.WithRetryLimit(
c.Int("retry-limit"),
),
rpc.WithBackoff(
c.Duration("backoff"),
),
rpc.WithToken(
c.String("drone-secret"),
),
)
if err != nil {
return err
}
defer client.Close()

sigterm := abool.New()
ctx := context.Background()
ctx = interrupt.WithContextFunc(ctx, func() {
println("ctrl+c received, terminating process")
sigterm.Set()
})

var wg sync.WaitGroup
parallel := c.Int("max-procs")
wg.Add(parallel)

for i := 0; i < parallel; i++ {
go func() {
defer wg.Done()
for {
if sigterm.IsSet() {
return
}
if err := run(ctx, client); err != nil {
log.Printf("build runner encountered error: exiting: %s", err)
return
}
}
}()
}

wg.Wait()
return nil
}

func run(ctx context.Context, client rpc.Peer) error {
log.Println("pipeline: request next execution")

// get the next job from the queue
work, err := client.Next(ctx)
if err != nil {
return err
}
if work == nil {
return nil
}
log.Printf("pipeline: received next execution: %s", work.ID)

// new docker engine
engine, err := docker.NewEnv()
if err != nil {
return err
}

timeout := time.Hour
if minutes := work.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

cancelled := abool.New()
go func() {
ok, _ := client.Notify(ctx, work.ID)
if ok {
cancelled.SetTo(true)
log.Printf("pipeline: cancel signal received: %s", work.ID)
cancel()
} else {
log.Printf("pipeline: cancel channel closed: %s", work.ID)
}
}()

go func() {
for {
select {
case <-ctx.Done():
log.Printf("pipeline: cancel ping loop: %s", work.ID)
return
case <-time.After(time.Minute):
log.Printf("pipeline: ping queue: %s", work.ID)
client.Extend(ctx, work.ID)
}
}
}()

state := rpc.State{}
state.Started = time.Now().Unix()
err = client.Update(context.Background(), work.ID, state)
if err != nil {
log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err)
}

defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error {
part, rerr := rc.NextPart()
if rerr != nil {
return rerr
}
writer := rpc.NewLineWriter(client, work.ID, proc.Alias)
io.Copy(writer, part)

defer func() {
log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias)
}()

part, rerr = rc.NextPart()
if rerr != nil {
return nil
}
mime := part.Header().Get("Content-Type")
if serr := client.Save(context.Background(), work.ID, mime, part); serr != nil {
log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr)
}
return nil
})

err = pipeline.New(work.Config,
pipeline.WithContext(ctx),
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(pipeline.DefaultTracer),
pipeline.WithEngine(engine),
).Run()

state.Finished = time.Now().Unix()
state.Exited = true
if err != nil {
state.Error = err.Error()
if xerr, ok := err.(*pipeline.ExitError); ok {
state.ExitCode = xerr.Code
}
if xerr, ok := err.(*pipeline.OomError); ok {
state.ExitCode = xerr.Code
}
if cancelled.IsSet() {
state.ExitCode = 130
} else if state.ExitCode == 0 {
state.ExitCode = 1
}
}

log.Printf("pipeline: execution complete: %s", work.ID)

err = client.Update(context.Background(), work.ID, state)
if err != nil {
log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err)
}

return nil
}
52 changes: 40 additions & 12 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package router

import (
"net/http"
"os"

"github.com/gin-gonic/gin"

Expand Down Expand Up @@ -119,19 +120,46 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
badges.GET("/cc.xml", server.GetCC)
}

e.POST("/hook", server.PostHook)
e.POST("/api/hook", server.PostHook)
if os.Getenv("DRONE_CANARY") == "" {
e.POST("/hook", server.PostHook)
e.POST("/api/hook", server.PostHook)
} else {
e.POST("/hook", server.PostHook2)
e.POST("/api/hook", server.PostHook2)
}

ws := e.Group("/ws")
{
ws.GET("/broker", server.Broker)
ws.GET("/feed", server.EventStream)
ws.GET("/logs/:owner/:name/:build/:number",
session.SetRepo(),
session.SetPerm(),
session.MustPull,
server.LogStream,
)
if os.Getenv("DRONE_CANARY") == "" {
ws := e.Group("/ws")
{
ws.GET("/broker", server.Broker)
ws.GET("/feed", server.EventStream)
ws.GET("/logs/:owner/:name/:build/:number",
session.SetRepo(),
session.SetPerm(),
session.MustPull,
server.LogStream,
)
}
} else {
ws := e.Group("/ws")
{
ws.GET("/broker", server.RPCHandler)
ws.GET("/rpc", server.RPCHandler)
ws.GET("/feed", server.EventStream2)
ws.GET("/logs/:owner/:name/:build/:number",
session.SetRepo(),
session.SetPerm(),
session.MustPull,
server.LogStream2,
)
}
info := e.Group("/api/info")
{
info.GET("/queue",
session.MustAdmin(),
server.GetQueueInfo,
)
}
}

auth := e.Group("/authorize")
Expand Down
Loading