diff --git a/cmd/argo/commands/get.go b/cmd/argo/commands/get.go
index 24ab11366cf3..ad7c9932b19a 100644
--- a/cmd/argo/commands/get.go
+++ b/cmd/argo/commands/get.go
@@ -9,7 +9,9 @@ import (
     "strings"
     "text/tabwriter"

+    "github.com/argoproj/argo/errors"
     wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
+    "github.com/argoproj/argo/util/file"
     "github.com/argoproj/pkg/humanize"
     "github.com/ghodss/yaml"
     "github.com/spf13/cobra"
@@ -49,6 +51,10 @@ func NewGetCommand() *cobra.Command {
             if showMetrics {
                 metricsConfigMap = getMetricsConfigMap(wf, kubeClient)
             }
+            err = CheckAndDecompress(wf)
+            if err != nil {
+                log.Fatal(err)
+            }
             printWorkflow(wf, output)
         },
     }
@@ -60,6 +66,21 @@ func NewGetCommand() *cobra.Command {
     return command
 }

+func CheckAndDecompress(wf *wfv1.Workflow) error {
+    if wf.Status.CompressedNodes != "" {
+        nodeContent, err := file.DecodeDecompressString(wf.Status.CompressedNodes)
+        if err != nil {
+            return errors.InternalWrapError(err)
+        }
+        err = json.Unmarshal([]byte(nodeContent), &wf.Status.Nodes)
+        if err != nil {
+            log.Fatal(err)
+        }
+        wf.Status.CompressedNodes = ""
+    }
+    return nil
+}
+
 func printWorkflow(wf *wfv1.Workflow, outFmt string) {
     switch outFmt {
     case "name":
diff --git a/cmd/argo/commands/list.go b/cmd/argo/commands/list.go
index 471a3f356f26..20647b59c89d 100644
--- a/cmd/argo/commands/list.go
+++ b/cmd/argo/commands/list.go
@@ -134,6 +134,10 @@ func countPendingRunningCompleted(wf *wfv1.Workflow) (int, int, int) {
     pending := 0
     running := 0
     completed := 0
+    err := CheckAndDecompress(wf)
+    if err != nil {
+        log.Fatal(err)
+    }
     for _, node := range wf.Status.Nodes {
         tmpl := wf.GetTemplate(node.TemplateName)
         if tmpl == nil || !tmpl.IsPodType() {
diff --git a/cmd/argo/commands/logs.go b/cmd/argo/commands/logs.go
index 1a2ac6b1660e..ffb978e08cbc 100644
--- a/cmd/argo/commands/logs.go
+++ b/cmd/argo/commands/logs.go
@@ -18,6 +18,7 @@ import (
     log "github.com/sirupsen/logrus"
     "github.com/spf13/cobra"
     "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/cache"
@@ -130,6 +131,11 @@ func (p *logPrinter) PrintPodLogs(podName string) error {
 // Prints logs for workflow pod steps and return most recent log timestamp per pod name
 func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]*time.Time {
     var podNodes []v1alpha1.NodeStatus
+    err := CheckAndDecompress(wf)
+    if err != nil {
+        log.Warn(err)
+        return nil
+    }
     for _, node := range wf.Status.Nodes {
         if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError {
             podNodes = append(podNodes, node)
@@ -208,6 +214,11 @@ func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPo
     streamedPods := make(map[string]bool)

     processPods := func(wf *v1alpha1.Workflow) {
+        err := CheckAndDecompress(wf)
+        if err != nil {
+            log.Warn(err)
+            return
+        }
         for id := range wf.Status.Nodes {
             node := wf.Status.Nodes[id]
             if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError && streamedPods[node.ID] == false {
diff --git a/cmd/argo/commands/watch.go b/cmd/argo/commands/watch.go
index 133ee033a5ee..abdcaaee5ae1 100644
--- a/cmd/argo/commands/watch.go
+++ b/cmd/argo/commands/watch.go
@@ -45,6 +45,8 @@ func watchWorkflow(name string) {
         select {
         case next := <-watchIf.ResultChan():
             wf, _ = next.Object.(*wfv1.Workflow)
+            err := CheckAndDecompress(wf)
+            errors.CheckError(err)
         case <-ticker.C:
         }
         if wf == nil {
diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go
index 13e405758b6a..ae7d7173795e 100644
--- a/pkg/apis/workflow/v1alpha1/types.go
+++ b/pkg/apis/workflow/v1alpha1/types.go
@@ -450,6 +450,9 @@ type WorkflowStatus struct {
     // A human readable message indicating details about why the workflow is in this condition.
     Message string `json:"message,omitempty"`

+    // Compressed and base64 encoded Nodes map
+    CompressedNodes string `json:"compressedNodes,omitempty"`
+
     // Nodes is a mapping between a node ID and the node's status.
     Nodes map[string]NodeStatus `json:"nodes,omitempty"`

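For orientation (illustration only, not part of the diff): with the field above, a persisted workflow carries its node statuses either in the plain nodes map or as a single gzip-compressed, base64-encoded string. A minimal sketch of the status shape after compression; the values are placeholders, not taken from the change:

package main

import (
    "fmt"

    wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

func main() {
    // Hypothetical status once the controller has swapped Nodes for CompressedNodes.
    status := wfv1.WorkflowStatus{
        Phase:           wfv1.NodeRunning,
        CompressedNodes: "H4sIAAAAAAAA/...", // placeholder: gzip+base64 of the marshalled Nodes map
        Nodes:           nil,                // cleared before the workflow is persisted
    }
    fmt.Println(status.CompressedNodes != "")
}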
diff --git a/util/file/fileutil.go b/util/file/fileutil.go
new file mode 100644
index 000000000000..f99b2e5dea34
--- /dev/null
+++ b/util/file/fileutil.go
@@ -0,0 +1,97 @@
+package file
+
+import (
+    "archive/tar"
+    "bytes"
+    "compress/gzip"
+    "encoding/base64"
+    "io"
+    "io/ioutil"
+    "os"
+    "strings"
+
+    log "github.com/sirupsen/logrus"
+)
+
+// IsFileOrDirExistInGZip returns true if the file or directory exists in the gzipped tar file
+func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool {
+
+    fi, err := os.Open(gzipFilePath)
+
+    if os.IsNotExist(err) {
+        return false
+    }
+    defer close(fi)
+
+    fz, err := gzip.NewReader(fi)
+    if err != nil {
+        return false
+    }
+    tr := tar.NewReader(fz)
+    for {
+        hdr, err := tr.Next()
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+
+            return false
+        }
+        if hdr.FileInfo().IsDir() && strings.Contains(strings.Trim(hdr.Name, "/"), strings.Trim(sourcePath, "/")) {
+            return true
+        }
+        if strings.Contains(sourcePath, hdr.Name) && hdr.Size > 0 {
+            return true
+        }
+    }
+    return false
+}
+
+// close closes the file/writer/reader and logs a warning on failure
+func close(f io.Closer) {
+    err := f.Close()
+    if err != nil {
+        log.Warnf("Failed to close the file/writer/reader. %v", err)
+    }
+}
+
+// CompressEncodeString returns the gzip-compressed content, base64 encoded
+func CompressEncodeString(content string) string {
+    return base64.StdEncoding.EncodeToString(CompressContent([]byte(content)))
+}
+
+// DecodeDecompressString base64 decodes and decompresses the content
+func DecodeDecompressString(content string) (string, error) {

+    buf, err := base64.StdEncoding.DecodeString(content)
+    if err != nil {
+        return "", err
+    }
+    dBuf, err := DecompressContent(buf)
+    if err != nil {
+        return "", err
+    }
+    return string(dBuf), nil
+}
+
+// CompressContent compresses the byte array using a gzip writer
+func CompressContent(content []byte) []byte {
+    var buf bytes.Buffer
+    zipWriter := gzip.NewWriter(&buf)
+
+    _, err := zipWriter.Write(content)
+    if err != nil {
+        log.Warnf("Error in compressing: %v", err)
+    }
+    close(zipWriter)
+    return buf.Bytes()
+}
+
+// DecompressContent returns the uncompressed content
+func DecompressContent(content []byte) ([]byte, error) {
+
+    buf := bytes.NewReader(content)
+    gZipReader, _ := gzip.NewReader(buf)
+    defer close(gZipReader)
+    return ioutil.ReadAll(gZipReader)
+}
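A short usage sketch for the new helpers (illustration only, not part of the diff; the paths and JSON below are hypothetical):

package main

import (
    "fmt"

    "github.com/argoproj/argo/util/file"
)

func main() {
    // Round-trip a node-status-like string through the compress/encode helpers.
    encoded := file.CompressEncodeString(`{"node-1":{"phase":"Succeeded"}}`)
    decoded, err := file.DecodeDecompressString(encoded)
    if err != nil {
        fmt.Println("decode failed:", err)
        return
    }
    fmt.Println(decoded == `{"node-1":{"phase":"Succeeded"}}`) // true

    // Check whether a path was captured inside a gzipped tarball (hypothetical paths).
    fmt.Println(file.IsFileOrDirExistInGZip("/mnt/out/results", "/tmp/outputs.tgz"))
}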
+ + "run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" + + "\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" + + "\"2019-03-07T19:14:55Z\"}}" + + compString := CompressEncodeString(content) + + resultString, _ := DecodeDecompressString(compString) + + assert.Equal(t, content, resultString) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 6b3831829af3..1d26f211a44d 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -248,6 +248,16 @@ func (wfc *WorkflowController) processNextItem() bool { } woc := newWorkflowOperationCtx(wf, wfc) + //Decompress the node if it is compressed + + err = woc.checkAndDecompress() + if err != nil { + log.Warnf("Failed to decompress '%s' to workflow object: %v", key, err) + woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error())) + woc.persistUpdates() + wfc.throttler.Remove(key) + return true + } woc.operate() if woc.wf.Status.Completed() { wfc.throttler.Remove(key) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 34da222db159..433642112ff1 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -24,6 +24,7 @@ import ( "github.com/argoproj/argo/errors" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" + "github.com/argoproj/argo/util/file" "github.com/argoproj/argo/util/retry" "github.com/argoproj/argo/workflow/common" "github.com/argoproj/argo/workflow/util" @@ -72,6 +73,9 @@ var ( // for before requeuing the workflow onto the workqueue. const maxOperationTime time.Duration = 10 * time.Second +//maxWorkflowSize is the maximum size for workflow.yaml +const maxWorkflowSize int = 1024 * 1024 + // newWorkflowOperationCtx creates and initializes a new wfOperationCtx object. func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx { // NEVER modify objects from the store. It's a read-only, local cache. @@ -270,9 +274,17 @@ func (woc *wfOperationCtx) persistUpdates() { return } wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace) - _, err := wfClient.Update(woc.wf) + err := woc.checkAndCompress() if err != nil { - woc.log.Warnf("Error updating workflow: %v", err) + woc.log.Warnf("Error compressing workflow: %v", err) + } + if woc.wf.Status.CompressedNodes != "" { + woc.wf.Status.Nodes = nil + } + + _, err = wfClient.Update(woc.wf) + if err != nil { + woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err)) if argokubeerr.IsRequestEntityTooLargeErr(err) { woc.persistWorkflowSizeLimitErr(wfClient, err) return @@ -444,11 +456,24 @@ func (woc *wfOperationCtx) podReconciliation() error { } for _, pod := range podList.Items { + origNodeStatus := *woc.wf.Status.DeepCopy() performAssessment(&pod) err = woc.applyExecutionControl(&pod) if err != nil { woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name) } + err = woc.checkAndCompress() + if err != nil { + woc.wf.Status = origNodeStatus + nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] + woc.log.Warnf("%v", err) + woc.markNodeErrorClearOuput(nodeNameForPod, err) + err = woc.checkAndCompress() + if err != nil { + woc.markWorkflowError(err, true) + } + } + } // Now check for deleted pods. Iterate our nodes. 
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 34da222db159..433642112ff1 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -24,6 +24,7 @@ import (
     "github.com/argoproj/argo/errors"
     wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
     "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
+    "github.com/argoproj/argo/util/file"
     "github.com/argoproj/argo/util/retry"
     "github.com/argoproj/argo/workflow/common"
     "github.com/argoproj/argo/workflow/util"
@@ -72,6 +73,9 @@ var (
 // for before requeuing the workflow onto the workqueue.
 const maxOperationTime time.Duration = 10 * time.Second

+// maxWorkflowSize is the maximum size, in bytes, of a serialized workflow
+const maxWorkflowSize int = 1024 * 1024
+
 // newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
 func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
     // NEVER modify objects from the store. It's a read-only, local cache.
@@ -270,9 +274,17 @@ func (woc *wfOperationCtx) persistUpdates() {
         return
     }
     wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
-    _, err := wfClient.Update(woc.wf)
+    err := woc.checkAndCompress()
     if err != nil {
-        woc.log.Warnf("Error updating workflow: %v", err)
+        woc.log.Warnf("Error compressing workflow: %v", err)
+    }
+    if woc.wf.Status.CompressedNodes != "" {
+        woc.wf.Status.Nodes = nil
+    }
+
+    _, err = wfClient.Update(woc.wf)
+    if err != nil {
+        woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err))
         if argokubeerr.IsRequestEntityTooLargeErr(err) {
             woc.persistWorkflowSizeLimitErr(wfClient, err)
             return
@@ -444,11 +456,24 @@
     }

     for _, pod := range podList.Items {
+        origNodeStatus := *woc.wf.Status.DeepCopy()
         performAssessment(&pod)
         err = woc.applyExecutionControl(&pod)
         if err != nil {
             woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name)
         }
+        err = woc.checkAndCompress()
+        if err != nil {
+            woc.wf.Status = origNodeStatus
+            nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
+            woc.log.Warnf("%v", err)
+            woc.markNodeErrorClearOuput(nodeNameForPod, err)
+            err = woc.checkAndCompress()
+            if err != nil {
+                woc.markWorkflowError(err, true)
+            }
+        }
+
     }

     // Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
@@ -1117,6 +1142,14 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
     return node
 }

+// markNodeErrorClearOuput is a convenience method to mark a node with an error and clear its output
+func (woc *wfOperationCtx) markNodeErrorClearOuput(nodeName string, err error) *wfv1.NodeStatus {
+    nodeStatus := woc.markNodeError(nodeName, err)
+    nodeStatus.Outputs = nil
+    woc.wf.Status.Nodes[nodeStatus.ID] = *nodeStatus
+    return nodeStatus
+}
+
 // markNodeError is a convenience method to mark a node with an error and set the message from the error
 func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeStatus {
     return woc.markNodePhase(nodeName, wfv1.NodeError, err.Error())
@@ -1539,3 +1572,61 @@ func expandSequence(seq *wfv1.Sequence) ([]wfv1.Item, error) {
     }
     return items, nil
 }
+
+// getSize returns the size of the serialized workflow JSON, excluding the uncompressed nodes map when a compressed copy is present
+func (woc *wfOperationCtx) getSize() int {
+    nodeContent, err := json.Marshal(woc.wf)
+    if err != nil {
+        return -1
+    }
+
+    compressNodeSize := len(woc.wf.Status.CompressedNodes)
+
+    if compressNodeSize > 0 {
+        nodeStatus, err := json.Marshal(woc.wf.Status.Nodes)
+        if err != nil {
+            return -1
+        }
+        return len(nodeContent) - len(nodeStatus)
+    }
+    return len(nodeContent)
+}
+
+// checkAndCompress checks the workflow size and compresses the node status if the total size exceeds maxWorkflowSize.
+// The compressed content is assigned to the compressedNodes field; the nodes map is cleared when the workflow is persisted.
+func (woc *wfOperationCtx) checkAndCompress() error {
+
+    if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
+
+        nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
+        if err != nil {
+            return errors.InternalWrapError(err)
+        }
+        buff := string(nodeContent)
+        woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)
+
+    }
+    if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
+        return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
+    }
+    return nil
+}
+
+// checkAndDecompress decompresses compressedNodes and assigns the result to the workflow.status.nodes map
+func (woc *wfOperationCtx) checkAndDecompress() error {
+    if woc.wf.Status.CompressedNodes != "" {
+        nodeContent, err := file.DecodeDecompressString(woc.wf.Status.CompressedNodes)
+        if err != nil {
+            return errors.InternalWrapError(err)
+        }
+        var tempNodes map[string]wfv1.NodeStatus
+
+        err = json.Unmarshal([]byte(nodeContent), &tempNodes)
+        if err != nil {
+            woc.log.Warn(err)
+            return err
+        }
+        woc.wf.Status.Nodes = tempNodes
+    }
+    return nil
+}
diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go
index 64ed6c87f734..7701a9f4e765 100644
--- a/workflow/executor/docker/docker.go
+++ b/workflow/executor/docker/docker.go
@@ -7,6 +7,8 @@ import (
     "strings"
     "time"

+    "github.com/argoproj/argo/util/file"
+
     "github.com/argoproj/argo/util"
     "github.com/argoproj/argo/errors"