Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(cli): Migrate argo logs to use API client. See #2116 #2177

Merged
merged 33 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cc26a3b
chore: Migrate `argo logs` to use API client. See #2116
alexec Feb 5, 2020
664e33a
changes
alexec Feb 5, 2020
b52d20a
fix: Correctly create code from changed protos.
alexec Feb 5, 2020
12eb78e
Merge branch 'codegen' into cli-logs
alexec Feb 5, 2020
587e36d
lint
alexec Feb 5, 2020
b4d49dc
Merge branch 'master' into cli-logs
alexec Feb 6, 2020
75504ed
codegen
alexec Feb 6, 2020
a4b92a0
make codegen
alexec Feb 6, 2020
e379903
logs
alexec Feb 6, 2020
2ab5dde
changes
alexec Feb 6, 2020
0d40d96
oddloading
alexec Feb 6, 2020
cab847e
correct diagnostic output
alexec Feb 6, 2020
271eb45
hopefully the final changes
alexec Feb 6, 2020
7ed136f
changes
alexec Feb 6, 2020
7168d34
Merge branch 'master' into cli-logs
alexec Feb 7, 2020
52e85da
Merge branch 'master' into cli-logs
alexec Feb 10, 2020
88fd4ad
feat(cli-logs): fix merge + make lint
alexec Feb 10, 2020
9420827
lint
alexec Feb 10, 2020
b589a73
Merge branch 'master' into cli-logs
alexec Feb 11, 2020
1f416ff
tidy up
alexec Feb 11, 2020
8078538
Merge branch 'master' into cli-logs
alexec Feb 20, 2020
67e4f53
codegen
alexec Feb 20, 2020
1c23aa4
test(cli): fix test
alexec Feb 20, 2020
50d67aa
Merge branch 'master' into cli-logs
alexec Feb 20, 2020
ecaf440
logs
alexec Feb 21, 2020
5fed826
reinstate vendor
alexec Feb 21, 2020
1ed093a
Merge branch 'master' into cli-logs
alexec Feb 22, 2020
78eeb15
changes
alexec Feb 22, 2020
802c290
Merge branch 'master' into cli-logs
alexec Feb 24, 2020
d868351
interm-2
alexec Feb 24, 2020
67c4ff9
copylocks lint
alexec Feb 24, 2020
0594ddd
Merge branch 'master' into cli-logs
alexec Feb 25, 2020
0ec7417
Merge branch 'master' into cli-logs
alexec Feb 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
interm-2
  • Loading branch information
alexec committed Feb 24, 2020
commit d868351d235d21d65b822c3617bb12097e141acd
27 changes: 27 additions & 0 deletions pkg/apiclient/abstract-intermediary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package apiclient

import (
"context"
)

type abstractIntermediary struct {
panicIntermediary
ctx context.Context
cancel context.CancelFunc
// if anything is on this channel, then then we must be done - the error maybe io.EOF - which just means stop
error chan error
}

func (w abstractIntermediary) Context() context.Context {
return w.ctx
}

func newAbstractIntermediary(ctx context.Context) abstractIntermediary {
ctx, cancel := context.WithCancel(ctx)
return abstractIntermediary{
panicIntermediary: panicIntermediary{},
ctx: ctx,
cancel: cancel,
error: make(chan error, 1),
}
}
4 changes: 3 additions & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/argoproj/argo/util/help"
)

var argoKubeOffloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo

type argoKubeClient struct {
}

Expand All @@ -45,7 +47,7 @@ func newArgoKubeClient(clientConfig clientcmd.ClientConfig) (context.Context, Cl
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(sqldb.ExplosiveOffloadNodeStatusRepo)}
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(argoKubeOffloadNodeStatusRepo)}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() cronworkflow.CronWorkflowServiceClient {
Expand Down
14 changes: 10 additions & 4 deletions pkg/apiclient/argo-kube-workflow-service-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apiclient

import (
"context"
"io"

"google.golang.org/grpc"

Expand Down Expand Up @@ -59,9 +60,14 @@ func (c argoKubeWorkflowServiceClient) LintWorkflow(ctx context.Context, req *wo

func (c argoKubeWorkflowServiceClient) PodLogs(ctx context.Context, req *workflowpkg.WorkflowLogRequest, _ ...grpc.CallOption) (workflowpkg.WorkflowService_PodLogsClient, error) {
intermediary := newLogsIntermediary(ctx)
err := c.delegate.PodLogs(req, intermediary)
if err != nil {
return nil, err
}
go func() {
defer intermediary.cancel()
err := c.delegate.PodLogs(req, intermediary)
if err != nil {
intermediary.error <- err
} else {
intermediary.error <- io.EOF
}
}()
return intermediary, nil
}
56 changes: 7 additions & 49 deletions pkg/apiclient/logs-intermediary.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ package apiclient

import (
"context"
"io"

"google.golang.org/grpc/metadata"

workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
)

// The "Poison pill pattern" to tell the channel to close.
var closeTheLogEntriesChan *workflowpkg.LogEntry

type logsIntermediary struct {
alexec marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context
abstractIntermediary
logEntries chan *workflowpkg.LogEntry
}

Expand All @@ -23,50 +17,14 @@ func (c *logsIntermediary) Send(logEntry *workflowpkg.LogEntry) error {
}

func (c *logsIntermediary) Recv() (*workflowpkg.LogEntry, error) {
logEntry := <-c.logEntries
if logEntry == closeTheLogEntriesChan {
return nil, io.EOF
select {
case err := <-c.error:
return nil, err
case logEntry := <-c.logEntries:
return logEntry, nil
}
return logEntry, nil
}

func newLogsIntermediary(ctx context.Context) *logsIntermediary {
return &logsIntermediary{ctx, make(chan *workflowpkg.LogEntry, 512)}
}

func (c *logsIntermediary) SetHeader(metadata.MD) error {
panic("implement me")
}

func (c *logsIntermediary) SendHeader(metadata.MD) error {
panic("implement me")
}

func (c *logsIntermediary) SetTrailer(metadata.MD) {
panic("implement me")
}

func (c *logsIntermediary) Header() (metadata.MD, error) {
panic("implement me")
}

func (c *logsIntermediary) Trailer() metadata.MD {
panic("implement me")
}

func (c *logsIntermediary) CloseSend() error {
c.logEntries <- closeTheLogEntriesChan
return nil
}

func (c *logsIntermediary) Context() context.Context {
return c.ctx
}

func (c *logsIntermediary) SendMsg(interface{}) error {
panic("implement me")
}

func (c *logsIntermediary) RecvMsg(interface{}) error {
panic("implement me")
return &logsIntermediary{newAbstractIntermediary(ctx), make(chan *workflowpkg.LogEntry, 64)}
}
38 changes: 38 additions & 0 deletions pkg/apiclient/panic-intermediary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package apiclient

import "google.golang.org/grpc/metadata"

type panicIntermediary struct {
}

func (w abstractIntermediary) Header() (metadata.MD, error) {
panic("implement me")
}

func (w abstractIntermediary) Trailer() metadata.MD {
panic("implement me")
}

func (w abstractIntermediary) CloseSend() error {
panic("implement me")
}

func (w abstractIntermediary) SendMsg(interface{}) error {
panic("implement me")
}

func (w abstractIntermediary) RecvMsg(interface{}) error {
panic("implement me")
}

func (w abstractIntermediary) SetHeader(metadata.MD) error {
panic("implement me")
}

func (w abstractIntermediary) SendHeader(metadata.MD) error {
panic("implement me")
}

func (w abstractIntermediary) SetTrailer(metadata.MD) {
panic("implement me")
}
22 changes: 13 additions & 9 deletions util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"reflect"
"sort"
"sync"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -38,15 +39,14 @@ type sender interface {
}

type workflowLogger struct {
// initial pods
pods []corev1.Pod
logCtx *log.Entry
completed bool
follow bool
wg sync.WaitGroup
initialPods []corev1.Pod
ensureWeAreStreaming func(pod *corev1.Pod)
podWatch watch.Interface
wg *sync.WaitGroup
wfWatch watch.Interface
completed bool
follow bool
}

func NewWorkflowLogger(ctx context.Context, wfClient versioned.Interface, kubeClient kubernetes.Interface, req request, sender sender) (WorkflowLogger, error) {
Expand Down Expand Up @@ -99,7 +99,6 @@ func NewWorkflowLogger(ctx context.Context, wfClient versioned.Interface, kubeCl
defer func() {
streamedPodsGuard.Lock()
defer streamedPodsGuard.Unlock()
streamedPods[pod.UID] = false
logCtx.Debug("Pod logs stream done")
}()
stream, err := podInterface.GetLogs(podName, req.GetLogOptions()).Stream()
Expand Down Expand Up @@ -142,14 +141,14 @@ func NewWorkflowLogger(ctx context.Context, wfClient versioned.Interface, kubeCl
}

return &workflowLogger{
pods: list.Items,
logCtx: logCtx,
initialPods: list.Items,
completed: completed,
follow: req.GetLogOptions().Follow,
wg: wg,
alexec marked this conversation as resolved.
Show resolved Hide resolved
ensureWeAreStreaming: ensureWeAreStreaming,
wfWatch: wfWatch,
podWatch: podWatch,
wg: &wg,
}, nil
}

Expand All @@ -159,7 +158,12 @@ func (l *workflowLogger) Run(ctx context.Context) {

l.logCtx.WithFields(log.Fields{"completed": l.completed, "follow": l.follow}).Debug("Running")

for _, pod := range l.pods {
// print logs by start time-ish
sort.Slice(l.initialPods, func(i, j int) bool {
return l.initialPods[i].Status.StartTime.Before(l.initialPods[j].Status.StartTime)
})

for _, pod := range l.initialPods {
l.ensureWeAreStreaming(&pod)
}

Expand Down