forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
map-reduce.yaml
162 lines (162 loc) · 5.32 KB
/
map-reduce.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
---
# This workflow demonstrates a basic map-reduce.
# This requires you have an artifact repository configured.
#
# Notes:
# - You'll need to have a user-namespaced artifact repository set up to save
#   intermediate results for this workflow.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: map-reduce-
spec:
  entrypoint: main
  arguments:
    parameters:
      # How many part files the `split` step fans out into.
      - name: numParts
        value: "4"
      # How many reduce groups the parts are bucketed into (part value % numGroups).
      - name: numGroups
        value: "2"
  templates:
    - name: main
      dag:
        tasks:
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
                - name: numGroups
                  value: '{{workflow.parameters.numGroups}}'
              artifacts:
                - name: parts
                  from: '{{tasks.split.outputs.artifacts.parts}}'
            dependencies:
              - split
            # Fan out: one `map` task per part ID printed by `split` to stdout.
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            arguments:
              parameters:
                - name: group
                  value: '{{item}}'
            dependencies:
              - map
            # Fan out: one `reduce` task per group (0 .. numGroups-1).
            withSequence:
              count: "{{workflow.parameters.numGroups}}"
    # The `split` task creates a number of "parts". Each part has a unique ID (e.g. part-0, part-1).
    # This task writes the part IDs to stdout (so that the `map` task can be expanded to have one task per part).
    # And, it writes one "part file" for each piece of processing that needs doing, into a single directory
    # which is then saved as an output artifact.
    - name: split
      inputs:
        parameters:
          - name: numParts
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          os.mkdir("/tmp/parts")
          partIds = list(map(lambda x: "part-" + str(x), range({{inputs.parameters.numParts}})))
          for i, partId in enumerate(partIds, start=1):
            with open("/tmp/parts/" + partId + ".json", "w") as out:
              json.dump({"foo": i}, out)
          # Emit the part ID list as JSON so `withParam` can expand the `map` task.
          json.dump(partIds, sys.stdout)
      outputs:
        artifacts:
          - name: parts
            path: /tmp/parts
    # One `map` per part ID is started. Finds its own "part file" under `/tmp/parts/${partId}`.
    # Each `map` task has an output artifact saved with a unique name for the part into a common "results directory".
    - name: map
      inputs:
        parameters:
          - name: partId
          - name: numGroups
        artifacts:
          - name: parts
            path: /tmp/parts
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          partId = "{{inputs.parameters.partId}}"
          numGroups = {{inputs.parameters.numGroups}}
          os.mkdir("/tmp/results")
          with open("/tmp/parts/" + partId + ".json") as f:
            part = json.load(f)
            with open("/tmp/results/" + partId + ".json", "w") as out:
              json.dump({"bar": part["foo"] * 2, "group": part["foo"] % numGroups}, out)
      outputs:
        artifacts:
          - name: result
            path: /tmp/results/{{inputs.parameters.partId}}.json
            archive:
              none: {}
            s3:
              bucket: my-bucket
              endpoint: minio:9000
              insecure: true
              accessKeySecret:
                name: my-minio-cred
                key: accesskey
              secretKeySecret:
                name: my-minio-cred
                key: secretkey
              # Every map task writes into the same per-workflow prefix so that
              # `reduce` can pull the whole directory as one input artifact.
              key: "{{workflow.name}}/results/{{inputs.parameters.partId}}.json"
    # The `reduce` task takes the "results directory" and returns a single result per group.
    - name: reduce
      inputs:
        parameters:
          - name: group
        artifacts:
          - name: result
            path: /tmp/results
            s3:
              bucket: my-bucket
              endpoint: minio:9000
              insecure: true
              accessKeySecret:
                name: my-minio-cred
                key: accesskey
              secretKeySecret:
                name: my-minio-cred
                key: secretkey
              key: "{{workflow.name}}/results"
      script:
        image: python:alpine3.6
        command:
          - python
        source: |
          import json
          import os
          import sys
          total = 0
          group = "{{inputs.parameters.group}}"
          os.mkdir("/tmp/totals/")
          for f in list(map(lambda x: open("/tmp/results/" + x), os.listdir("/tmp/results"))):
            result = json.load(f)
            # `group` is a templated string ("0", "1", ...) while result["group"]
            # is a JSON int; compare as strings so matching groups are summed.
            if str(result["group"]) == group:
              total = total + result["bar"]
          with open("/tmp/totals/" + group, "w") as f:
            f.write(str(total))
      outputs:
        parameters:
          - name: total-{{inputs.parameters.group}}
            globalName: total-{{inputs.parameters.group}}
            valueFrom:
              path: /tmp/totals/{{inputs.parameters.group}}