Error running 1000s of tasks: "etcdserver: request is too large" #1186 #1264

Merged · 16 commits · Mar 18, 2019
24 changes: 22 additions & 2 deletions cmd/argo/commands/get.go
@@ -8,12 +8,13 @@ import (
"strings"
"text/tabwriter"

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util/file"
"github.com/argoproj/pkg/humanize"
"github.com/ghodss/yaml"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

const onExitSuffix = "onExit"
@@ -36,6 +37,10 @@ func NewGetCommand() *cobra.Command {
if err != nil {
log.Fatal(err)
}
err = CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
printWorkflow(wf, output)
},
}
@@ -45,6 +50,21 @@
return command
}

func CheckAndDecompress(wf *wfv1.Workflow) error {
Contributor: The status.nodes field is used if the user runs argo list -o=wide: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/list.go#L137

The nodes field is also accessed in argo logs: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/logs.go#L139

I would suggest adding a GetNodes() method to the WorkflowStatus structure that does the same as CheckAndDecompress and returns the nodes, and using it everywhere in the CLI instead of Status.Nodes. (A sketch follows the function below.)

if wf.Status.CompressedNodes != "" {
nodeContent, err := file.DecodeDecompressString(wf.Status.CompressedNodes)
if err != nil {
return errors.InternalWrapError(err)
}
err = json.Unmarshal([]byte(nodeContent), &wf.Status.Nodes)
if err != nil {
log.Fatal(err)
}
wf.Status.CompressedNodes = ""
}
return nil
}
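For illustration, a minimal sketch of the accessor the Contributor proposes above — hypothetical, not part of this PR; the name GetNodes, its error-returning signature, and placing it in the wfv1 package are all assumptions:

// GetNodes is a hypothetical accessor on WorkflowStatus that hides the
// compression detail from CLI callers: it returns Nodes directly when the
// map is populated, and otherwise decodes and unmarshals CompressedNodes.
func (ws *WorkflowStatus) GetNodes() (map[string]NodeStatus, error) {
    if ws.CompressedNodes == "" {
        return ws.Nodes, nil
    }
    content, err := file.DecodeDecompressString(ws.CompressedNodes)
    if err != nil {
        return nil, err
    }
    nodes := make(map[string]NodeStatus)
    if err := json.Unmarshal([]byte(content), &nodes); err != nil {
        return nil, err
    }
    return nodes, nil
}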

func printWorkflow(wf *wfv1.Workflow, outFmt string) {
switch outFmt {
case "name":
4 changes: 4 additions & 0 deletions cmd/argo/commands/list.go
@@ -134,6 +134,10 @@ func countPendingRunningCompleted(wf *wfv1.Workflow) (int, int, int) {
pending := 0
running := 0
completed := 0
err := CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
for _, node := range wf.Status.Nodes {
tmpl := wf.GetTemplate(node.TemplateName)
if tmpl == nil || !tmpl.IsPodType() {
12 changes: 11 additions & 1 deletion cmd/argo/commands/logs.go
@@ -27,7 +27,7 @@ import (

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

type logEntry struct {
@@ -136,6 +136,11 @@ func (p *logPrinter) PrintPodLogs(podName string) error {
// Prints logs for workflow pod steps and return most recent log timestamp per pod name
func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]*time.Time {
var podNodes []v1alpha1.NodeStatus
err := CheckAndDecompress(wf)
if err != nil {
log.Warn(err)
return nil
}
for _, node := range wf.Status.Nodes {
if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError {
podNodes = append(podNodes, node)
@@ -193,6 +198,11 @@ func (p *logPrinter) printLiveWorkflowLogs(workflowName string, wfClient workflo
defer cancel()

processPods := func(wf *v1alpha1.Workflow) {
err := CheckAndDecompress(wf)
if err != nil {
log.Warn(err)
return
}
for id := range wf.Status.Nodes {
node := wf.Status.Nodes[id]
if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError && streamedPods[node.ID] == false {
2 changes: 2 additions & 0 deletions cmd/argo/commands/watch.go
@@ -45,6 +45,8 @@ func watchWorkflow(name string) {
select {
case next := <-watchIf.ResultChan():
wf, _ = next.Object.(*wfv1.Workflow)
err := CheckAndDecompress(wf)
errors.CheckError(err)
case <-ticker.C:
}
if wf == nil {
3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
@@ -477,6 +477,9 @@ type WorkflowStatus struct {
// A human readable message indicating details about why the workflow is in this condition.
Message string `json:"message,omitempty"`

// Compressed and base64 encoded Nodes map
CompressedNodes string `json:"compressedNodes,omitempty"`
Contributor: Please add a comment.

Member: Please add spaces after all // comments.


// Nodes is a mapping between a node ID and the node's status.
Nodes map[string]NodeStatus `json:"nodes,omitempty"`

97 changes: 97 additions & 0 deletions util/file/fileutil.go
@@ -0,0 +1,97 @@
package file

import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/base64"
"io"
"io/ioutil"
"os"
"strings"

log "github.com/sirupsen/logrus"
)

// IsFileOrDirExistInGZip returns true if the file or directory exists in the gzip file
func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool {

fi, err := os.Open(gzipFilePath)

if os.IsNotExist(err) {
return false
}
defer close(fi)

fz, err := gzip.NewReader(fi)
if err != nil {
return false
}
tr := tar.NewReader(fz)
for {
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return false
}
if hdr.FileInfo().IsDir() && strings.Contains(strings.Trim(hdr.Name, "/"), strings.Trim(sourcePath, "/")) {
return true
}
if strings.Contains(sourcePath, hdr.Name) && hdr.Size > 0 {
return true
}
}
return false
}

// close closes the file/writer/reader, logging a warning on failure
func close(f io.Closer) {
err := f.Close()
if err != nil {
log.Warnf("Failed to close the file/writer/reader. %v", err)
}
}

// CompressEncodeString gzip-compresses the content and returns it base64-encoded
func CompressEncodeString(content string) string {
return base64.StdEncoding.EncodeToString(CompressContent([]byte(content)))
}

// DecodeDecompressString base64-decodes and decompresses the content, returning the original string
func DecodeDecompressString(content string) (string, error) {
buf, err := base64.StdEncoding.DecodeString(content)
if err != nil {
return "", err
}
dBuf, err := DecompressContent(buf)
if err != nil {
return "", err
}
return string(dBuf), nil
}

// CompressContent compresses the byte array using a gzip writer
func CompressContent(content []byte) []byte {
var buf bytes.Buffer
zipWriter := gzip.NewWriter(&buf)

_, err := zipWriter.Write(content)
if err != nil {
log.Warnf("Error in compressing: %v", err)
}
close(zipWriter)
return buf.Bytes()
}

// DecompressContent returns the gzip-decompressed content
func DecompressContent(content []byte) ([]byte, error) {
buf := bytes.NewReader(content)
gZipReader, _ := gzip.NewReader(buf)
defer close(gZipReader)
return ioutil.ReadAll(gZipReader)
}
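Worth noting: DecompressContent discards the error from gzip.NewReader, so malformed input leaves gZipReader nil and the deferred close would panic. A sketch of an error-propagating variant, reusing the package-local close helper (not part of this PR):

// decompressContentChecked behaves like DecompressContent but returns the
// gzip.NewReader error instead of ignoring it.
func decompressContentChecked(content []byte) ([]byte, error) {
    gzipReader, err := gzip.NewReader(bytes.NewReader(content))
    if err != nil {
        return nil, err
    }
    defer close(gzipReader)
    return ioutil.ReadAll(gzipReader)
}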
21 changes: 21 additions & 0 deletions util/file/fileutil_test.go
@@ -0,0 +1,21 @@
package file

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestCompressContentString ensures content round-trips through CompressEncodeString and DecodeDecompressString
func TestCompressContentString(t *testing.T) {
content := "{\"pod-limits-rrdm8-591645159\":{\"id\":\"pod-limits-rrdm8-591645159\",\"name\":\"pod-limits-rrdm8[0]." +
"run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" +
"\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" +
"\"2019-03-07T19:14:55Z\"}}"

compString := CompressEncodeString(content)

resultString, _ := DecodeDecompressString(compString)

assert.Equal(t, content, resultString)
}
10 changes: 10 additions & 0 deletions workflow/controller/controller.go
@@ -243,6 +243,16 @@ func (wfc *WorkflowController) processNextItem() bool {
}

woc := newWorkflowOperationCtx(wf, wfc)
// Decompress the node status if it is compressed
err = woc.checkAndDecompress()
if err != nil {
log.Warnf("Failed to decompress '%s' to workflow object: %v", key, err)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
}
woc.operate()
if woc.wf.Status.Completed() {
wfc.throttler.Remove(key)
95 changes: 93 additions & 2 deletions workflow/controller/operator.go
@@ -24,6 +24,7 @@ import (
"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/util/file"
"github.com/argoproj/argo/util/retry"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
@@ -72,6 +73,9 @@ var (
// for before requeuing the workflow onto the workqueue.
const maxOperationTime time.Duration = 10 * time.Second

// maxWorkflowSize is the maximum allowed size of the marshalled workflow object
const maxWorkflowSize int = 1024 * 1024
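(For context: the 1 MB ceiling leaves headroom below etcd's default request size limit of roughly 1.5 MiB, the limit behind the "etcdserver: request is too large" error in the issue title.)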

// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
// NEVER modify objects from the store. It's a read-only, local cache.
@@ -275,9 +279,17 @@ func (woc *wfOperationCtx) persistUpdates() {
return
}
wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
_, err := wfClient.Update(woc.wf)
err := woc.checkAndCompress()
if err != nil {
woc.log.Warnf("Error updating workflow: %v", err)
woc.log.Warnf("Error compressing workflow: %v", err)
}
if woc.wf.Status.CompressedNodes != "" {
Member: Maybe we should remove this line, like this:

err := woc.checkAndCompress()
if err != nil {
    woc.log.Warnf("Error compressing workflow: %v", err)
}
if woc.wf.Status.CompressedNodes != "" {
    woc.clearNodeStatusMap()
}

Member Author: Thanks, updated.

woc.wf.Status.Nodes = nil
}

_, err = wfClient.Update(woc.wf)
if err != nil {
woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err))
if argokubeerr.IsRequestEntityTooLargeErr(err) {
woc.persistWorkflowSizeLimitErr(wfClient, err)
return
Expand Down Expand Up @@ -450,11 +462,24 @@ func (woc *wfOperationCtx) podReconciliation() error {
}

for _, pod := range podList.Items {
origNodeStatus := *woc.wf.Status.DeepCopy()
performAssessment(&pod)
err = woc.applyExecutionControl(&pod)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name)
}
err = woc.checkAndCompress()
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
woc.log.Warnf("%v", err)
woc.markNodeErrorClearOutput(nodeNameForPod, err)
err = woc.checkAndCompress()
if err != nil {
woc.markWorkflowError(err, true)
}
}

}

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
@@ -1138,6 +1163,14 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
return node
}

// markNodeErrorClearOutput is a convenience method to mark a node with an error and clear its output
func (woc *wfOperationCtx) markNodeErrorClearOutput(nodeName string, err error) *wfv1.NodeStatus {
nodeStatus := woc.markNodeError(nodeName, err)
nodeStatus.Outputs = nil
woc.wf.Status.Nodes[nodeStatus.ID] = *nodeStatus
return nodeStatus
}

// markNodeError is a convenience method to mark a node with an error and set the message from the error
func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeStatus {
return woc.markNodePhase(nodeName, wfv1.NodeError, err.Error())
@@ -1576,3 +1609,61 @@ func expandSequence(seq *wfv1.Sequence) ([]wfv1.Item, error) {
}
return items, nil
}

// getSize returns the size of the serialized workflow in bytes; when the node status is compressed, the uncompressed Nodes map is excluded from the count, since only CompressedNodes will be persisted
func (woc *wfOperationCtx) getSize() int {
nodeContent, err := json.Marshal(woc.wf)
if err != nil {
return -1
}

compressNodeSize := len(woc.wf.Status.CompressedNodes)

if compressNodeSize > 0 {
nodeStatus, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return -1
}
return len(nodeContent) - len(nodeStatus)
}
return len(nodeContent)
}
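To make the accounting concrete (illustrative numbers, not from the PR): if the marshalled workflow is 1.5 MB and the Nodes map alone accounts for 1.2 MB of that, getSize reports 0.3 MB while CompressedNodes is set, because the uncompressed map will be dropped before the final save and only the compressed field counts against the limit.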

// checkAndCompress checks the workflow size and compresses the node status if the total workflow size exceeds maxWorkflowSize.
// The compressed content is assigned to the CompressedNodes field, and the Nodes map is cleared before the final save in persistUpdates.
func (woc *wfOperationCtx) checkAndCompress() error {
if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
Contributor: Can we simplify this logic a little bit? As I understand it, getSize is needed because it assumes the workflow might have the Nodes and CompressedNodes fields set at the same time, and that the Nodes field will be removed before saving, right?

Instead, please change the logic to ensure these two fields are never set at the same time: checkAndDecompress should set the Nodes field and immediately remove CompressedNodes; checkAndCompress should immediately remove Nodes after compressing.

Member Author: This function gets called in two places. One is the final workflow save in persistUpdates(). The other is in podReconciliation, during workflow execution, to check that the output of each node fits into the size limit. In that scenario Nodes and CompressedNodes will both co-exist. (A sketch of the reviewer's alternative appears after checkAndDecompress below.)


nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
}
buff := string(nodeContent)
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)

}
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}
return nil
}

// checkAndDecompress decompresses CompressedNodes and assigns the result to the workflow's Status.Nodes map
func (woc *wfOperationCtx) checkAndDecompress() error {
if woc.wf.Status.CompressedNodes != "" {
nodeContent, err := file.DecodeDecompressString(woc.wf.Status.CompressedNodes)
if err != nil {
return errors.InternalWrapError(err)
}
var tempNodes map[string]wfv1.NodeStatus

err = json.Unmarshal([]byte(nodeContent), &tempNodes)
if err != nil {
woc.log.Warn(err)
Contributor: Did you mean return err here?

Member Author: Updated.

return err
}
woc.wf.Status.Nodes = tempNodes
}
return nil
}
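As referenced in the review thread above, a sketch of the Contributor's alternative, which enforces that Nodes and CompressedNodes are never populated at the same time — hypothetical, since the merged code deliberately lets them co-exist during podReconciliation:

// compressNodes is a hypothetical variant of checkAndCompress that drops the
// uncompressed map immediately after compressing, instead of at save time.
func (woc *wfOperationCtx) compressNodes() error {
    if woc.wf.Status.Nodes == nil || woc.getSize() < maxWorkflowSize {
        return nil
    }
    nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    woc.wf.Status.CompressedNodes = file.CompressEncodeString(string(nodeContent))
    woc.wf.Status.Nodes = nil // invariant: the two fields are never both set
    return nil
}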
2 changes: 1 addition & 1 deletion workflow/executor/docker/docker.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"

"github.com/argoproj/argo/workflow/util/file"
"github.com/argoproj/argo/util/file"

"github.com/argoproj/argo/util"
