
feat: Allow step restart on workflow retry. Closes #2334 (#2431)

Merged
4 commits merged into master on Apr 12, 2020

Conversation

markterm (Contributor)

This allows retryStrategy to contain restartOnWorkflowRetry, in which case the entire node (and therefore all descendant nodes) will be restarted if they haven't all already succeeded and the workflow is retried.

See #2334 for more info.

Checklist:

  • Either (a) I've created an enhancement proposal and discussed it with the community, (b) this is a bug fix, or (c) this is a chore.
  • The title of the PR is (a) conventional, (b) states what changed, and (c) suffixes the related issues number. E.g. "fix(controller): Updates such and such. Fixes #1234".
  • I have written unit and/or e2e tests for my change. PRs without these are unlikely to be merged.
  • Optional. My organization is added to USERS.md.
  • I've signed the CLA and required builds are green.

Resolved review threads (outdated): workflow/controller/operator.go, test/e2e/cli_test.go, workflow/util/util.go
codecov bot commented Mar 19, 2020

Codecov Report

Merging #2431 into master will increase coverage by 0.39%.
The diff coverage is 52.21%.


@@            Coverage Diff             @@
##           master    #2431      +/-   ##
==========================================
+ Coverage   11.22%   11.62%   +0.39%     
==========================================
  Files          83       84       +1     
  Lines       32696    32871     +175     
==========================================
+ Hits         3671     3820     +149     
  Misses      28525    28525              
- Partials      500      526      +26     
Impacted Files Coverage Δ
cmd/argo/commands/retry.go 0.00% <0.00%> (ø)
persist/sqldb/offload_node_status_repo.go 3.90% <ø> (ø)
pkg/apis/workflow/v1alpha1/workflow_types.go 10.78% <0.00%> (ø)
server/workflow/workflow_server.go 35.84% <20.00%> (ø)
workflow/util/util.go 27.38% <44.21%> (+12.66%) ⬆️
workflow/controller/steps.go 63.38% <45.45%> (ø)
workflow/controller/operator.go 60.45% <50.00%> (-0.59%) ⬇️
workflow/controller/dag.go 55.48% <57.14%> (+0.36%) ⬆️
util/argo/audit_logger.go 84.61% <94.44%> (ø)
workflow/packer/packer.go 78.72% <100.00%> (+0.94%) ⬆️
... and 5 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update cf277eb...0b194c4.

alexec (Contributor) commented Mar 19, 2020

@sarabala1979 I think this is your area of expertise?

markterm (Contributor, Author)

@sarabala1979 hi, any feedback on this?

simster7 self-assigned this Mar 30, 2020
simster7 (Member) left a comment:

My initial reaction is that this shouldn't be implemented in the workflow-controller. "Retry" is not a feature that the controller knows about. When a Workflow is retried, the CLI/Server manually edits the Workflow object and sets "Failed" steps to "Pending" so that they are re-run. The controller is unaware that this has happened and treats the Workflow as if it were running for the first time.

For this feature to be implemented without breaking abstraction barriers, it should be implemented fully in the RetryWorkflow function in workflow/util/util.go. This could perhaps be done by specifying in the UI/CLI which steps should be fully restarted, and passing those to the function. The function can then restart the appropriate nodes from that input.
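[Editor's note] To illustrate the client-side retry mechanism described above, here is a minimal, hedged Go sketch (assuming the Argo v2 wfv1 types; package and function names are illustrative, and this is not Argo's actual RetryWorkflow code): failed or errored nodes are reset to Pending so the controller simply re-runs them.

package retrysketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// resetFailedNodes returns a copy of the node map in which failed or errored
// nodes are set back to Pending (with their completion data cleared) so the
// controller treats them as not yet run; succeeded nodes are left untouched.
func resetFailedNodes(nodes wfv1.Nodes) wfv1.Nodes {
	newNodes := make(wfv1.Nodes, len(nodes))
	for id, node := range nodes {
		if node.Phase == wfv1.NodeFailed || node.Phase == wfv1.NodeError {
			node.Phase = wfv1.NodePending
			node.Message = ""
			node.FinishedAt = metav1.Time{}
		}
		newNodes[id] = node
	}
	return newNodes
}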

@@ -629,6 +629,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) {
var message string
if woc.workflowDeadline.IsZero() {
simster7 (Member):

JSYK: The way we detect termination has changed since you opened this PR. This info is now found in Workflow.Spec.Shutdown

markterm (Contributor, Author):

Thanks, I'm updating this.


- name: steps-inner
retryStrategy:
restartOnWorkflowRetry: true
simster7 (Member):

My initial reaction is that this is not the correct place for this.

retryStrategy deals with retrying this node during a single workflow execution. Your proposed flag deals with retrying the node across different executions. Under this implementation retryStrategy is overloaded.

markterm (Contributor, Author):

Ah, you are right - I'm moving restartOnWorkflowRetry up to the template level. Does that work?

for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() {
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline))
var message = ""


ineffectual assignment to message (from ineffassign)

markterm (Contributor, Author) left a comment:

Thanks for taking a look.

My use case for this is where we have an 'asynchronous' group of steps: the first step starts a pod which kicks off a job outside Argo, then a suspend step is either resumed or failed depending on that job. Just retrying the resume step on failure therefore wouldn't be very useful.

As you saw, I did implement all the actual logic in the RetryWorkflow function, but a user triggering a retry wouldn't know what to pass in to be fully restarted. I do think this information best belongs with the workflow, which means storing at least that in the workflow template. But I don't think that's very invasive...


simster7 (Member)

but a user triggering a retry wouldn't know what to pass in to be fully restarted

If a user would tag a certain template as restartOnWorkflowRetry, wouldn't they be able to specify that same template when using argo retry? After all, we're not implementing automatic Workflow retries in this PR (#1578), so some user intervention/third-party scripting is still required to restart the Workflow. Why can't the user/script supply which steps should be retried with argo retry?

I do think this information best belongs with the workflow, which means storing at least that in the workflow template. But I don't think that's very invasive.

Let me gather some more opinions with the team and get back to you. Could be that you're right and I'm a bit too stringent 🙂

simster7 (Member)

Let me gather some more opinions with the team and get back to you.

Hi @mark9white. The team agreed that we don't want to support this sort of labeling on the Workflow spec. This feature is still very much desired, but we'll have to find a way to specify which nodes to restart fully on the client-side.

markterm (Contributor, Author) commented Apr 1, 2020

Thanks for following up with the team. I could make this work by providing the ability to specify nodes to restart by templateName - would that be ok?

Doing this is not ideal for Argo users - has the team given any thought to providing first-class support for triggering asynchronous jobs from Argo without using polling? An example would be triggering a Spark job (e.g. directly or using something like Amazon EMR), where you would have one step trigger it and then a suspend step that waits for the job's completion.

markterm (Contributor, Author) commented Apr 1, 2020

I have just modified the PR so the retry command takes in a --reset-nodes-field-selector parameter.

simster7 (Member) commented Apr 6, 2020

I have just modified the PR so the retry command takes in a --reset-nodes-field-selector parameter.

Thanks @mark9white! I'll take another look.

Doing this is not ideal for Argo users

Do you mean using the --reset-nodes-field-selector approach? Would you mind explaining a bit why?

simster7 (Member) left a comment:

Looks pretty good, just some minor comments.

@@ -37,5 +51,6 @@ func NewRetryCommand() *cobra.Command {
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
command.Flags().StringVar(&retryOps.nodesToResetFieldSelector, "reset-nodes-field-selector", "", "selector of nodes to reset, eg: --node-field-selector inputs.paramaters.myparam.value=abc")
simster7 (Member):

I think the name of this flag should be --node-field-selector as all this does is provide a selector. This way it would be analogous to #1904. To specify that we want said nodes restarted we can pass a flag in conjunction:

$ argo retry --restart-successful --node-field-selector inputs.parameters.myparam.value=abc

Or something like this. What do you think?

markterm (Contributor, Author):

Good idea, am applying.
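[Editor's note] As a rough sketch of how the two flags proposed above could fit together in the CLI (hypothetical names, assuming spf13/cobra and k8s.io/apimachinery's fields package; this is not the PR's exact code):

package retrysketch

import (
	"fmt"

	"github.com/spf13/cobra"
	"k8s.io/apimachinery/pkg/fields"
)

type retryOpts struct {
	restartSuccessful bool
	nodeFieldSelector string
}

// addRetryFlags wires the proposed flags onto the retry command.
func addRetryFlags(cmd *cobra.Command, opts *retryOpts) {
	cmd.Flags().BoolVar(&opts.restartSuccessful, "restart-successful", false,
		"restart nodes matching --node-field-selector even if they succeeded")
	cmd.Flags().StringVar(&opts.nodeFieldSelector, "node-field-selector", "",
		"selector of nodes to reset, e.g. inputs.parameters.myparam.value=abc")
}

// parseNodeSelector turns the flag value into a fields.Selector.
func parseNodeSelector(opts *retryOpts) (fields.Selector, error) {
	if !opts.restartSuccessful {
		return nil, nil // nothing to force-reset; fall back to default retry behaviour
	}
	selector, err := fields.ParseSelector(opts.nodeFieldSelector)
	if err != nil {
		return nil, fmt.Errorf("unable to parse node field selector %q: %v", opts.nodeFieldSelector, err)
	}
	return selector, nil
}

The parsed selector could then be handed to RetryWorkflow alongside the restart-successful intent.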

Comment on lines +638 to +648
if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
} else {
message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
}
woc.log.Infoln(message)
return woc.markNodePhase(node.Name, lastChildNode.Phase, message), true, nil
}
simster7 (Member):

Why do we need this here? Isn't this covered by failSuspendedNodesAfterDeadlineOrShutdown()?

markterm (Contributor, Author):

Without this, in the case of a retry parent node, it just keeps retrying pods that continually fail because they are being executed after the deadline. The integration test didn't work without it.

Comment on lines +820 to +823
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadlineOrShutdown() error {
if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
simster7 (Member):

Thanks for this!

}

if selector.Matches(nodeFields) {
if selectorMatchesNode(selector, node) {
simster7 (Member):

Nice!
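[Editor's note] For context, a hedged sketch of what such a selector match might look like, assuming k8s.io/apimachinery's fields package and the Argo v2 wfv1 types (where Parameter.Value is a string pointer). The exact set of exposed node fields is an assumption based on the flag's help text, not the PR's actual code.

package retrysketch

import (
	"fmt"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"k8s.io/apimachinery/pkg/fields"
)

// nodeFields flattens a node into a fields.Set the selector can match against.
func nodeFields(node wfv1.NodeStatus) fields.Set {
	set := fields.Set{
		"name":         node.Name,
		"displayName":  node.DisplayName,
		"templateName": node.TemplateName,
		"phase":        string(node.Phase),
	}
	if node.Inputs != nil {
		// Expose input parameters as inputs.parameters.<name>.value keys.
		for _, p := range node.Inputs.Parameters {
			if p.Value != nil {
				set[fmt.Sprintf("inputs.parameters.%s.value", p.Name)] = *p.Value
			}
		}
	}
	return set
}

// selectorMatchesNode reports whether the node satisfies the field selector.
func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
	return selector.Matches(nodeFields(node))
}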

markterm (Contributor, Author) commented Apr 7, 2020

I have just modified the PR so the retry command takes in a --reset-nodes-field-selector parameter.

Thanks @mark9white! I'll take another look.

Doing this is not ideal for Argo users

Do you mean using the --reset-nodes-field-selector approach? Would you mind explaining a bit why?

Because the person running 'retry' needs to know what selector to pass in to effectively retry the given workflow.

markterm (Contributor, Author) commented Apr 7, 2020

@simster7 I've applied the feedback.

simster7 (Member) left a comment:

LGTM! Thanks for this great PR @mark9white

// Delete/reset fields which indicate workflow completed
delete(newWF.Labels, common.LabelKeyCompleted)
newWF.Status.Conditions.UpsertCondition(wfv1.WorkflowCondition{Status: metav1.ConditionFalse, Type: wfv1.WorkflowConditionCompleted})
newWF.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeRunning)
newWF.Status.Phase = wfv1.NodeRunning
newWF.Status.Message = ""
newWF.Status.FinishedAt = metav1.Time{}
newWF.Spec.Shutdown = ""
simster7 (Member):

Nice catch!

simster7 (Member) commented Apr 9, 2020

Hey @mark9white. Could you resolve the conflicts here please?

markterm (Contributor, Author) commented Apr 9, 2020 via email

markterm (Contributor, Author) commented Apr 9, 2020

Actually there is an issue in the e2e tests, which I'm looking into.

@@ -357,8 +364,10 @@ func (s *E2ESuite) printWorkflowDiagnostics(name string) {
s.CheckError(err)
wf.Status.Nodes = offloaded
}
logCtx.Debug("Workflow metadata:")
logCtx.Debug("Workflow metadata at %s:", time.Now().String())


printf: Debug call has possible formatting directive %s (from govet)

@@ -715,7 +753,7 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
if err != nil {
return nil, fmt.Errorf("unable to compress workflow: %s", err)
}

File is not goimports-ed with -local github.com/argoproj/argo (from goimports)

simster7 (Member) left a comment:

Sorry to renege on the approval, but because of #2645 new changes are needed.

if err != nil {
return nil, err
} else {
for _, node := range wf.Status.Nodes {
simster7 (Member):

Hey @mark9white, because of #2645 this actually needs to be moved further down the code. Workflows with offloaded nodes are only retrieved starting at line 678, so if a Workflow has offloaded nodes, wf.Status.Nodes will be nil at this point in the code and the node field selector will have no effect.

While you're at this, would you mind extracting this block out to a helper function? RetryWorkflow is already a bit cluttered 🙂

markterm (Contributor, Author):

Done

return nil, err
} else {
for _, node := range wf.Status.Nodes {
if selectorMatchesNode(selector, node) {
simster7 (Member):

Seems like this code could be included in the large for loop starting at line 690. What do you think? Adding it there could save us an iteration through all the nodes. If you do decide to add it, please make sure it's added via a helper function.

markterm (Contributor, Author):

We need to determine the list of nodes including child nodes first.
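[Editor's note] A hedged sketch of that point, reusing the selectorMatchesNode sketch above (hypothetical helper names, not the PR's actual implementation): collect every matching node plus all of its descendants first, and only then let the main RetryWorkflow loop consult the resulting set.

package retrysketch

import (
	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"k8s.io/apimachinery/pkg/fields"
)

// nodeIDsToReset walks the node graph breadth-first from each matching node,
// returning the IDs of the matching nodes and all of their descendants.
func nodeIDsToReset(selector fields.Selector, nodes wfv1.Nodes) map[string]bool {
	toReset := make(map[string]bool)
	if selector == nil {
		return toReset
	}
	for _, node := range nodes {
		if !selectorMatchesNode(selector, node) {
			continue
		}
		queue := []string{node.ID}
		for len(queue) > 0 {
			id := queue[0]
			queue = queue[1:]
			if toReset[id] {
				continue
			}
			toReset[id] = true
			// Descend into children so the whole subtree is force-reset.
			queue = append(queue, nodes[id].Children...)
		}
	}
	return toReset
}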

newNodes[node.ID] = node
continue
}
case wfv1.NodeError, wfv1.NodeFailed:
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) {
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) && !doForceResetNode {
simster7 (Member):

Why is this && !doForceResetNode necessary here? What's the difference between "pretend as if this node never existed" and resetting it manually?

markterm (Contributor, Author):

You're right, fixed

@@ -655,14 +688,19 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
}

for _, node := range nodes {
var doForceResetNode = false
simster7 (Member):

Minor:

Suggested change
var doForceResetNode = false
forceResetNode := false

markterm (Contributor, Author):

Done

markterm (Contributor, Author)

I've retriggered the build as it failed on TestLogProblems (which is unrelated and looks to be flaky).

sonarcloud bot commented Apr 12, 2020

Kudos, SonarCloud Quality Gate passed!

Bugs: 0 (rating A)
Vulnerabilities: 0 (rating A), Security Hotspots to review: 0
Code Smells: 4 (rating A)
Coverage: no coverage information
Duplication: 4.3%

markterm (Contributor, Author)

I'm still seeing TestLogProblems failing, but I don't think it's related to this PR.

4 participants