-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Using templates in yaml definition for variables ? #124
Comments
I'm not sure here. Where these templates would be beneficial? Can you give me a usage example?
That is already possible - a flow is defined by arguments, so you can pass arguments to all tasks and in each task get only the relevant parts. If you need to chain results, you need to use databases/storages and transparently retrieve results of previous tasks. If you want to directly use a queue for this purpose, you can say that the first task in the flow should propagate arguments to subsequent tasks/flows. You can however do that only in the first task as flow is defined by name and arguments passed (also if not implemented this way, it could lead to race conditions). However a recommended way is to use databases/storages. You can reuse results, be sure with atomicity if needed and you don't need to worry about message size limitations. An idea is to keep arguments as small as possible and do any manipulation on task results in selinon for real "big data processing pipelines" and use databases that are optimized for data querying/storing/retrieval. But this surely depends on your use case and data load. |
basically similar to how Ansible yaml files allowing, multiple flow's with different params or chaining flows. But im new to Selinon and trying wrap my head how to integrate with my project. Basically I am using celery/flask/Postgres/Redis and created a DSL to trigger tasks with params, So I basically read YAML figure out which class/function to call and params to pass. Now just just need to Traverse the graph (DAG) and call tasks in celery. Thats when I found this project and its very close to what I want to do. Just need to figure out how to use it. Would love some help either with a very complex real work demo where I can trigger flow from API and also an API or client which subscribes and can see results or current progress, And maybe an API which takes dispatcher_id and returns json of all task/sub task status and properties/results for each. |
In regards to the "use templates for variables" - I would currently use this to define new flow entry points simply that point to another flow - so if my task accepts variables and then can send that to the flow, I can use a new flow yaml to define the variables that will be used by a more generalized flow. Example: general_flow.yaml
alert_60_flow.yaml
This makes it so rather than having to do extra things in tasks, I can make one general workflow, and have multiple flows that can use one general workflow more simply - I don't need to define a separate task for each of them, I can just pass node_args directly from a dummy_task or some such for my more general workflow. Now, to set up another configuration that has separate tools:
Both of those flows can take advantage of a general flow, but don't require separate tasks to be defined to utilize the main flow. |
@bubthegreat sorry for late answer, I somehow missed this. Looking at this - I'm not sure if I follow. I assume the |
Hi Fridex, I'm looking at selinon as a possible distributed task processing solution, and I'll echo kavink's question about chaining together task inputs and outputs. What I need is the ability to implement reusable tasks, and in order to do that, I need a way to map task inputs and outputs within a flow definition. The input/output values would be fine to be restricted to JSON. I will show what I am looking for with an example. Consider three tasks, shown here as native python functions (just for the sake of simplicity):
Now consider the following (theoretical) workflow, with the features I am looking for (skipping task definitions for simplicity):
Here I want to:
In this approach, an edge additionally defines how input/output data are mapped from one task to the other. It also shows how templating is useful - I want to send an email with a templated body that can use outputs from the previous task to construct the email. Do you think it makes sense for selinon to provide this kind of functionality? Given that selinon is aiming to be something like "celery but better" if you will, then imo it does make sense for it to provide not only chaining of task inputs/outputs (as celery does), but more refined control of that process than celery does (hence the arg mapping controls shown in the flow definition above). Thx, ps - I'm not at all married to the syntax for arg control given above; there's probably a better and more succinct approach. I'm juts aiming to give the overall idea of what I need to be able to do. |
Hi @nerdvegas, thanks for the explanation. Now I see a real-world use case - it would definitely make sense to provide such functionality. Actually, it would not be that hard to implement if this argument binding would go to task definition rather than to flow definition - I think it would also make sense from a semantics point of view - its a task related configuration, not flow related configuration. Task implementation can be still re-used as selinon provides task aliases (import the same Python code but under different task name). What do you think? |
I don't think the arg binding can go with the task definition, with the exception of constant inputs. So what I mean by constant inputs is something I had missing in my example. Ie:
I see that a workflow's The problem with putting all arg binding in with the task definitions is that it won't read correctly. The fact that I want the output from render1 to append to the "values" arg of the 'sum1' task, only makes sense in the context of that edge. For example, what if the
I don't see how you'd represent this situation if including arg binding info with task definitions, because |
Just to add - my examples so far are by no means complete, I haven't really thought through what a yaml-based language for defining JSON data flow between tasks should look like. If you could nail that though, this would be so useful I think. For example, maybe splitting the binding details something like this would be better (and maybe with this syntax, you can't specify one-to-many edges, you have to list each edge separately):
With this syntax you'd be able to link together different input/output args in a fairly intuitive way (there would be "output args" if a task output a dict - you would treat each key as an arg that could be passed separately to a downstream task). |
In that case, I'm probably missing the semantics behind binding - originally, I thought you are looking for a way how to define values specific for tasks directly in the YAML configuration file - constant inputs as you described them.
Yes, that's true. The whole design of selinon is more "storage"-centric. These tasks in flows basically pass arguments that are mostly keys (with some parameters how tasks should behave) to storage on which they perform operations. In the current configuration flow semantics, the scenario you used (without the
would be read as - start flow in If the purpose of
This way selinon ensures tasks |
This looks like a usage of As described above, see selinon configuration as a transcript how you want to operate on data which are stored in storage. |
Ah right, I was interpreting So if
Couldn't this be interpreted as two tasks with no dependencies (render1 and render2), followed by a separate DAG of 3 tasks, like so?:
|
A flow is defined as a graph (not limited to acyclic graphs) of tasks which is interpreted based on task success/failure. You can see it as chaining execution of these tasks in a distributed environment to have results faster (selinon's scheduling system will ensure tasks which can be run in parallel will be run in parallel). The example you are describing are two flows - maybe to give it shape now, let's create complete/valid selinon configuration: flow-definitions:
- name: flow1
edges:
- from:
to:
- render1
- render2
- name: flow2
edges:
- from:
- render1
- render2
to:
- sum1 These are two separate flows You can imagine adding more edges to create additional nodes in the flow graph, e.g.: flow-definitions:
- name: flow3
edges:
- from:
- render1
- render2
to:
- sum1
- from:
- sum1
- render2
to:
- new_task This is an extended example of |
Here's what I'm not getting. Consider the two flows:
Are you saying that there will be one instance of |
Yes. In |
BTW you can also do something like: flow-definitions:
- name: flow5
edges:
- from:
- render1
to:
- sum1
- sum1
- sum1 Which will run 3 times task |
Right, but in |
So what would this do?:
|
Read the configuration file as transcript/recipe how to run tasks in a distributed system.
flow-definitions:
- name: flow5
edges:
- from:
- render1
to:
- sum1
- sum1
- sum1
- from:
- sum1
to:
- othertask Reading it: Flow You can see tasks as processes on unix systems - you have multiple |
An example use case I can think of: Task |
What if I only want `othertask` to run after one of the `sum1` tasks
specifically, how would I express that?
…On Thu, May 9, 2019 at 2:26 PM Fridolín Pokorný ***@***.***> wrote:
An example use case I can think of:
Task render1 pre-processes an image and stores it in a database/storage.
Tasks sum1 perform some random operation on image colors (e.g. create
filter for some color), where each task generates a random number based on
which the color filter is applied. Task othertask uploads results to some
external storage for each result of task (instance) sum1 run in flow.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#124 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAMOUSR2RF765HI6IWHWTRTPUORW3ANCNFSM4EZO4XLA>
.
|
Similarly, how would I describe that I want one `othertask` to run, after
all three `sum1` tasks have run?
…On Thu, May 9, 2019 at 2:31 PM Allan Johns ***@***.***> wrote:
What if I only want `othertask` to run after one of the `sum1` tasks
specifically, how would I express that?
On Thu, May 9, 2019 at 2:26 PM Fridolín Pokorný ***@***.***>
wrote:
> An example use case I can think of:
>
> Task render1 pre-processes an image and stores it in a database/storage.
> Tasks sum1 perform some random operation on image colors (e.g. create
> filter for some color), where each task generates a random number based on
> which the color filter is applied. Task othertask uploads results to
> some external storage for each result of task (instance) sum1 run in
> flow.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <#124 (comment)>,
> or mute the thread
> <https://github.com/notifications/unsubscribe-auth/AAMOUSR2RF765HI6IWHWTRTPUORW3ANCNFSM4EZO4XLA>
> .
>
|
It looks like you want to create a condition - run flow-definitions:
- name: flow5
edges:
- from:
to: render1
- from:
- render1
to:
- sum1
- sum1
- sum1
- from:
- sum1
to:
- othertask
condition:
name: 'fieldEqual'
args:
key:
- 'bar'
value: 1
You want to wait until all tasks flow-definitions:
- name: flow6
edges:
- from:
to: render1
- from:
- render1
to:
- subflow1
- from:
- subflow1
to:
- othertask
- name: subflow1
edges:
- from:
to:
- sum1
- sum1
- sum1 |
I don't want either of those things though.
I don't want a conditional, I just want othertask to be downstream of one
specific instance of a sum1 task.
I don't want a separate flow in the second example, because othertask might
be downstream from some other 'foo' task, as well as the three sum1 tasks
(ie it has 4 input tasks). If I had to split this into two separate flows,
then 'foo' would be delayed until flow2 ran - in other words I'd have the
'foo' task forced into serial rather than parallel execution.
I'm confused because initially this project seemed like a way to define a
DAG and then execute it. To define a DAG, what I want is to specify the
nodes (instances of tasks), and then connect them with edges (order of edge
declaration does not matter).
A
…On Thu, May 9, 2019 at 2:39 PM Fridolín Pokorný ***@***.***> wrote:
What if I only want othertask to run after one of the sum1 tasks
specifically, how would I express that?
It looks like you want to create a condition - run othertask after sum1
if ...:
flow-definitions:
- name: flow5
edges:
- from:
to: render1
- from:
- render1
to:
- sum1
- sum1
- sum1
- from:
- sum1
to:
- othertask
condition:
name: 'fieldEqual'
args:
key:
- 'bar'
value: 1
Similarly, how would I describe that I want one othertask to run, after
all three sum1 tasks have run?
You want to wait until all tasks sum1 finish - let's run them in a
separate flow and wait for that flow to finish:
flow-definitions:
- name: flow6
edges:
- from:
to: render1
- from:
- render1
to:
- subflow1
- from:
- subflow1
to:
- othertask
- name: subflow1
edges:
- from:
to:
- sum1
- sum1
- sum1
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#124 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAMOUSSDQWMZWR2Q4CLDH3TPUOTIVANCNFSM4EZO4XLA>
.
|
Which spicific task? To be deterministic, there needs to be a mechanism on how to specify which is the "specific" task. BTW another approach is to make one task alias - it shares with other tasks flow-definitions:
- name: flow5
edges:
- from:
to: render1
- from:
- render1
to:
- sum1
- sum1
- sum1_aliased
- from:
- sum1_aliased
to:
- othertask
That's why there are provided building blocks which, if used appropriately, can serve different use-cases. If I understand the described use case correctly, you want to run flow-definitions:
- name: flow6
edges:
- from:
to: render1
- from:
- render1
to:
- subflow1
- from:
- subflow1
- foo
to:
- othertask
- name: subflow1
edges:
- from:
to:
- sum1
- sum1
- sum1 |
Wondering if selinon has plans to integrate jinja2 so users can use templates for variables and also ability to dynamically chain output of one task to input of other tasks or params for other tasks.
The text was updated successfully, but these errors were encountered: