Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement jsonrpc2 agent behind feature flag #1954

Merged
merged 11 commits into from
Mar 15, 2017
Prev Previous commit
Next Next commit
enable restart
  • Loading branch information
bradrydzewski committed Mar 14, 2017
commit 395f0d52f9f1dc020fbafff10773ee5b9a11399f
2 changes: 1 addition & 1 deletion drone/agent/exp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {

cancelled := abool.New()
go func() {
if werr := client.Wait(ctx, work.ID); err != nil {
if werr := client.Wait(ctx, work.ID); werr != nil {
cancelled.SetTo(true)
log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr)
cancel()
Expand Down
221 changes: 221 additions & 0 deletions server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand All @@ -11,6 +12,8 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/cncd/pipeline/pipeline/rpc"
"github.com/cncd/pubsub"
"github.com/cncd/queue"
"github.com/drone/drone/remote"
"github.com/drone/drone/shared/httputil"
Expand Down Expand Up @@ -169,6 +172,11 @@ func DeleteBuild(c *gin.Context) {

func PostBuild(c *gin.Context) {

if os.Getenv("DRONE_CANARY") == "true" {
PostBuild2(c)
return
}

remote_ := remote.FromContext(c)
repo := session.Repo(c)
fork := c.DefaultQuery("fork", "false")
Expand Down Expand Up @@ -282,6 +290,7 @@ func PostBuild(c *gin.Context) {
build.Started = 0
build.Finished = 0
build.Enqueued = time.Now().UTC().Unix()
build.Error = ""
for _, job := range jobs {
for k, v := range buildParams {
job.Environment[k] = v
Expand Down Expand Up @@ -395,3 +404,215 @@ func copyLogs(dest io.Writer, src io.Reader) error {

return nil
}

//
//
//
//
//
//

func PostBuild2(c *gin.Context) {

remote_ := remote.FromContext(c)
repo := session.Repo(c)
fork := c.DefaultQuery("fork", "false")

num, err := strconv.Atoi(c.Param("number"))
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}

user, err := store.GetUser(c, repo.UserID)
if err != nil {
log.Errorf("failure to find repo owner %s. %s", repo.FullName, err)
c.AbortWithError(500, err)
return
}

build, err := store.GetBuildNumber(c, repo, num)
if err != nil {
log.Errorf("failure to get build %d. %s", num, err)
c.AbortWithError(404, err)
return
}

// if the remote has a refresh token, the current access token
// may be stale. Therefore, we should refresh prior to dispatching
// the job.
if refresher, ok := remote_.(remote.Refresher); ok {
ok, _ := refresher.Refresh(user)
if ok {
store.UpdateUser(c, user)
}
}

// fetch the .drone.yml file from the database
cfg := ToConfig(c)
raw, err := remote_.File(user, repo, build, cfg.Yaml)
if err != nil {
log.Errorf("failure to get build config for %s. %s", repo.FullName, err)
c.AbortWithError(404, err)
return
}

netrc, err := remote_.Netrc(user, repo)
if err != nil {
log.Errorf("failure to generate netrc for %s. %s", repo.FullName, err)
c.AbortWithError(500, err)
return
}

jobs, err := store.GetJobList(c, build)
if err != nil {
log.Errorf("failure to get build %d jobs. %s", build.Number, err)
c.AbortWithError(404, err)
return
}

// must not restart a running build
if build.Status == model.StatusPending || build.Status == model.StatusRunning {
c.String(409, "Cannot re-start a started build")
return
}

// forking the build creates a duplicate of the build
// and then executes. This retains prior build history.
if forkit, _ := strconv.ParseBool(fork); forkit {
build.ID = 0
build.Number = 0
build.Parent = num
for _, job := range jobs {
job.ID = 0
job.NodeID = 0
}
err := store.CreateBuild(c, build, jobs...)
if err != nil {
c.String(500, err.Error())
return
}

event := c.DefaultQuery("event", build.Event)
if event == model.EventPush ||
event == model.EventPull ||
event == model.EventTag ||
event == model.EventDeploy {
build.Event = event
}
build.Deploy = c.DefaultQuery("deploy_to", build.Deploy)
}

// Read query string parameters into buildParams, exclude reserved params
var buildParams = map[string]string{}
for key, val := range c.Request.URL.Query() {
switch key {
case "fork", "event", "deploy_to":
default:
// We only accept string literals, because build parameters will be
// injected as environment variables
buildParams[key] = val[0]
}
}

// todo move this to database tier
// and wrap inside a transaction
build.Status = model.StatusPending
build.Started = 0
build.Finished = 0
build.Enqueued = time.Now().UTC().Unix()
build.Error = ""
for _, job := range jobs {
for k, v := range buildParams {
job.Environment[k] = v
}
job.Error = ""
job.Status = model.StatusPending
job.Started = 0
job.Finished = 0
job.ExitCode = 0
job.NodeID = 0
job.Enqueued = build.Enqueued
store.UpdateJob(c, job)
}

err = store.UpdateBuild(c, build)
if err != nil {
c.AbortWithStatus(500)
return
}

c.JSON(202, build)

// get the previous build so that we can send
// on status change notifications
last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID)
secs, err := store.GetMergedSecretList(c, repo)
if err != nil {
log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err)
}

b := builder{
Repo: repo,
Curr: build,
Last: last,
Netrc: netrc,
Secs: secs,
Link: httputil.GetURL(c.Request),
Yaml: string(raw),
}
items, err := b.Build()
if err != nil {
build.Status = model.StatusError
build.Started = time.Now().Unix()
build.Finished = build.Started
build.Error = err.Error()
return
}

for i, item := range items {
// TODO prevent possible index out of bounds
item.Job.ID = jobs[i].ID
build.Jobs = append(build.Jobs, item.Job)
store.UpdateJob(c, item.Job)
}

//
// publish topic
//
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsPrivate),
},
}
message.Data, _ = json.Marshal(model.Event{
Type: model.Enqueued,
Repo: *repo,
Build: *build,
})
// TODO remove global reference
config.pubsub.Publish(c, "topic/events", message)
//
// end publish topic
//

for _, item := range items {
task := new(queue.Task)
task.ID = fmt.Sprint(item.Job.ID)
task.Labels = map[string]string{}
task.Labels["platform"] = item.Platform
for k, v := range item.Labels {
task.Labels[k] = v
}

task.Data, _ = json.Marshal(rpc.Pipeline{
ID: fmt.Sprint(item.Job.ID),
Config: item.Config,
Timeout: b.Repo.Timeout,
})

config.logger.Open(context.Background(), task.ID)
config.queue.Push(context.Background(), task)
}
}
Loading