Skip to content

Commit

Permalink
Introduce sirupsen/logrus logging package
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Oct 27, 2017
1 parent 2058342 commit 63a2c20
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 40 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions workflow/client/cr.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package client

import (
"fmt"
"time"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
log "github.com/sirupsen/logrus"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -15,7 +15,7 @@ import (
)

func CreateCustomResourceDefinition(clientset apiextensionsclient.Interface) (*apiextensionsv1beta1.CustomResourceDefinition, error) {
fmt.Printf("Creating Workflow CRD\n")
log.Infof("Creating Workflow CRD")
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: wfv1.CRDFullName,
Expand Down Expand Up @@ -51,7 +51,7 @@ func CreateCustomResourceDefinition(clientset apiextensionsclient.Interface) (*a
}
case apiextensionsv1beta1.NamesAccepted:
if cond.Status == apiextensionsv1beta1.ConditionFalse {
fmt.Printf("Name conflict: %v\n", cond.Reason)
log.Errorf("Name conflict: %v", cond.Reason)
}
}
}
Expand Down
37 changes: 18 additions & 19 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package controller

import (
"context"
"fmt"
"log"

"github.com/argoproj/argo"
wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
workflowclient "github.com/argoproj/argo/workflow/client"
"github.com/argoproj/argo/workflow/common"
"github.com/ghodss/yaml"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -79,58 +78,58 @@ func NewWorkflowController(config *rest.Config, configMap string) *WorkflowContr

// Run starts an Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context) error {
fmt.Print("Watch Workflow objects\n")
log.Info("Watch Workflow objects")

// Watch Workflow objects
_, err := wfc.watchWorkflows(ctx)
if err != nil {
fmt.Printf("Failed to register watch for Workflow resource: %v\n", err)
log.Errorf("Failed to register watch for Workflow resource: %v", err)
return err
}

// Watch pods related to workflows
_, err = wfc.watchWorkflowPods(ctx)
if err != nil {
fmt.Printf("Failed to register watch for Workflow resource: %v\n", err)
log.Errorf("Failed to register watch for Workflow resource: %v", err)
return err
}

for {
select {
case wf := <-wfc.wfUpdates:
fmt.Printf("Processing wf: %v\n", wf.ObjectMeta.SelfLink)
log.Infof("Processing wf: %v", wf.ObjectMeta.SelfLink)
wfc.operateWorkflow(wf)
case pod := <-wfc.podUpdates:
if pod.Status.Phase != "Succeeded" && pod.Status.Phase != "Failed" {
continue
}
fmt.Printf("Processing completed pod: %v\n", pod.ObjectMeta.SelfLink)
log.Infof("Processing completed pod: %v", pod.ObjectMeta.SelfLink)
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
continue
}
wf, err := wfc.WorkflowClient.GetWorkflow(workflowName)
if err != nil {
fmt.Printf("Failed to find workflow %s %+v\n", workflowName, err)
log.Warnf("Failed to find workflow %s %+v", workflowName, err)
continue
}
node, ok := wf.Status.Nodes[pod.Name]
if !ok {
fmt.Printf("pod %s unassociated with workflow %s", pod.Name, workflowName)
log.Warnf("pod %s unassociated with workflow %s", pod.Name, workflowName)
continue
}
if string(pod.Status.Phase) == node.Status {
fmt.Printf("pod %s already marked %s\n", pod.Name, node.Status)
log.Infof("pod %s already marked %s", pod.Name, node.Status)
continue
}
fmt.Printf("Updating pod %s status %s -> %s\n", pod.Name, node.Status, pod.Status.Phase)
log.Infof("Updating pod %s status %s -> %s", pod.Name, node.Status, pod.Status.Phase)
node.Status = string(pod.Status.Phase)
wf.Status.Nodes[pod.Name] = node
_, err = wfc.WorkflowClient.UpdateWorkflow(wf)
if err != nil {
fmt.Printf("Failed to update %s status: %+v\n", pod.Name, err)
log.Infof("Failed to update %s status: %+v", pod.Name, err)
}
fmt.Printf("Updated %v\n", wf.Status.Nodes)
log.Infof("Updated %v", wf.Status.Nodes)
}
}

Expand Down Expand Up @@ -201,18 +200,18 @@ func (wfc *WorkflowController) watchWorkflows(ctx context.Context) (cache.Contro
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
wf := obj.(*wfv1.Workflow)
fmt.Printf("[CONTROLLER] WF Add %s\n", wf.ObjectMeta.SelfLink)
log.Infof("[CONTROLLER] WF Add %s", wf.ObjectMeta.SelfLink)
wfc.wfUpdates <- wf
},
UpdateFunc: func(old, new interface{}) {
//oldWf := old.(*wfv1.Workflow)
newWf := new.(*wfv1.Workflow)
fmt.Printf("[CONTROLLER] WF Update %s\n", newWf.ObjectMeta.SelfLink)
log.Infof("[CONTROLLER] WF Update %s", newWf.ObjectMeta.SelfLink)
wfc.wfUpdates <- newWf
},
DeleteFunc: func(obj interface{}) {
wf := obj.(*wfv1.Workflow)
fmt.Printf("[CONTROLLER] WF Delete %s\n", wf.ObjectMeta.SelfLink)
log.Infof("[CONTROLLER] WF Delete %s", wf.ObjectMeta.SelfLink)
wfc.wfUpdates <- wf
},
})
Expand Down Expand Up @@ -244,18 +243,18 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*apiv1.Pod)
fmt.Printf("[CONTROLLER] Pod Added%s\n", pod.ObjectMeta.SelfLink)
log.Infof("[CONTROLLER] Pod Added%s", pod.ObjectMeta.SelfLink)
wfc.podUpdates <- pod
},
UpdateFunc: func(old, new interface{}) {
//oldPod := old.(*apiv1.Pod)
newPod := new.(*apiv1.Pod)
fmt.Printf("[CONTROLLER] Pod Updated %s\n", newPod.ObjectMeta.SelfLink)
log.Infof("[CONTROLLER] Pod Updated %s", newPod.ObjectMeta.SelfLink)
wfc.podUpdates <- newPod
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*apiv1.Pod)
fmt.Printf("[CONTROLLER] Pod Deleted%s\n", pod.ObjectMeta.SelfLink)
log.Infof("[CONTROLLER] Pod Deleted%s", pod.ObjectMeta.SelfLink)
wfc.podUpdates <- pod
},
})
Expand Down
27 changes: 14 additions & 13 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
)

// operateWorkflow is the operator logic of a workflow
Expand All @@ -23,9 +24,9 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
if updated {
_, err := wfc.WorkflowClient.UpdateWorkflow(wfCopy)
if err != nil {
fmt.Printf("ERROR updating status: %v\n", err)
log.Errorf("ERROR updating status: %v", err)
} else {
fmt.Printf("UPDATED %s: %#v\n", wfCopy.ObjectMeta.Name, wfCopy.Status)
log.Infof("UPDATED %s: %#v", wfCopy.ObjectMeta.Name, wfCopy.Status)
}
}
}()
Expand All @@ -37,17 +38,17 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
tmplUpdates, err := wfc.executeTemplate(wfCopy, wfCopy.Spec.Entrypoint, nil, wfCopy.ObjectMeta.Name)
updated = updated || tmplUpdates
if err != nil {
fmt.Printf("%s error: %+v\n", wf.ObjectMeta.Name, err)
log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
}
}

// Returns tuple of: (workflow was updated, error)
func (wfc *WorkflowController) executeTemplate(wf *wfv1.Workflow, templateName string, args *wfv1.Arguments, nodeName string) (bool, error) {
fmt.Printf("Evaluating node %s: %v, args: %#v\n", nodeName, templateName, args)
log.Infof("Evaluating node %s: %v, args: %#v", nodeName, templateName, args)
nodeID := wf.NodeID(nodeName)
node, ok := wf.Status.Nodes[nodeID]
if ok && node.Completed() {
fmt.Printf("Node %s already completed\n", nodeName)
log.Infof("Node %s already completed", nodeName)
return false, nil
}
tmpl := wf.GetTemplate(templateName)
Expand All @@ -70,7 +71,7 @@ func (wfc *WorkflowController) executeTemplate(wf *wfv1.Workflow, templateName s
}
node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: status}
wf.Status.Nodes[nodeID] = node
fmt.Printf("Initialized container node %v\n", node)
log.Infof("Initialized container node %v", node)
return true, nil
}
return false, nil
Expand All @@ -79,7 +80,7 @@ func (wfc *WorkflowController) executeTemplate(wf *wfv1.Workflow, templateName s
updates := false
if !ok {
node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusRunning}
fmt.Printf("Initialized workflow node %v\n", node)
log.Infof("Initialized workflow node %v", node)
wf.Status.Nodes[nodeID] = node
updates = true
}
Expand All @@ -94,11 +95,11 @@ func (wfc *WorkflowController) executeTemplate(wf *wfv1.Workflow, templateName s
updates = updates || sgUpdates
sgNodeID := wf.NodeID(sgNodeName)
if !wf.Status.Nodes[sgNodeID].Completed() {
fmt.Printf("Workflow step group node %v not yet completed\n", wf.Status.Nodes[sgNodeID])
log.Infof("Workflow step group node %v not yet completed", wf.Status.Nodes[sgNodeID])
return updates, nil
}
if !wf.Status.Nodes[sgNodeID].Successful() {
fmt.Printf("Workflow step group %v not successful\n", wf.Status.Nodes[sgNodeID])
log.Infof("Workflow step group %v not successful", wf.Status.Nodes[sgNodeID])
node.Status = wfv1.NodeStatusFailed
wf.Status.Nodes[nodeID] = node
return true, nil
Expand All @@ -118,14 +119,14 @@ func (wfc *WorkflowController) executeStepGroup(wf *wfv1.Workflow, stepGroup map
nodeID := wf.NodeID(nodeName)
node, ok := wf.Status.Nodes[nodeID]
if ok && node.Completed() {
fmt.Printf("Step group node %v already marked completed\n", node)
log.Infof("Step group node %v already marked completed", node)
return false, nil
}
updates := false
if !ok {
node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: "Running"}
wf.Status.Nodes[nodeID] = node
fmt.Printf("Initializing step group node %v\n", node)
log.Infof("Initializing step group node %v", node)
updates = true
}
childNodeIDs := make([]string, 0)
Expand Down Expand Up @@ -153,12 +154,12 @@ func (wfc *WorkflowController) executeStepGroup(wf *wfv1.Workflow, stepGroup map
node.Status = wfv1.NodeStatusFailed
wf.Status.Nodes[nodeID] = node
updates = true
fmt.Printf("Step group node %s deemed failed due to failure of %s\n", nodeID, childNodeID)
log.Infof("Step group node %s deemed failed due to failure of %s", nodeID, childNodeID)
return updates, nil
}
}
node.Status = wfv1.NodeStatusSucceeded
wf.Status.Nodes[nodeID] = node
fmt.Printf("Step group node %s successful\n", nodeID)
log.Infof("Step group node %s successful", nodeID)
return true, nil
}
9 changes: 5 additions & 4 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -80,7 +81,7 @@ func envFromField(envVarName, fieldPath string) corev1.EnvVar {
}

func (wfc *WorkflowController) createWorkflowPod(wf *wfv1.Workflow, nodeName string, tmpl *wfv1.Template, args *wfv1.Arguments) error {
fmt.Printf("Creating Pod: %s\n", nodeName)
log.Infof("Creating Pod: %s", nodeName)
initCtr, err := wfc.newInitContainer(tmpl)
if err != nil {
return err
Expand Down Expand Up @@ -147,13 +148,13 @@ func (wfc *WorkflowController) createWorkflowPod(wf *wfv1.Workflow, nodeName str
// workflow pod names are deterministic. We can get here if
// the controller crashes after creating the pod, but fails
// to store the update to etc, and controller retries creation
fmt.Printf("pod %s already exists\n", nodeName)
log.Infof("pod %s already exists\n", nodeName)
return nil
}
fmt.Printf("Failed to create pod %s: %v\n", nodeName, err)
log.Infof("Failed to create pod %s: %v\n", nodeName, err)
return errors.InternalWrapError(err)
}
fmt.Printf("Created pod: %v\n", created)
log.Infof("Created pod: %v\n", created)
return nil
}

Expand Down

0 comments on commit 63a2c20

Please sign in to comment.