Error running 1000s of tasks: "etcdserver: request is too large" #1186 #1264

Merged
merged 16 commits on Mar 18, 2019
added Operator.go
sarabala1979 committed Mar 14, 2019
commit b36564826d46ff224631930765f49770a2e4d155
99 changes: 97 additions & 2 deletions workflow/controller/operator.go
@@ -11,6 +11,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo/workflow/util/file"
Member
import blocks should follow this convention:

  1. standard packages
  2. third party packages
  3. project packages
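
For reference, a grouped import block following that convention could look like the sketch below; the grouping is illustrative and only uses packages that appear in this diff:

import (
	// 1. standard packages
	"encoding/json"
	"fmt"
	"strings"
	"time"

	// 2. third party packages
	argokubeerr "github.com/argoproj/pkg/kube/errors"
	"github.com/argoproj/pkg/strftime"
	jsonpatch "github.com/evanphx/json-patch"

	// 3. project packages
	"github.com/argoproj/argo/workflow/util/file"
)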


argokubeerr "github.com/argoproj/pkg/kube/errors"
"github.com/argoproj/pkg/strftime"
jsonpatch "github.com/evanphx/json-patch"
@@ -72,6 +74,9 @@ var (
// for before requeuing the workflow onto the workqueue.
const maxOperationTime time.Duration = 10 * time.Second

//maxWorkflowSize is the maximum size for workflow.yaml
const maxWorkflowSize int = 1024 * 1024

// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
// NEVER modify objects from the store. It's a read-only, local cache.
@@ -275,14 +280,20 @@ func (woc *wfOperationCtx) persistUpdates() {
return
}
wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
woc.log.Info("Final size", woc.getSize())
if woc.wf.Status.CompressedNodes != "" {
Member
Maybe we should remove this line and rework the block like this:

err := woc.checkAndCompress()
if err != nil {
	woc.log.Warnf("Error compressing workflow: %v", err)
}
if woc.wf.Status.CompressedNodes != "" {
	woc.clearNodeStatusMap()
}

Member Author
Thanks, updated.

woc.checkAndCompress()
woc.clearNodeStatusMap()
}

_, err := wfClient.Update(woc.wf)
if err != nil {
woc.log.Warnf("Error updating workflow: %v", err)
woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err))
if argokubeerr.IsRequestEntityTooLargeErr(err) {
woc.persistWorkflowSizeLimitErr(wfClient, err)
return
}
if !apierr.IsConflict(err) {
if err != nil && !apierr.IsConflict(err) {
jessesuen marked this conversation as resolved.
return
}
woc.log.Info("Re-appying updates on latest version and retrying update")
@@ -450,11 +461,32 @@ func (woc *wfOperationCtx) podReconciliation() error {
}

for _, pod := range podList.Items {
origNodeStatus := *woc.wf.Status.DeepCopy()
performAssessment(&pod)
err = woc.applyExecutionControl(&pod)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name)
}
err = woc.checkAndCompress()
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
nodeID := woc.wf.NodeID(nodeNameForPod)
node := woc.wf.Status.Nodes[nodeID]
node.Message = fmt.Sprintf("%v", err)
woc.log.Warn(node.Message)
node.Outputs = nil
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
node.Phase = wfv1.NodeError
Member
There is a helper for all of this: woc.markNodePhase(). Can you use that instead?

Member Author
fixed

node.Completed()
Member
The call to node.Completed() is not useful here.

Member Author
fixed
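
Putting both review comments together, a rough sketch of the suggested shape of this block is shown below. It assumes markNodePhase(nodeName, phase, message...) updates the node entry in woc.wf.Status.Nodes and stamps FinishedAt (that signature is an assumption based on how the helper is described above, not something shown in this diff), and it drops the node.Completed() call:

err = woc.checkAndCompress()
if err != nil {
	// Revert to the status captured before this pod was assessed.
	woc.wf.Status = origNodeStatus
	nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
	woc.log.Warnf("%v", err)
	// Let the helper set phase, message, and FinishedAt instead of mutating
	// the NodeStatus fields by hand. Clearing node.Outputs would still need
	// to be done separately if required.
	woc.markNodePhase(nodeNameForPod, wfv1.NodeError, err.Error())
	woc.updated = true
	if err := woc.checkAndCompress(); err != nil {
		woc.markWorkflowError(err, true)
	}
}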

woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
err = woc.checkAndCompress()
if err != nil {
woc.markWorkflowError(err, true)
}
}

}

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
@@ -1576,3 +1608,66 @@ func expandSequence(seq *wfv1.Sequence) ([]wfv1.Item, error) {
}
return items, nil
}

// getSize return the entire workflow json string size
func (woc *wfOperationCtx) getSize() int {
nodeContent, err := json.Marshal(woc.wf)
if err != nil {
return -1
}

compressNodeSize := len(woc.wf.Status.CompressedNodes)

if compressNodeSize > 0 {
nodeStatus, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return -1
}
return len(nodeContent) - len(nodeStatus)
}
return len(nodeContent)
}

//checkAndCompress will check the workflow size and compress node status if total workflow size is more than maxWorkflowSize.
//The compressed content will be assign to compressedNodes element and clear the nodestatus map.
func (woc *wfOperationCtx) checkAndCompress() error {

if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
Contributor
Can we simplify this logic a little bit? As I understand it, getSize is needed because it assumes that the workflow might have the Nodes and CompressedNodes fields set at the same time, and that the Nodes field will be removed before saving, right?

Instead, please change the logic so these two fields are never set at the same time: checkAndDecompress should set the Nodes field and immediately remove CompressedNodes; checkAndCompress should immediately remove Nodes after compressing.

Member Author
This function gets called in two places. One is the final workflow save in persistUpdates(). The other is in podReconciliation, during workflow execution, to check that the output of each node fits within the size limit. In that scenario Nodes and CompressedNodes will both co-exist.


nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
}
buff := string(nodeContent)
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)

}
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}
return nil
}
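
For illustration, here is a minimal sketch of the contributor's suggestion above, in which checkAndCompress drops the uncompressed map immediately after compressing so Nodes and CompressedNodes never coexist. This is a sketch of the proposed restructuring, not the code in this commit:

func (woc *wfOperationCtx) checkAndCompress() error {
	// Nothing to do if the nodes are already compressed or the workflow is small enough.
	if woc.wf.Status.Nodes == nil || woc.getSize() < maxWorkflowSize {
		return nil
	}
	nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
	if err != nil {
		return errors.InternalWrapError(err)
	}
	woc.wf.Status.CompressedNodes = file.CompressEncodeString(string(nodeContent))
	// Drop the uncompressed map right away so the two representations never coexist.
	woc.wf.Status.Nodes = nil
	if woc.getSize() >= maxWorkflowSize {
		return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
	}
	return nil
}

As the author notes above, this shape only works if podReconciliation no longer relies on both fields being populated at the same time.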

func (woc *wfOperationCtx) clearNodeStatusMap() {
Contributor
I think this is a more efficient way to do it:

status := woc.wf.Status
status.Nodes = nil
woc.wf.Status = status

Member Author
updated

Member
wait, why isn't this just woc.wf.Status.Nodes = nil?

Member
Didn't see this question answered in the latest version. Am I missing something, or can this be simplified further to woc.wf.Status.Nodes = nil?

Member Author
updated

for k := range woc.wf.Status.Nodes {
delete(woc.wf.Status.Nodes, k)
}
}
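
Based on the thread above, the simplified form the reviewers are asking about would presumably be just:

func (woc *wfOperationCtx) clearNodeStatusMap() {
	// Drop the uncompressed node map; the compressed copy in
	// Status.CompressedNodes is what gets persisted.
	woc.wf.Status.Nodes = nil
}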

//checkAndDecompress will decompress the compressednode and assign to workflow.status.nodes map.
func (woc *wfOperationCtx) checkAndDecompress() error {
if woc.wf.Status.CompressedNodes != "" {
nodeContent, err := file.DecodeDecompressString(woc.wf.Status.CompressedNodes)
if err != nil {
return errors.InternalWrapError(err)
}
var tempNodes map[string]wfv1.NodeStatus

err = json.Unmarshal([]byte(nodeContent), &tempNodes)
if err != nil {
woc.log.Warn(err)
Contributor
Did you mean return err here?

Member Author
updated

}
woc.wf.Status.Nodes = tempNodes
}
return nil
}
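
A hedged sketch of checkAndDecompress with the error return the contributor asked about, and, following the earlier suggestion, clearing CompressedNodes once the map has been restored (again an illustration, not the code in this commit):

func (woc *wfOperationCtx) checkAndDecompress() error {
	if woc.wf.Status.CompressedNodes == "" {
		return nil
	}
	nodeContent, err := file.DecodeDecompressString(woc.wf.Status.CompressedNodes)
	if err != nil {
		return errors.InternalWrapError(err)
	}
	var tempNodes map[string]wfv1.NodeStatus
	if err := json.Unmarshal([]byte(nodeContent), &tempNodes); err != nil {
		// Propagate the error instead of only logging it.
		return errors.InternalWrapError(err)
	}
	woc.wf.Status.Nodes = tempNodes
	// Optional, per the earlier review suggestion: keep only one representation.
	woc.wf.Status.CompressedNodes = ""
	return nil
}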