forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.go
134 lines (121 loc) · 3.91 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package util
import (
"context"
"fmt"
"os"
"strconv"
"strings"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/retry"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
)
// Closer is anything that exposes a Close method returning an error.
type Closer interface {
// Close releases the underlying object and reports any failure.
Close() error
}
// Close is a convenience function that calls Close() on c and deliberately
// discards the returned error.
// It is used to satisfy the errcheck lint at call sites that do not care about
// the close error.
func Close(c Closer) {
_ = c.Close()
}
// GetSecrets returns the bytes stored under key in the Secret called name in
// the given namespace.
//
// The Secret is fetched through clientSet inside waitutil.Backoff using
// retry.DefaultRetry; an attempt is reported as done as soon as the Get error
// is not classified as transient by errorsutil.IsTransientErr. Nothing is
// cached here: every call goes to the API.
//
// If the fetch ultimately fails, the error is wrapped with
// errors.InternalWrapError. If the Secret has no entry for key, an error with
// code errors.CodeBadRequest is returned. In both failure cases the returned
// slice is empty.
func GetSecrets(ctx context.Context, clientSet kubernetes.Interface, namespace, name, key string) ([]byte, error) {
	client := clientSet.CoreV1().Secrets(namespace)

	var fetched *apiv1.Secret
	fetch := func() (bool, error) {
		s, getErr := client.Get(ctx, name, metav1.GetOptions{})
		fetched = s
		done := !errorsutil.IsTransientErr(getErr)
		return done, getErr
	}

	if err := waitutil.Backoff(retry.DefaultRetry, fetch); err != nil {
		return []byte{}, errors.InternalWrapError(err)
	}

	if val, ok := fetched.Data[key]; ok {
		return val, nil
	}
	return []byte{}, errors.Errorf(errors.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key)
}
// WriteTerminateMessage writes message to /dev/termination-log with file mode
// 0o600. A write failure is not returned to the caller; it is only reported
// through the println builtin.
func WriteTerminateMessage(message string) {
	const terminationLog = "/dev/termination-log"
	if err := os.WriteFile(terminationLog, []byte(message), 0o600); err != nil {
		println("unable to write termination log: " + err.Error())
	}
}
// MergeParameters flattens any number of parameter slices into one slice in
// which each parameter name appears at most once.
//
// Slices are visited in argument order, so when the same name occurs more than
// once the first occurrence wins (earlier arguments have higher priority).
// The result is nil when no parameters are supplied.
func MergeParameters(params ...[]wfv1.Parameter) []wfv1.Parameter {
	seen := make(map[string]bool)
	var merged []wfv1.Parameter
	for _, group := range params {
		for i := range group {
			p := group[i]
			if seen[p.Name] {
				// A higher-priority slice already supplied this name.
				continue
			}
			seen[p.Name] = true
			merged = append(merged, p)
		}
	}
	return merged
}
// MergeArtifacts flattens any number of artifact slices into one slice in
// which each artifact name appears at most once.
//
// Slices are visited in argument order, so when the same name occurs more than
// once the first occurrence wins (earlier arguments have higher priority).
// The result is nil when no artifacts are supplied.
func MergeArtifacts(artifactSlices ...[]wfv1.Artifact) []wfv1.Artifact {
	var merged []wfv1.Artifact
	seen := make(map[string]bool)
	for _, group := range artifactSlices {
		for _, art := range group {
			if seen[art.Name] {
				// A higher-priority slice already supplied this name.
				continue
			}
			merged = append(merged, art)
			seen[art.Name] = true
		}
	}
	return merged
}
// RecoverIndexFromNodeName extracts the integer that sits between the first
// "(" in name and the first ":" that follows it, e.g. "step(3:item)" yields 3.
//
// It returns -1 when name has no "(", when no ":" follows that "(", or when
// the text between them is not a valid integer.
func RecoverIndexFromNodeName(name string) int {
	open := strings.Index(name, "(")
	if open < 0 {
		return -1
	}
	// Look for the separator only after the parenthesis. Searching the whole
	// string would find a ":" that precedes the "(" and produce an inverted
	// slice range, which panics.
	rest := name[open+1:]
	colon := strings.Index(rest, ":")
	if colon < 0 {
		return -1
	}
	index, err := strconv.Atoi(rest[:colon])
	if err != nil {
		return -1
	}
	return index
}
// GenerateFieldSelectorFromWorkflowName builds the field selector string
// "metadata.name=<wfName>" by parsing it with fields.ParseSelectorOrDie and
// rendering the parsed selector back to a string.
//
// As a sanity check the workflow name is recovered from the rendered selector
// with RecoverWorkflowNameFromSelectorStringIfAny; the function panics if the
// recovered name differs from wfName.
func GenerateFieldSelectorFromWorkflowName(wfName string) string {
	parsed := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName))
	selector := parsed.String()
	if recovered := RecoverWorkflowNameFromSelectorStringIfAny(selector); recovered != wfName {
		panic(fmt.Sprintf("Could not recover field selector from workflow name. Expected '%s' but got '%s'\n", wfName, recovered))
	}
	return selector
}
// RecoverWorkflowNameFromSelectorStringIfAny returns the value of the
// "metadata.name" equality term found in selector, or "" when selector has no
// such term.
//
// The value runs from just after the operator up to the next "," (or the end
// of the string) and is returned with surrounding whitespace trimmed. Both the
// "=" and "==" spellings of the equality operator are recognized.
func RecoverWorkflowNameFromSelectorStringIfAny(selector string) string {
	const tag = "metadata.name="
	starts := strings.Index(selector, tag)
	if starts < 0 {
		return ""
	}
	// With the "==" operator the tag match leaves one "=" in front of the
	// value; strip it so the name is not returned as "=<name>".
	value := strings.TrimPrefix(selector[starts+len(tag):], "=")
	if ends := strings.Index(value, ","); ends > -1 {
		value = value[:ends]
	}
	return strings.TrimSpace(value)
}
// GetDeletePropagation returns a pointer to the deletion propagation policy to
// use. When the WF_DEL_PROPAGATION_POLICY environment variable is set to a
// non-empty value, that value is converted to a metav1.DeletionPropagation
// without validation; otherwise metav1.DeletePropagationBackground is used.
func GetDeletePropagation() *metav1.DeletionPropagation {
	if configured, found := os.LookupEnv("WF_DEL_PROPAGATION_POLICY"); found && configured != "" {
		policy := metav1.DeletionPropagation(configured)
		return &policy
	}
	fallback := metav1.DeletePropagationBackground
	return &fallback
}