implement jsonrpc2 agent behind feature flag #1954

Merged · 11 commits · Mar 15, 2017
4 changes: 2 additions & 2 deletions .drone.yml
@@ -4,15 +4,15 @@ workspace:

pipeline:
test:
- image: golang:1.6
+ image: golang:1.8
environment:
- GO15VENDOREXPERIMENT=1
commands:
- make deps gen
- make test test_postgres test_mysql

compile:
- image: golang:1.6
+ image: golang:1.8
environment:
- GO15VENDOREXPERIMENT=1
- GOPATH=/go
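The image bump from golang:1.6 to golang:1.8 is presumably motivated by the new exp.go further down, which imports the standard-library context package, available only since Go 1.7. A minimal, dependency-free sketch of the timeout-plus-cancellation pattern that package enables (the durations here are arbitrary, for illustration only):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// A deadline-bound context, as exp.go builds for each pipeline run.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	select {
	case <-time.After(5 * time.Second):
		fmt.Println("work finished")
	case <-ctx.Done():
		// Fires once the timeout elapses, or when cancel() is called elsewhere.
		fmt.Println("cancelled:", ctx.Err())
	}
}
```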
50 changes: 48 additions & 2 deletions drone/agent/agent.go
@@ -1,6 +1,8 @@
package agent

import (
"fmt"
"math"
"os"
"os/signal"
"strings"
@@ -60,7 +62,7 @@ var AgentCmd = cli.Command{
Value: "amd64",
},
cli.StringFlag{
EnvVar: "DRONE_SERVER",
EnvVar: "DRONE_SERVER,DRONE_ENDPOINT",
Name: "drone-server",
Usage: "drone server address",
Value: "ws:https://localhost:8000/ws/broker",
@@ -138,11 +140,55 @@ var AgentCmd = cli.Command{
Name: "extension",
Usage: "custom plugin extension endpoint",
},

//
//
//

cli.BoolFlag{
EnvVar: "DRONE_CANARY",
Name: "canary",
Usage: "enable experimental features at your own risk",
},

// cli.StringFlag{
// Name: "endpoint",
// EnvVar: "DRONE_ENDPOINT,DRONE_SERVER",
// Value: "ws:https://localhost:9999/ws/rpc",
// },
// cli.DurationFlag{
// Name: "backoff",
// EnvVar: "DRONE_BACKOFF",
// Value: time.Second * 15,
// },
cli.IntFlag{
Name: "retry-limit",
EnvVar: "DRONE_RETRY_LIMIT",
Value: math.MaxInt32,
},
cli.IntFlag{
Name: "max-procs",
EnvVar: "DRONE_MAX_PROCS",
Value: 1,
},
cli.StringFlag{
Name: "platform",
EnvVar: "DRONE_PLATFORM",
Value: "linux/amd64",
},
},
}

func start(c *cli.Context) {

if c.Bool("canary") {
if err := loop(c); err != nil {
fmt.Println(err)
os.Exit(1)
}
return
}

log := redlog.New(os.Stderr)
log.SetLevel(0)
logger.SetLogger(log)
@@ -187,7 +233,7 @@ func start(c *cli.Context) {
client.Ack(m.Ack)
}()

- r := pipeline{
+ r := pipelinet{
drone: client,
docker: docker,
config: config{
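For readers unfamiliar with the flag wiring above: the new agent code is only reachable when the canary flag (or DRONE_CANARY) is set, and start() otherwise falls through to the existing STOMP-based path. Below is a minimal, self-contained sketch of that gating pattern using the same cli library; the app name and printed messages are illustrative placeholders, not part of the PR.

```go
package main

import (
	"fmt"
	"os"

	"github.com/codegangsta/cli"
)

func main() {
	app := cli.NewApp()
	app.Name = "agent-demo" // hypothetical demo name, not part of the PR
	app.Flags = []cli.Flag{
		cli.BoolFlag{
			Name:   "canary",
			EnvVar: "DRONE_CANARY",
			Usage:  "enable experimental features at your own risk",
		},
	}
	app.Action = func(c *cli.Context) {
		if c.Bool("canary") {
			// Experimental path, analogous to loop(c) in the patched start().
			fmt.Println("running experimental jsonrpc2 agent")
			return
		}
		// Legacy path, analogous to the existing STOMP-based agent.
		fmt.Println("running legacy agent")
	}
	app.Run(os.Args)
}
```

Running the sketch with DRONE_CANARY=true (or the --canary flag) takes the experimental branch; otherwise the legacy branch runs.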
4 changes: 2 additions & 2 deletions drone/agent/exec.go
@@ -22,13 +22,13 @@ type config struct {
extension []string
}

- type pipeline struct {
+ type pipelinet struct {
drone *stomp.Client
docker dockerclient.Client
config config
}

- func (r *pipeline) run(w *model.Work) {
+ func (r *pipelinet) run(w *model.Work) {

// defer func() {
// // r.drone.Ack(id, opts)
206 changes: 206 additions & 0 deletions drone/agent/exp.go
@@ -0,0 +1,206 @@
package agent

import (
"context"
"io"
"log"
"net/url"
"sync"
"time"

"github.com/cncd/pipeline/pipeline"
"github.com/cncd/pipeline/pipeline/backend"
"github.com/cncd/pipeline/pipeline/backend/docker"
"github.com/cncd/pipeline/pipeline/interrupt"
"github.com/cncd/pipeline/pipeline/multipart"
"github.com/cncd/pipeline/pipeline/rpc"

"github.com/codegangsta/cli"
"github.com/tevino/abool"
)

func loop(c *cli.Context) error {
endpoint, err := url.Parse(
c.String("drone-server"),
)
if err != nil {
return err
}
filter := rpc.Filter{
Labels: map[string]string{
"platform": c.String("platform"),
},
}

client, err := rpc.NewClient(
endpoint.String(),
rpc.WithRetryLimit(
c.Int("retry-limit"),
),
rpc.WithBackoff(
c.Duration("backoff"),
),
rpc.WithToken(
c.String("drone-secret"),
),
)
if err != nil {
return err
}
defer client.Close()

sigterm := abool.New()
ctx := context.Background()
ctx = interrupt.WithContextFunc(ctx, func() {
println("ctrl+c received, terminating process")
sigterm.Set()
})

var wg sync.WaitGroup
parallel := c.Int("max-procs")
wg.Add(parallel)

for i := 0; i < parallel; i++ {
go func() {
defer wg.Done()
for {
if sigterm.IsSet() {
return
}
if err := run(ctx, client, filter); err != nil {
log.Printf("build runner encountered error: exiting: %s", err)
return
}
}
}()
}

wg.Wait()
return nil
}

const (
maxFileUpload = 5000000
maxLogsUpload = 5000000
)

func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
log.Println("pipeline: request next execution")

// get the next job from the queue
work, err := client.Next(ctx, filter)
if err != nil {
return err
}
if work == nil {
return nil
}
log.Printf("pipeline: received next execution: %s", work.ID)

// new docker engine
engine, err := docker.NewEnv()
if err != nil {
return err
}

timeout := time.Hour
if minutes := work.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

cancelled := abool.New()
go func() {
if werr := client.Wait(ctx, work.ID); werr != nil {
cancelled.SetTo(true)
log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr)
cancel()
} else {
log.Printf("pipeline: cancel channel closed: %s", work.ID)
}
}()

go func() {
for {
select {
case <-ctx.Done():
log.Printf("pipeline: cancel ping loop: %s", work.ID)
return
case <-time.After(time.Minute):
log.Printf("pipeline: ping queue: %s", work.ID)
client.Extend(ctx, work.ID)
}
}
}()

state := rpc.State{}
state.Started = time.Now().Unix()
err = client.Update(context.Background(), work.ID, state)
if err != nil {
log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err)
}

var uploads sync.WaitGroup
defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error {
part, rerr := rc.NextPart()
if rerr != nil {
return rerr
}
uploads.Add(1)
writer := rpc.NewLineWriter(client, work.ID, proc.Alias)
rlimit := io.LimitReader(part, maxLogsUpload)
io.Copy(writer, rlimit)

defer func() {
log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias)
uploads.Done()
}()

part, rerr = rc.NextPart()
if rerr != nil {
return nil
}
rlimit = io.LimitReader(part, maxFileUpload)
mime := part.Header().Get("Content-Type")
if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil {
log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr)
}
return nil
})

err = pipeline.New(work.Config,
pipeline.WithContext(ctx),
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(pipeline.DefaultTracer),
pipeline.WithEngine(engine),
).Run()

state.Finished = time.Now().Unix()
state.Exited = true
if err != nil {
state.Error = err.Error()
if xerr, ok := err.(*pipeline.ExitError); ok {
state.ExitCode = xerr.Code
}
if xerr, ok := err.(*pipeline.OomError); ok {
state.ExitCode = xerr.Code
}
if cancelled.IsSet() {
state.ExitCode = 137
} else if state.ExitCode == 0 {
state.ExitCode = 1
}
}

log.Printf("pipeline: execution complete: %s", work.ID)

uploads.Wait()
err = client.Update(context.Background(), work.ID, state)
if err != nil {
log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err)
}

return nil
}
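loop() above starts max-procs worker goroutines that each keep pulling and running jobs until a termination flag is set or run returns an error. Below is a dependency-free sketch of that control flow, with the abool flag replaced by sync/atomic and the RPC client replaced by a hypothetical runOnce placeholder, so the shape is visible in isolation.

```go
package main

import (
	"log"
	"sync"
	"sync/atomic"
	"time"
)

// runOnce stands in for the real run(ctx, client, filter);
// it is a placeholder used only for illustration.
func runOnce(worker int) error {
	log.Printf("worker %d: request next execution", worker)
	time.Sleep(100 * time.Millisecond) // pretend to run a pipeline
	return nil
}

func main() {
	var sigterm int32 // plays the role of the abool flag in exp.go
	parallel := 2     // plays the role of the max-procs flag

	// Simulate a ctrl+c arriving later; the real agent sets the flag
	// from interrupt.WithContextFunc.
	time.AfterFunc(time.Second, func() { atomic.StoreInt32(&sigterm, 1) })

	var wg sync.WaitGroup
	wg.Add(parallel)
	for i := 0; i < parallel; i++ {
		go func(worker int) {
			defer wg.Done()
			for {
				if atomic.LoadInt32(&sigterm) == 1 {
					return
				}
				if err := runOnce(worker); err != nil {
					log.Printf("worker %d: exiting: %s", worker, err)
					return
				}
			}
		}(i)
	}
	wg.Wait()
	log.Println("all workers drained, agent exiting")
}
```

As in exp.go, the termination flag is only checked between jobs, so an in-flight pipeline is allowed to finish before the worker exits.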
1 change: 1 addition & 0 deletions model/build.go
@@ -8,6 +8,7 @@ type Build struct {
Parent int `json:"parent" meddler:"build_parent"`
Event string `json:"event" meddler:"build_event"`
Status string `json:"status" meddler:"build_status"`
Error string `json:"error" meddler:"build_error"`
Enqueued int64 `json:"enqueued_at" meddler:"build_enqueued"`
Created int64 `json:"created_at" meddler:"build_created"`
Started int64 `json:"started_at" meddler:"build_started"`