Skip to content

Commit

Permalink
Merge pull request #1954 from bradrydzewski/master
Browse files Browse the repository at this point in the history
implement jsonrpc2 agent behind feature flag
  • Loading branch information
bradrydzewski committed Mar 15, 2017
2 parents dc5f01d + fc46a5b commit fe669b7
Show file tree
Hide file tree
Showing 329 changed files with 30,281 additions and 190 deletions.
4 changes: 2 additions & 2 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ workspace:

pipeline:
test:
image: golang:1.6
image: golang:1.8
environment:
- GO15VENDOREXPERIMENT=1
commands:
- make deps gen
- make test test_postgres test_mysql

compile:
image: golang:1.6
image: golang:1.8
environment:
- GO15VENDOREXPERIMENT=1
- GOPATH=/go
Expand Down
50 changes: 48 additions & 2 deletions drone/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package agent

import (
"fmt"
"math"
"os"
"os/signal"
"strings"
Expand Down Expand Up @@ -60,7 +62,7 @@ var AgentCmd = cli.Command{
Value: "amd64",
},
cli.StringFlag{
EnvVar: "DRONE_SERVER",
EnvVar: "DRONE_SERVER,DRONE_ENDPOINT",
Name: "drone-server",
Usage: "drone server address",
Value: "ws:https://localhost:8000/ws/broker",
Expand Down Expand Up @@ -138,11 +140,55 @@ var AgentCmd = cli.Command{
Name: "extension",
Usage: "custom plugin extension endpoint",
},

//
//
//

cli.BoolFlag{
EnvVar: "DRONE_CANARY",
Name: "canary",
Usage: "enable experimental features at your own risk",
},

// cli.StringFlag{
// Name: "endpoint",
// EnvVar: "DRONE_ENDPOINT,DRONE_SERVER",
// Value: "ws:https://localhost:9999/ws/rpc",
// },
// cli.DurationFlag{
// Name: "backoff",
// EnvVar: "DRONE_BACKOFF",
// Value: time.Second * 15,
// },
cli.IntFlag{
Name: "retry-limit",
EnvVar: "DRONE_RETRY_LIMIT",
Value: math.MaxInt32,
},
cli.IntFlag{
Name: "max-procs",
EnvVar: "DRONE_MAX_PROCS",
Value: 1,
},
cli.StringFlag{
Name: "platform",
EnvVar: "DRONE_PLATFORM",
Value: "linux/amd64",
},
},
}

func start(c *cli.Context) {

if c.Bool("canary") {
if err := loop(c); err != nil {
fmt.Println(err)
os.Exit(1)
}
return
}

log := redlog.New(os.Stderr)
log.SetLevel(0)
logger.SetLogger(log)
Expand Down Expand Up @@ -187,7 +233,7 @@ func start(c *cli.Context) {
client.Ack(m.Ack)
}()

r := pipeline{
r := pipelinet{
drone: client,
docker: docker,
config: config{
Expand Down
4 changes: 2 additions & 2 deletions drone/agent/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type config struct {
extension []string
}

type pipeline struct {
type pipelinet struct {
drone *stomp.Client
docker dockerclient.Client
config config
}

func (r *pipeline) run(w *model.Work) {
func (r *pipelinet) run(w *model.Work) {

// defer func() {
// // r.drone.Ack(id, opts)
Expand Down
206 changes: 206 additions & 0 deletions drone/agent/exp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package agent

import (
"context"
"io"
"log"
"net/url"
"sync"
"time"

"github.com/cncd/pipeline/pipeline"
"github.com/cncd/pipeline/pipeline/backend"
"github.com/cncd/pipeline/pipeline/backend/docker"
"github.com/cncd/pipeline/pipeline/interrupt"
"github.com/cncd/pipeline/pipeline/multipart"
"github.com/cncd/pipeline/pipeline/rpc"

"github.com/codegangsta/cli"
"github.com/tevino/abool"
)

func loop(c *cli.Context) error {
endpoint, err := url.Parse(
c.String("drone-server"),
)
if err != nil {
return err
}
filter := rpc.Filter{
Labels: map[string]string{
"platform": c.String("platform"),
},
}

client, err := rpc.NewClient(
endpoint.String(),
rpc.WithRetryLimit(
c.Int("retry-limit"),
),
rpc.WithBackoff(
c.Duration("backoff"),
),
rpc.WithToken(
c.String("drone-secret"),
),
)
if err != nil {
return err
}
defer client.Close()

sigterm := abool.New()
ctx := context.Background()
ctx = interrupt.WithContextFunc(ctx, func() {
println("ctrl+c received, terminating process")
sigterm.Set()
})

var wg sync.WaitGroup
parallel := c.Int("max-procs")
wg.Add(parallel)

for i := 0; i < parallel; i++ {
go func() {
defer wg.Done()
for {
if sigterm.IsSet() {
return
}
if err := run(ctx, client, filter); err != nil {
log.Printf("build runner encountered error: exiting: %s", err)
return
}
}
}()
}

wg.Wait()
return nil
}

const (
maxFileUpload = 5000000
maxLogsUpload = 5000000
)

func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
log.Println("pipeline: request next execution")

// get the next job from the queue
work, err := client.Next(ctx, filter)
if err != nil {
return err
}
if work == nil {
return nil
}
log.Printf("pipeline: received next execution: %s", work.ID)

// new docker engine
engine, err := docker.NewEnv()
if err != nil {
return err
}

timeout := time.Hour
if minutes := work.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

cancelled := abool.New()
go func() {
if werr := client.Wait(ctx, work.ID); werr != nil {
cancelled.SetTo(true)
log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr)
cancel()
} else {
log.Printf("pipeline: cancel channel closed: %s", work.ID)
}
}()

go func() {
for {
select {
case <-ctx.Done():
log.Printf("pipeline: cancel ping loop: %s", work.ID)
return
case <-time.After(time.Minute):
log.Printf("pipeline: ping queue: %s", work.ID)
client.Extend(ctx, work.ID)
}
}
}()

state := rpc.State{}
state.Started = time.Now().Unix()
err = client.Update(context.Background(), work.ID, state)
if err != nil {
log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err)
}

var uploads sync.WaitGroup
defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error {
part, rerr := rc.NextPart()
if rerr != nil {
return rerr
}
uploads.Add(1)
writer := rpc.NewLineWriter(client, work.ID, proc.Alias)
rlimit := io.LimitReader(part, maxLogsUpload)
io.Copy(writer, rlimit)

defer func() {
log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias)
uploads.Done()
}()

part, rerr = rc.NextPart()
if rerr != nil {
return nil
}
rlimit = io.LimitReader(part, maxFileUpload)
mime := part.Header().Get("Content-Type")
if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil {
log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr)
}
return nil
})

err = pipeline.New(work.Config,
pipeline.WithContext(ctx),
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(pipeline.DefaultTracer),
pipeline.WithEngine(engine),
).Run()

state.Finished = time.Now().Unix()
state.Exited = true
if err != nil {
state.Error = err.Error()
if xerr, ok := err.(*pipeline.ExitError); ok {
state.ExitCode = xerr.Code
}
if xerr, ok := err.(*pipeline.OomError); ok {
state.ExitCode = xerr.Code
}
if cancelled.IsSet() {
state.ExitCode = 137
} else if state.ExitCode == 0 {
state.ExitCode = 1
}
}

log.Printf("pipeline: execution complete: %s", work.ID)

uploads.Wait()
err = client.Update(context.Background(), work.ID, state)
if err != nil {
log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err)
}

return nil
}
1 change: 1 addition & 0 deletions model/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Build struct {
Parent int `json:"parent" meddler:"build_parent"`
Event string `json:"event" meddler:"build_event"`
Status string `json:"status" meddler:"build_status"`
Error string `json:"error" meddler:"build_error"`
Enqueued int64 `json:"enqueued_at" meddler:"build_enqueued"`
Created int64 `json:"created_at" meddler:"build_created"`
Started int64 `json:"started_at" meddler:"build_started"`
Expand Down
Loading

0 comments on commit fe669b7

Please sign in to comment.