Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(cli): Migrate argo logs to use API client. See #2116 #2177

Merged
merged 33 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
cc26a3b
chore: Migrate `argo logs` to use API client. See #2116
alexec Feb 5, 2020
664e33a
changes
alexec Feb 5, 2020
b52d20a
fix: Correctly create code from changed protos.
alexec Feb 5, 2020
12eb78e
Merge branch 'codegen' into cli-logs
alexec Feb 5, 2020
587e36d
lint
alexec Feb 5, 2020
b4d49dc
Merge branch 'master' into cli-logs
alexec Feb 6, 2020
75504ed
codegen
alexec Feb 6, 2020
a4b92a0
make codegen
alexec Feb 6, 2020
e379903
logs
alexec Feb 6, 2020
2ab5dde
changes
alexec Feb 6, 2020
0d40d96
oddloading
alexec Feb 6, 2020
cab847e
correct diagnostic output
alexec Feb 6, 2020
271eb45
hopefully the final changes
alexec Feb 6, 2020
7ed136f
changes
alexec Feb 6, 2020
7168d34
Merge branch 'master' into cli-logs
alexec Feb 7, 2020
52e85da
Merge branch 'master' into cli-logs
alexec Feb 10, 2020
88fd4ad
feat(cli-logs): fix merge + make lint
alexec Feb 10, 2020
9420827
lint
alexec Feb 10, 2020
b589a73
Merge branch 'master' into cli-logs
alexec Feb 11, 2020
1f416ff
tidy up
alexec Feb 11, 2020
8078538
Merge branch 'master' into cli-logs
alexec Feb 20, 2020
67e4f53
codegen
alexec Feb 20, 2020
1c23aa4
test(cli): fix test
alexec Feb 20, 2020
50d67aa
Merge branch 'master' into cli-logs
alexec Feb 20, 2020
ecaf440
logs
alexec Feb 21, 2020
5fed826
reinstate vendor
alexec Feb 21, 2020
1ed093a
Merge branch 'master' into cli-logs
alexec Feb 22, 2020
78eeb15
changes
alexec Feb 22, 2020
802c290
Merge branch 'master' into cli-logs
alexec Feb 24, 2020
d868351
interm-2
alexec Feb 24, 2020
67c4ff9
copylocks lint
alexec Feb 24, 2020
0594ddd
Merge branch 'master' into cli-logs
alexec Feb 25, 2020
0ec7417
Merge branch 'master' into cli-logs
alexec Feb 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changes
  • Loading branch information
alexec committed Feb 6, 2020
commit 2ab5dde6b6199760df8772aeb79605de42c0ccef
3 changes: 3 additions & 0 deletions cmd/argo/commands/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ func NewLogsCommand() *cobra.Command {
}

if since > 0 {
// TODO - need to test with since
logOptions.SinceSeconds = pointer.Int64Ptr(int64(since.Seconds()))
}

if sinceTime != "" {
// TODO - need to test with time
parsedTime, err := time.Parse(time.RFC3339, sinceTime)
errors.CheckError(err)
sinceTime := metav1.NewTime(parsedTime)
Expand All @@ -73,6 +75,7 @@ func NewLogsCommand() *cobra.Command {
// TODO - need to test with deleted pods

if tailLines >= 0 {
// TODO - need to test with zero lines
logOptions.TailLines = pointer.Int64Ptr(tailLines)
}

Expand Down
122 changes: 84 additions & 38 deletions util/logs/pod-logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,97 @@ import (
"bufio"
"context"
"fmt"
"sync"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"

workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
"github.com/argoproj/argo/workflow/common"
)

func PodLogs(ctx context.Context, kubeClient kubernetes.Interface, req *workflowpkg.WorkflowLogRequest, ws workflowpkg.WorkflowService_PodLogsServer) error {
type Request interface {
GetNamespace() string
GetName() string
GetPodName() string
GetLogOptions() *corev1.PodLogOptions
}

type Sender interface {
Send(entry *workflowpkg.LogEntry) error
}

// This function ensure you're logging the output from pods for the workflow.
// This includes existing pods, but it also watches for new pods.
// Notes:
// * It does not check to see if your workflow exists of if you have read permission on it (it does not need to).
// * It must assume you don't know if a pod might appear or not, so it intentionally does not check
// to see if the pod exists. This means it may wait forever if you have made a spelling mistake.
func PodLogs(ctx context.Context, kubeClient kubernetes.Interface, req Request, sender Sender) error {
podInterface := kubeClient.CoreV1().Pods(req.GetNamespace())

logCtx := log.WithFields(log.Fields{"workflow": req.Name, "namespace": req.Namespace})
logCtx := log.WithFields(log.Fields{"workflow": req.GetName(), "namespace": req.GetNamespace()})

// we create a watch on the pods labelled with the workflow name,
// but we also filter by pod name if that was requested
options := metav1.ListOptions{LabelSelector: common.LabelKeyWorkflow + "=" + req.Name}
if req.PodName != "" {
options.FieldSelector = "metadata.name=" + req.PodName
options := metav1.ListOptions{LabelSelector: common.LabelKeyWorkflow + "=" + req.GetName()}
if req.GetPodName() != "" {
options.FieldSelector = "metadata.name=" + req.GetPodName()
}

// keep a track of those we are logging
streamedPods := make(map[types.UID]bool)
var streamedPodsGuard sync.Mutex

ensureWeAreStreaming := func(pod *corev1.Pod) {
streamedPodsGuard.Lock()
defer streamedPodsGuard.Unlock()
logCtx.WithFields(log.Fields{"podPhase": pod.Status.Phase, "alreadyStreaming": streamedPods[pod.UID]}).Debug("Ensuring watch")
if pod.Status.Phase != corev1.PodPending && !streamedPods[pod.UID] {
streamedPods[pod.UID] = true
go func(podName string) {
defer func() {
streamedPodsGuard.Lock()
defer streamedPodsGuard.Unlock()
streamedPods[pod.UID] = false
logCtx.Debug("Stopping streaming")
}()
stream, err := podInterface.GetLogs(podName, req.GetLogOptions()).Stream()
if err != nil {
logCtx.WithField("err", err).Error("Unable to get logs")
return
}
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
select {
case <-ctx.Done():
logCtx.Debug("Done")
return
default:
content := scanner.Text()
logCtx.WithField("content", content).Debug("Log line")
// we actually don't know the container name AFAIK
err = sender.Send(&workflowpkg.LogEntry{PodName: podName, Content: content})
if err != nil {
logCtx.WithField("err", err).Error("Unable to send log entry")
return
}
}
}
logCtx.Debug("No more data")
// out of data, we do not want to start watching again
}(pod.GetName())
}
}

logCtx.Debug("Watching for known pods")
list, err := podInterface.List(options)
if err != nil {
return err
}

w, err := podInterface.Watch(options)
Expand All @@ -33,9 +103,13 @@ func PodLogs(ctx context.Context, kubeClient kubernetes.Interface, req *workflow
}
defer w.Stop()

logCtx.Debug("Watching for pods")
// no errors are returned from this point forward as we will block using a select

for _, pod := range list.Items {
ensureWeAreStreaming(&pod)
}

podsWeAreLogging := make(map[string]bool)
logCtx.Debug("Watching for pod events")

for {
select {
Expand All @@ -47,38 +121,10 @@ func PodLogs(ctx context.Context, kubeClient kubernetes.Interface, req *workflow
if !ok {
return fmt.Errorf("watch object was not a workflow %v", pod.GroupVersionKind())
}
logCtx := logCtx.WithField("podName", pod.GetName())
logCtx.WithFields(log.Fields{"eventType": event.Type, "podPhase": pod.Status.Phase, "alreadyLogging": podsWeAreLogging[pod.Name]}).Debug("Event")
logCtx.WithFields(log.Fields{"eventType": event.Type, "podName": pod.GetName()}).Debug("Event")
// whenever a new pod appears, we start a goroutine to watch it
if event.Type != watch.Deleted && pod.Status.Phase != corev1.PodPending && !podsWeAreLogging[pod.Name] {
podsWeAreLogging[pod.Name] = true
go func(podName string) {
stream, err := podInterface.GetLogs(podName, req.LogOptions).Stream()
if err != nil {
podsWeAreLogging[pod.Name] = false
logCtx.WithField("err", err).Error("Unable to get logs")
return
}
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
select {
case <-ctx.Done():
podsWeAreLogging[pod.Name] = false
logCtx.Debug("Done")
return
default:
content := scanner.Text()
logCtx.WithField("content", content).Debug("Log line")
// we actually don't know the container name AFAIK
err = ws.Send(&workflowpkg.LogEntry{PodName: podName, Content: content})
if err != nil {
podsWeAreLogging[pod.Name] = false
logCtx.WithField("err", err).Error("Unable to send log entry")
return
}
}
}
}(pod.GetName())
if event.Type != watch.Deleted {
ensureWeAreStreaming(pod)
}
}
}
Expand Down