Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement jsonrpc2 agent behind feature flag #1954

Merged
merged 11 commits into from
Mar 15, 2017
Prev Previous commit
Next Next commit
mask secrets, backport drone_ variables
  • Loading branch information
bradrydzewski committed Mar 12, 2017
commit 442e05a4e129b7c669efde51080e5d03704ba717
26 changes: 19 additions & 7 deletions drone/agent/exp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func loop(c *cli.Context) error {
if err != nil {
return err
}
filter := rpc.Filter{
Labels: map[string]string{
"platform": c.String("platform"),
},
}

client, err := rpc.NewClient(
endpoint.String(),
Expand Down Expand Up @@ -62,7 +67,7 @@ func loop(c *cli.Context) error {
if sigterm.IsSet() {
return
}
if err := run(ctx, client); err != nil {
if err := run(ctx, client, filter); err != nil {
log.Printf("build runner encountered error: exiting: %s", err)
return
}
Expand All @@ -74,11 +79,16 @@ func loop(c *cli.Context) error {
return nil
}

func run(ctx context.Context, client rpc.Peer) error {
const (
maxFileUpload = 5000000
maxLogsUpload = 5000000
)

func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
log.Println("pipeline: request next execution")

// get the next job from the queue
work, err := client.Next(ctx)
work, err := client.Next(ctx, filter)
if err != nil {
return err
}
Expand All @@ -103,9 +113,9 @@ func run(ctx context.Context, client rpc.Peer) error {

cancelled := abool.New()
go func() {
if err := client.Wait(ctx, work.ID); err != nil {
if werr := client.Wait(ctx, work.ID); err != nil {
cancelled.SetTo(true)
log.Printf("pipeline: cancel signal received: %s: %s", work.ID, err)
log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr)
cancel()
} else {
log.Printf("pipeline: cancel channel closed: %s", work.ID)
Expand Down Expand Up @@ -140,7 +150,8 @@ func run(ctx context.Context, client rpc.Peer) error {
}
uploads.Add(1)
writer := rpc.NewLineWriter(client, work.ID, proc.Alias)
io.Copy(writer, part)
rlimit := io.LimitReader(part, maxLogsUpload)
io.Copy(writer, rlimit)

defer func() {
log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias)
Expand All @@ -151,8 +162,9 @@ func run(ctx context.Context, client rpc.Peer) error {
if rerr != nil {
return nil
}
rlimit = io.LimitReader(part, maxFileUpload)
mime := part.Header().Get("Content-Type")
if serr := client.Save(context.Background(), work.ID, mime, part); serr != nil {
if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil {
log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr)
}
return nil
Expand Down
40 changes: 35 additions & 5 deletions server/hook2.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func PostHook2(c *gin.Context) {

for _, job := range jobs {

metadata := metadataFromStruct(repo, build, last, job, "linux/amd64")
metadata := metadataFromStruct(repo, build, last, job, httputil.GetURL(c.Request))
environ := metadata.Environ()

secrets := map[string]string{}
Expand Down Expand Up @@ -296,6 +296,7 @@ func PostHook2(c *gin.Context) {

ir := compiler.New(
compiler.WithEnviron(environ),
// TODO ability to customize the escalated plugins
compiler.WithEscalated("plugins/docker", "plugins/gcr", "plugins/ecr"),
compiler.WithLocal(false),
compiler.WithNetrc(netrc.Login, netrc.Password, netrc.Machine),
Expand All @@ -306,17 +307,46 @@ func PostHook2(c *gin.Context) {
time.Now().Unix(),
),
),
compiler.WithEnviron(job.Environment),
compiler.WithProxy(),
compiler.WithVolumes(), // todo set global volumes
// TODO ability to set global volumes for things like certs
compiler.WithVolumes(),
compiler.WithWorkspaceFromURL("/drone", repo.Link),
).Compile(parsed)

// TODO there is a chicken and egg problem here because
// the compiled yaml has a platform environment variable
// that is not correctly set, because we are just about
// to set it ....
// TODO maybe we remove platform from metadata and let
// the compiler set the value from the yaml itself.
if parsed.Platform == "" {
parsed.Platform = "linux/amd64"
}

for _, sec := range secs {
if !sec.MatchEvent(build.Event) {
continue
}
if build.Verified || sec.SkipVerify {
ir.Secrets = append(ir.Secrets, &backend.Secret{
Mask: sec.Conceal,
Name: sec.Name,
Value: sec.Value,
})
}
}

task := new(queue.Task)
task.ID = fmt.Sprint(job.ID)
task.Labels = map[string]string{}
task.Labels["platform"] = "linux/amd64"
// TODO set proper platform
// TODO set proper labels
task.Labels["platform"] = parsed.Platform
if parsed.Labels != nil {
for k, v := range parsed.Labels {
task.Labels[k] = v
}
}

task.Data, _ = json.Marshal(rpc.Pipeline{
ID: fmt.Sprint(job.ID),
Config: ir,
Expand Down
17 changes: 12 additions & 5 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,16 @@ type RPC struct {
}

// Next implements the rpc.Next function
func (s *RPC) Next(c context.Context) (*rpc.Pipeline, error) {
filter := func(*queue.Task) bool { return true }
task, err := s.queue.Poll(c, filter)
func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) {
fn := func(task *queue.Task) bool {
for k, v := range filter.Labels {
if task.Labels[k] != v {
return false
}
}
return true
}
task, err := s.queue.Poll(c, fn)
if err != nil {
return nil, err
} else if task == nil {
Expand Down Expand Up @@ -207,8 +214,8 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
return nil
}

// Save implements the rpc.Save function
func (s *RPC) Save(c context.Context, id, mime string, file io.Reader) error { return nil }
// Upload implements the rpc.Upload function
func (s *RPC) Upload(c context.Context, id, mime string, file io.Reader) error { return nil }

// Done implements the rpc.Done function
func (s *RPC) Done(c context.Context, id string) error { return nil }
Expand Down
9 changes: 9 additions & 0 deletions vendor/github.com/cncd/pipeline/pipeline/backend/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 97 additions & 10 deletions vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading