fix: Consider offloaded and compressed node in retry and resume (argo…
simster7 committed Apr 10, 2020
1 parent a25c6a2 commit e2d0aa2
Showing 11 changed files with 420 additions and 29 deletions.
2 changes: 1 addition & 1 deletion persist/sqldb/offload_node_status_repo.go
@@ -15,7 +15,7 @@ import (
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

const OffloadNodeStatusDisabledWarning = "Workflow has offloaded nodes, but offloading has been disabled"
const OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled"

type UUIDVersion struct {
UID string `db:"uid"`
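For orientation: the OffloadNodeStatusRepo threaded through the rest of this commit is exercised through three calls. A minimal sketch of that contract, inferred from the call sites below (the real interface in persist/sqldb also covers listing and deletion):

package sqldb // sketch only, mirroring persist/sqldb as used by this commit

import wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"

// OffloadNodeStatusRepo, as exercised by the retry/resume changes below.
type OffloadNodeStatusRepo interface {
	// IsEnabled reports whether node status offloading is configured.
	IsEnabled() bool
	// Get returns the offloaded nodes for a workflow UID at a given version.
	Get(uid, version string) (wfv1.Nodes, error)
	// Save persists the nodes and returns the new offload version.
	Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
}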
2 changes: 1 addition & 1 deletion server/artifacts/artifact_server.go
@@ -176,7 +176,7 @@ func (a *ArtifactServer) getWorkflow(ctx context.Context, namespace string, work
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": namespace, "name": workflowName}).Warn(sqldb.OffloadNodeStatusDisabledWarning)
log.WithFields(log.Fields{"namespace": namespace, "name": workflowName}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
return wf, nil
10 changes: 5 additions & 5 deletions server/workflow/workflow_server.go
@@ -102,7 +102,7 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabledWarning)
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
err = packer.DecompressWorkflow(wf)
@@ -134,7 +134,7 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
if s.offloadNodeStatusRepo.IsEnabled() {
wfList.Items[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}]
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn("Workflow has offloaded nodes, but offloading has been disabled")
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
}
@@ -182,7 +182,7 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabledWarning)
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
logCtx.Debug("Sending event")
@@ -220,7 +220,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, err
}

wf, err = util.RetryWorkflow(kubeClient, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf)
wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf)
if err != nil {
return nil, err
}
@@ -254,7 +254,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
return nil, err
}

err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name, req.NodeFieldSelector)
err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector)
if err != nil {
log.Warnf("Failed to resume %s: %+v", req.Name, err)
return nil, err
7 changes: 6 additions & 1 deletion test/e2e/cli_test.go
@@ -470,7 +470,12 @@ func (s *CLISuite) TestWorkflowLint() {
})
}

func (s *CLISuite) TestWorkflowRetry() {
func (s *CLISuite) TestWorkflowRetryNoPersistence() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}
s.Given().
Workflow("@testdata/exit-1.yaml").
When().
20 changes: 20 additions & 0 deletions test/e2e/cli_with_server_test.go
@@ -97,6 +97,26 @@ func (s *CLIWithServerSuite) TestArchive() {
})
}

func (s *CLIWithServerSuite) TestWorkflowRetryPersistence() {
if !s.Persistence.IsEnabled() {
// Persistence is disabled for this test, but it is enabled for the Argo Server in this test suite.
// When this is the case, this behavior is tested in cli_test.go
s.T().SkipNow()
}
s.Given().
Workflow("@testdata/exit-1.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(20*time.Second).
Given().
RunCli([]string{"retry", "exit-1"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
assert.Contains(t, output, "Namespace:")
}
})
}

func TestCLIWithServerSuite(t *testing.T) {
suite.Run(t, new(CLIWithServerSuite))
}
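Together with TestWorkflowRetryNoPersistence in cli_test.go above, the guards are complementary: for any given test environment, exactly one of the two suites exercises argo retry. A sketch of the pattern (suiteUsesServer is a hypothetical flag, true in CLIWithServerSuite and false in CLISuite):

package e2e // sketch

import "testing"

// skipUnlessMatching runs the retry test only in the suite whose transport
// matches the persistence configuration; the sibling suite covers the
// other combination.
func skipUnlessMatching(t *testing.T, persistenceEnabled, suiteUsesServer bool) {
	if persistenceEnabled != suiteUsesServer {
		t.SkipNow()
	}
}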
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
@@ -464,7 +464,7 @@ func (woc *wfOperationCtx) persistUpdates() {
// try and compress nodes if needed
nodes := woc.wf.Status.Nodes

err := packer.CompressWorkflow(woc.wf)
err := packer.CompressWorkflowIfNeeded(woc.wf)
if packer.IsTooLargeError(err) || os.Getenv("ALWAYS_OFFLOAD_NODE_STATUS") == "true" {
if woc.controller.offloadNodeStatusRepo.IsEnabled() {
offloadVersion, err := woc.controller.offloadNodeStatusRepo.Save(string(woc.wf.UID), woc.wf.Namespace, nodes)
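The rename makes the contract of this fallback chain explicit: compress only when the status is large, and if even the compressed form is too large (or ALWAYS_OFFLOAD_NODE_STATUS forces the issue), move the nodes to the database. A condensed sketch of that decision, not the controller's actual code (error handling trimmed, helper name illustrative):

package sketch

import (
	"fmt"
	"os"

	"github.com/argoproj/argo/persist/sqldb"
	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo/workflow/packer"
)

// persistNodes compresses inline when possible and offloads when it must.
func persistNodes(repo sqldb.OffloadNodeStatusRepo, wf *wfv1.Workflow, nodes wfv1.Nodes) error {
	err := packer.CompressWorkflowIfNeeded(wf)
	if packer.IsTooLargeError(err) || os.Getenv("ALWAYS_OFFLOAD_NODE_STATUS") == "true" {
		if !repo.IsEnabled() {
			return fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
		}
		version, saveErr := repo.Save(string(wf.UID), wf.Namespace, nodes)
		if saveErr != nil {
			return saveErr
		}
		// keep only a version pointer on the object; the nodes live in the DB
		wf.Status.OffloadNodeStatusVersion = version
		wf.Status.CompressedNodes = ""
		wf.Status.Nodes = nil
		return nil
	}
	return err
}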
9 changes: 5 additions & 4 deletions workflow/controller/operator_test.go
@@ -14,6 +14,7 @@ import (
"sigs.k8s.io/yaml"

"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
"github.com/argoproj/argo/workflow/common"
@@ -847,7 +848,7 @@ func TestSuspendResume(t *testing.T) {
assert.Equal(t, 0, len(pods.Items))

// resume the workflow and operate again. two pods should be able to be scheduled
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "")
err = util.ResumeWorkflow(wfcset, sqldb.ExplosiveOffloadNodeStatusRepo, wf.ObjectMeta.Name, "")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
@@ -1166,7 +1167,7 @@ func TestSuspendTemplate(t *testing.T) {
assert.Equal(t, 0, len(pods.Items))

// resume the workflow. verify resume workflow edits nodestatus correctly
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "")
err = util.ResumeWorkflow(wfcset, sqldb.ExplosiveOffloadNodeStatusRepo, wf.ObjectMeta.Name, "")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
@@ -1239,7 +1240,7 @@ func TestSuspendTemplateWithFilteredResume(t *testing.T) {
assert.Equal(t, 0, len(pods.Items))

// resume the workflow, but with non-matching selector
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.paramaters.param1.value=value2")
err = util.ResumeWorkflow(wfcset, sqldb.ExplosiveOffloadNodeStatusRepo, wf.ObjectMeta.Name, "inputs.paramaters.param1.value=value2")
assert.Error(t, err)

// operate the workflow. nothing should have happened
@@ -1251,7 +1252,7 @@
assert.True(t, util.IsWorkflowSuspended(wf))

// resume the workflow, but with matching selector
err = util.ResumeWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1")
err = util.ResumeWorkflow(wfcset, sqldb.ExplosiveOffloadNodeStatusRepo, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
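These tests pass sqldb.ExplosiveOffloadNodeStatusRepo because resuming a workflow whose nodes were never offloaded must not touch the repo at all; the double exists to fail loudly if it ever does. A sketch of such a test double (an assumption about its exact behavior; the real one lives in persist/sqldb):

package sqldb // sketch of a double in the spirit of ExplosiveOffloadNodeStatusRepo

import (
	"fmt"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// explosiveRepo implements the subset of OffloadNodeStatusRepo used by
// resume/retry, erroring on use to prove the code under test stayed on
// the inline-nodes path.
type explosiveRepo struct{}

func (explosiveRepo) IsEnabled() bool { return false }

func (explosiveRepo) Get(uid, version string) (wfv1.Nodes, error) {
	return nil, fmt.Errorf("offload node status repo should not be used")
}

func (explosiveRepo) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) {
	return "", fmt.Errorf("offload node status repo should not be used")
}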
8 changes: 6 additions & 2 deletions workflow/packer/packer.go
@@ -59,22 +59,26 @@ func IsTooLargeError(err error) bool {
return err != nil && strings.HasPrefix(err.Error(), tooLarge)
}

func CompressWorkflow(wf *wfv1.Workflow) error {
func CompressWorkflowIfNeeded(wf *wfv1.Workflow) error {
large, err := IsLargeWorkflow(wf)
if err != nil {
return err
}
if !large {
return nil
}
return compressWorkflow(wf)
}

func compressWorkflow(wf *wfv1.Workflow) error {
nodeContent, err := json.Marshal(wf.Status.Nodes)
if err != nil {
return err
}
wf.Status.CompressedNodes = file.CompressEncodeString(string(nodeContent))
wf.Status.Nodes = nil
// still too large?
large, err = IsLargeWorkflow(wf)
large, err := IsLargeWorkflow(wf)
if err != nil {
return err
}
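The split leaves one safe entry point: CompressWorkflowIfNeeded checks size first, while the unexported compressWorkflow always compresses (and now declares large with := because the earlier check moved out of it). A usage sketch under those assumptions:

package sketch

import (
	"fmt"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo/workflow/packer"
)

func compressExample(wf *wfv1.Workflow) {
	err := packer.CompressWorkflowIfNeeded(wf)
	switch {
	case packer.IsTooLargeError(err):
		// even compressed, the node status exceeds the limit; callers such
		// as persistUpdates fall back to offloading at this point
		fmt.Println("too large: offload instead")
	case err != nil:
		fmt.Println("compression failed:", err)
	default:
		// small workflows come back untouched; large ones now carry
		// status.compressedNodes with status.nodes cleared
	}
}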
6 changes: 3 additions & 3 deletions workflow/packer/packer_test.go
@@ -21,7 +21,7 @@ func TestDecompressWorkflow(t *testing.T) {
Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{}},
},
}
err := CompressWorkflow(wf)
err := CompressWorkflowIfNeeded(wf)
if assert.NoError(t, err) {
assert.NotNil(t, wf)
assert.NotEmpty(t, wf.Status.Nodes)
@@ -40,7 +40,7 @@ func TestDecompressWorkflow(t *testing.T) {
Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{}, "bar": wfv1.NodeStatus{}},
},
}
err := CompressWorkflow(wf)
err := CompressWorkflowIfNeeded(wf)
if assert.NoError(t, err) {
assert.NotNil(t, wf)
assert.Empty(t, wf.Status.Nodes)
@@ -62,7 +62,7 @@ func TestDecompressWorkflow(t *testing.T) {
Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{}, "bar": wfv1.NodeStatus{}, "baz": wfv1.NodeStatus{}, "qux": wfv1.NodeStatus{}},
},
}
err := CompressWorkflow(wf)
err := CompressWorkflowIfNeeded(wf)
if assert.Error(t, err) {
assert.True(t, IsTooLargeError(err))
// if too large, we want the original back please
94 changes: 83 additions & 11 deletions workflow/util/util.go
@@ -35,6 +35,7 @@ import (
"sigs.k8s.io/yaml"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
@@ -341,7 +342,7 @@ func SuspendWorkflow(wfIf v1alpha1.WorkflowInterface, workflowName string) error

// ResumeWorkflow resumes a workflow by setting spec.suspend to nil and any suspended nodes to Successful.
// Retries conflict errors
func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFieldSelector string) error {
func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string) error {
if len(nodeFieldSelector) > 0 {
return updateWorkflowNodeByKey(wfIf, workflowName, nodeFieldSelector, wfv1.NodeSucceeded, "")
} else {
@@ -353,7 +354,7 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFi

err = packer.DecompressWorkflow(wf)
if err != nil {
log.Fatal(err)
return false, fmt.Errorf("unable to decompress workflow: %s", err)
}

workflowUpdated := false
@@ -362,17 +363,50 @@
workflowUpdated = true
}

nodes := wf.Status.Nodes
if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return false, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
var err error
nodes, err = repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return false, fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
}
}
newNodes := nodes.DeepCopy()

// To resume a workflow with a suspended node we simply mark the node as Successful
for nodeID, node := range wf.Status.Nodes {
for nodeID, node := range nodes {
if node.IsActiveSuspendNode() {
node.Phase = wfv1.NodeSucceeded
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
wf.Status.Nodes[nodeID] = node
newNodes[nodeID] = node
workflowUpdated = true
}
}

if workflowUpdated {
if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return false, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
offloadVersion, err := repo.Save(string(wf.UID), wf.Namespace, newNodes)
if err != nil {
return false, fmt.Errorf("unable to save offloaded nodes: %s", err)
}
wf.Status.OffloadNodeStatusVersion = offloadVersion
wf.Status.CompressedNodes = ""
wf.Status.Nodes = nil
} else {
wf.Status.Nodes = newNodes
}

err = packer.CompressWorkflowIfNeeded(wf)
if err != nil {
return false, fmt.Errorf("unable to compress workflow: %s", err)
}

_, err = wfIf.Update(wf)
if err != nil {
if apierr.IsConflict(err) {
@@ -578,12 +612,18 @@ func convertNodeID(newWf *wfv1.Workflow, regex *regexp.Regexp, oldNodeID string,
}

// RetryWorkflow updates a workflow, deleting all failed steps as well as the onExit node (and children)
func RetryWorkflow(kubeClient kubernetes.Interface, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow) (*wfv1.Workflow, error) {
func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo, wfClient v1alpha1.WorkflowInterface, wf *wfv1.Workflow) (*wfv1.Workflow, error) {
switch wf.Status.Phase {
case wfv1.NodeFailed, wfv1.NodeError:
default:
return nil, errors.Errorf(errors.CodeBadRequest, "workflow must be Failed/Error to retry")
}

err := packer.DecompressWorkflow(wf)
if err != nil {
return nil, fmt.Errorf("unable to decompress workflow: %s", err)
}

newWF := wf.DeepCopy()
podIf := kubeClient.CoreV1().Pods(wf.ObjectMeta.Namespace)

@@ -600,22 +640,34 @@ func RetryWorkflow(kubeClient kubernetes.Interface, wfClient v1alpha1.WorkflowIn
}

// Iterate the previous nodes. If it was successful Pod carry it forward
newWF.Status.Nodes = make(map[string]wfv1.NodeStatus)
newNodes := make(map[string]wfv1.NodeStatus)
onExitNodeName := wf.ObjectMeta.Name + ".onExit"
for _, node := range wf.Status.Nodes {
nodes := wf.Status.Nodes
if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
var err error
nodes, err = repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
}
}

for _, node := range nodes {
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped:
if !strings.HasPrefix(node.Name, onExitNodeName) {
newWF.Status.Nodes[node.ID] = node
newNodes[node.ID] = node
continue
}
case wfv1.NodeError, wfv1.NodeFailed:
if !strings.HasPrefix(node.Name, onExitNodeName) && node.Type == wfv1.NodeTypeDAG {
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) {
newNode := node.DeepCopy()
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
newNode.FinishedAt = metav1.Time{}
newWF.Status.Nodes[newNode.ID] = *newNode
newNodes[newNode.ID] = *newNode
continue
}
// do not add this status to the node. pretend as if this node never existed.
Expand All @@ -634,16 +686,36 @@ func RetryWorkflow(kubeClient kubernetes.Interface, wfClient v1alpha1.WorkflowIn
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
newNode.FinishedAt = metav1.Time{}
newWF.Status.Nodes[newNode.ID] = *newNode
newNodes[newNode.ID] = *newNode
continue
}
}

if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
offloadVersion, err := repo.Save(string(newWF.UID), newWF.Namespace, newNodes)
if err != nil {
return nil, fmt.Errorf("unable to save offloaded nodes: %s", err)
}
newWF.Status.OffloadNodeStatusVersion = offloadVersion
newWF.Status.CompressedNodes = ""
newWF.Status.Nodes = nil
} else {
newWF.Status.Nodes = newNodes
}

newWF.Status.StoredTemplates = make(map[string]wfv1.Template)
for id, tmpl := range wf.Status.StoredTemplates {
newWF.Status.StoredTemplates[id] = tmpl
}

err = packer.CompressWorkflowIfNeeded(newWF)
if err != nil {
return nil, fmt.Errorf("unable to compress workflow: %s", err)
}

return wfClient.Update(newWF)
}

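Both ResumeWorkflow and RetryWorkflow now follow the same read-modify-write discipline around node status: decompress first, read the nodes from wherever they live, mutate a copy, then write the result back to the same place. A condensed sketch of the two halves of that pattern (helper names are illustrative, not the exported API):

package sketch

import (
	"fmt"

	"github.com/argoproj/argo/persist/sqldb"
	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// loadNodes returns the full node set, whether it is inline on the
// workflow or offloaded to the database.
func loadNodes(repo sqldb.OffloadNodeStatusRepo, wf *wfv1.Workflow) (wfv1.Nodes, error) {
	if !wf.Status.IsOffloadNodeStatus() {
		return wf.Status.Nodes, nil
	}
	if !repo.IsEnabled() {
		return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
	}
	return repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
}

// storeNodes writes a modified node set back to where it came from,
// bumping the offload version when offloading is in play.
func storeNodes(repo sqldb.OffloadNodeStatusRepo, wf *wfv1.Workflow, nodes wfv1.Nodes) error {
	if !wf.Status.IsOffloadNodeStatus() {
		wf.Status.Nodes = nodes
		return nil
	}
	if !repo.IsEnabled() {
		return fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
	}
	version, err := repo.Save(string(wf.UID), wf.Namespace, nodes)
	if err != nil {
		return err
	}
	wf.Status.OffloadNodeStatusVersion = version
	wf.Status.CompressedNodes = ""
	wf.Status.Nodes = nil
	return nil
}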