Using templates in yaml definition for variables? #124

Open · kavink opened this issue Apr 8, 2018 · 24 comments


kavink commented Apr 8, 2018

Wondering if selinon has plans to integrate jinja2 so users can use templates for variables, and also the ability to dynamically chain the output of one task to the input or params of other tasks.

fridex added the question label Apr 8, 2018

fridex commented Apr 8, 2018

Wondering if selinon has plans to integrate jinja2 so users can use templates for variables

I'm not sure here. Where would these templates be beneficial? Can you give me a usage example?

and also the ability to dynamically chain the output of one task to the input or params of other tasks.

That is already possible - a flow is defined by its arguments, so you can pass arguments to all tasks and have each task pick out only the relevant parts. If you need to chain results, you need to use databases/storages and transparently retrieve the results of previous tasks. If you want to use a queue directly for this purpose, you can say that the first task in the flow should propagate its arguments to subsequent tasks/flows. You can, however, do that only in the first task, as a flow is defined by its name and the arguments passed (if it were not implemented this way, it could lead to race conditions).

However, the recommended way is to use databases/storages. You can reuse results, be sure about atomicity if needed, and you don't need to worry about message size limitations. The idea is to keep arguments as small as possible, do any manipulation of task results in selinon for real "big data processing pipelines", and use databases that are optimized for data querying/storing/retrieval. But this surely depends on your use case and data load.
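
To illustrate the storage-based chaining described above, here is a minimal sketch of a task retrieving a parent's stored result (the task and parent names are hypothetical; parent_task_result() is selinon's accessor for a parent task's result):

from selinon import SelinonTask

class AggregateTask(SelinonTask):
    """Sketch only: assumes a hypothetical parent task named
    'fetch_task' in the same flow and a storage configured for
    both tasks."""

    def run(self, node_args):
        # Transparently retrieves the parent's result from its
        # configured storage adapter.
        data = self.parent_task_result('fetch_task')
        # Whatever is returned here is stored in this task's storage.
        return {'count': len(data)}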


kavink commented Apr 8, 2018

Basically, similar to how Ansible YAML files allow multiple flows with different params, or chaining flows. But I'm new to Selinon and trying to wrap my head around how to integrate it with my project.

Basically, I am using Celery/Flask/Postgres/Redis and created a DSL to trigger tasks with params. I read the YAML, figure out which class/function to call and which params to pass, and now I just need to traverse the graph (DAG) and call the tasks in Celery. That's when I found this project, and it's very close to what I want to do. I just need to figure out how to use it.

I would love some help: ideally a fairly complex real-world demo where I can trigger a flow from an API, an API or client which subscribes and can see results or current progress, and maybe an API which takes a dispatcher_id and returns JSON with the status and properties/results of every task/sub-task.

@bubthegreat

Regarding "use templates for variables": I would currently use this to define new flow entry points that simply point to another flow. If my task accepts variables and can send them on to the flow, I can use a new flow YAML to define the variables that will be used by a more generalized flow. Example:

general_flow.yaml

flow-definitions:
  - name: case_worker
    queue: case_worker_queue_v0
    edges:
      - from:
        to: get_cases_task
      - from: get_cases_task
        to: log_waiter_task
        condition:
          name: argsFieldEqual
          args:
            key: needs_logs
            value: true
      - from: get_cases_task
        to: tool_runner_task
        condition:
          name: argsFieldEqual
          args:
            key: needs_logs
            value: false
      - from: log_waiter_task
        to: tool_runner_task
      - from: tool_runner_task
        to: jira_creator_task
      - from: jira_creator_task
        to: case_updater_task

alert_60_flow.yaml

flow-definitions:
  - name: alert_60
    queue: get_cases_v0
    edges:
      - from:
        to: work_cases_flow
        node_args:
          subject_query: '[60]'
          needs_logs: True
          last: '4h'
          tool: photon.tools.alerts.alert_60

This makes it so that, rather than having to do extra things in tasks, I can write one general workflow and have multiple flows use it more simply. I don't need to define a separate task for each of them; I can just pass node_args directly from a dummy_task or some such for my more general workflow (see the sketch at the end of this comment).

Now, to set up another configuration that has separate tools:

flow-definitions:
  - name: alert_40
    queue: get_cases_v0
    edges:
      - from:
        to: work_cases_flow
        node_args:
          subject_query: '[40]'
          needs_logs: False
          last: '4h'
          tool: photon.tools.alerts.alert_40

Both of those flows can take advantage of a general flow, but don't require separate tasks to be defined to utilize the main flow.
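
As a sketch of the dummy_task idea above: a first task that just returns the desired arguments, combined with the flow's node_args_from_first option (discussed later in this thread), would have a similar effect. The class name and values here are illustrative only:

from selinon import SelinonTask

class Alert60Args(SelinonTask):
    """Hypothetical first task for the alert_60 flow; with
    node_args_from_first: true set on the flow, the return value
    becomes node_args for all subsequent nodes."""

    def run(self, node_args):
        return {
            'subject_query': '[60]',
            'needs_logs': True,
            'last': '4h',
            'tool': 'photon.tools.alerts.alert_60',
        }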


fridex commented Oct 22, 2018

@bubthegreat sorry for the late answer, I somehow missed this.

Looking at this - I'm not sure I follow. I assume case_worker calls a subflow called get_cases_task (the very first node in the flow), and the subflow that is run (alert_60 or alert_40) is determined based on the passed node_args. Am I right in this assumption?

@nerdvegas

Hi Fridex,

I'm looking at selinon as a possible distributed task processing solution, and I'll echo kavink's question about chaining together task inputs and outputs.

What I need is the ability to implement reusable tasks, and in order to do that, I need a way to map task inputs and outputs within a flow definition. It would be fine for the input/output values to be restricted to JSON. I will show what I am looking for with an example.

Consider three tasks, shown here as native python functions (just for the sake of simplicity):

def render_image(filepath):
    """Render an image, and return the (float) number of seconds it took to complete."""
    ...

def sum_values(values):
    """Return the sum of the given values."""
    return sum(values)

def send_email(from_, to, subject, body):
    """Send an email ('from' is a reserved word in Python, hence from_)."""
    ...

Now consider the following (theoretical) workflow, with the features I am looking for (skipping task definitions for simplicity):

tasks:
  - name: render1
  - name: render2
  - name: sum1
  - name: email1

flow-definitions:
  - name: myflow
    edges:
      - from:
          task: render1
        to:
          task: sum1
          bind_args:
            values:
              mode: append
      - from:
          task: render2
        to:
          task: sum1
          bind_args:
            values:
              mode: append
      - from:
          task: sum1
        to:
          task: email1
          bind_args:
            from: "someguy@somewhere"
            to: "you@there"
            subject: "total image render time"
            body:
              mode: set_templated
              template: "The images took {{ input }} seconds to render."

Here I want to:

  • render two images
  • sum the time it took to render them
  • email that info to someone.

In this approach, an edge additionally defines how input/output data are mapped from one task to the other. It also shows how templating is useful - I want to send an email with a templated body that can use outputs from the previous task to construct the email.

Do you think it makes sense for selinon to provide this kind of functionality? Given that selinon aims to be something like "celery but better", if you will, then IMO it makes sense for it to provide not only chaining of task inputs/outputs (as celery does), but also more refined control of that process than celery offers (hence the arg mapping controls shown in the flow definition above).

Thx,
Allan

ps - I'm not at all married to the syntax for arg control given above; there's probably a better and more succinct approach. I'm just aiming to give the overall idea of what I need to be able to do.


fridex commented May 9, 2019

Hi @nerdvegas,

thanks for the explanation. Now I see a real-world use case - it would definitely make sense to provide such functionality.

Actually, it would not be that hard to implement if this argument binding went into the task definition rather than the flow definition - I think that would also make sense from a semantics point of view: it's task-related configuration, not flow-related configuration. The task implementation can still be re-used, as selinon provides task aliases (import the same Python code under a different task name). What do you think?

@nerdvegas

I don't think the arg binding can go with the task definition, with the exception of constant inputs.

So what I mean by constant inputs is something I had missing from my example, i.e.:

tasks:
  - name: render1
    args:
      path: /foo/img1.exr
  - name: render2
    args:
      path: /foo/img2.exr

I see that a workflow's node_args are passed to every node, but that's more like a workflow-global set of args, right?

The problem with putting all the arg binding in with the task definitions is that it won't read correctly. The fact that I want the output from render1 to append to the "values" arg of the 'sum1' task only makes sense in the context of that edge.

For example, what if the sum task took two args ('a' and 'b'), and I wanted render1 to bind to its first arg, and render2 to its second? That might look like:

    edges:
      - from:
          task: render1
        to:
          task: sum1
          bind_args: a
      - from:
          task: render2
        to:
          task: sum1
          bind_args: b

I don't see how you'd represent this situation if arg binding info were included with the task definitions, because sum1 now has multiple bindings that only make sense in conjunction with the associated input tasks.


nerdvegas commented May 9, 2019

Just to add - my examples so far are by no means complete; I haven't really thought through what a YAML-based language for defining JSON data flow between tasks should look like. If you could nail that, though, it would be so useful, I think.

For example, maybe splitting out the binding details something like this would be better (though maybe with this syntax you can't specify one-to-many edges - you have to list each edge separately):

edges:
  - from: render1
    to: sum1
    bind:
      - from:   # null == entire output
        to: a
  - from: render2
    to: sum1
    bind:
      - from:
        to: b

With this syntax you'd be able to link together different input/output args in a fairly intuitive way (there would be "output args" if a task outputs a dict - you would treat each key as an arg that could be passed separately to a downstream task).


fridex commented May 9, 2019

In that case, I'm probably missing the semantics behind binding - originally, I thought you were looking for a way to define task-specific values directly in the YAML configuration file - the constant inputs as you described them.

I see that a workflow's node_args are passed to every node, but that's more like a workflow-global set of args, right?

Yes, that's true. The whole design of selinon is more "storage"-centric. The tasks in flows basically pass arguments that are mostly keys (plus some parameters for how tasks should behave) into a storage on which they perform operations.

In the current configuration flow semantics, the scenario you used (without the bind_args configuration):

    edges:
      - from:
          task: render1
        to:
          task: sum1
          # bind_args: a
      - from:
          task: render2
        to:
          task: sum1
          # bind_args: b

would be read as: start the flow with the render1 and render2 tasks. After each task finishes, run an instance of the sum1 task as a subsequent task for its parent - 2 instances in total (render1 -> sum1, render2 -> sum1).

If the purpose of the sum1 task is to aggregate/sum the results of tasks render1 and render2, the configuration would be:

    edges:
      - from:
        to:
          - render1
          - render2

      - from:
          - render1
          - render2
        to:
          - sum1

This way selinon ensures that tasks render1 and render2 both finish (possibly in parallel), and then the sum1 task is run as a subsequent task. It will have records for render1 and render2 in its state (parent tasks), so you can easily inspect their results.
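
A sketch of what the sum1 task could look like under this configuration (assuming render1 and render2 each return a float render time and a storage is configured for them):

from selinon import SelinonTask

class Sum1(SelinonTask):
    """Sketch of the fan-in above: sum1 is scheduled once both
    parents finish and can query each parent's stored result."""

    def run(self, node_args):
        # Each parent's result is looked up by task name.
        time1 = self.parent_task_result('render1')
        time2 = self.parent_task_result('render2')
        return time1 + time2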


fridex commented May 9, 2019

      - from:   # null == entire output

This looks like a use of the node_args_from_first configuration option - see https://selinon.readthedocs.io/en/latest/yaml.html#node-args-from-first

As described above, think of the selinon configuration as a transcript of how you want to operate on data stored in a storage.

@nerdvegas

Ah right, I was interpreting tasks as a list of task instances, rather than definitions.

So if tasks are just task definitions, then isn't this ambiguous?

    edges:
      - from:
        to:
          - render1
          - render2

      - from:
          - render1
          - render2
        to:
          - sum1

Couldn't this be interpreted as two tasks with no dependencies (render1 and render2), followed by a separate DAG of 3 tasks, like so?:

(render1)

(render2)

(render1)--------->(sum1)
                      ^
(render2)-------------+


fridex commented May 9, 2019

A flow is defined as a graph (not limited to acyclic graphs) of tasks which is interpreted based on task success/failure. You can see it as chaining the execution of these tasks in a distributed environment to get results faster (selinon's scheduling system will ensure that tasks which can be run in parallel are run in parallel).

The example you are describing is two flows - maybe to give it shape now, let's create a complete/valid selinon configuration:

flow-definitions:
  - name: flow1
    edges:
      - from:
        to:
          - render1
          - render2

  - name: flow2
    edges:
      - from:
          - render1
          - render2
        to:
          - sum1

These are two separate flows, flow1 and flow2. In the first one, just tasks render1 and render2 are executed (in parallel, as there is nothing restricting that). The latter one also executes task sum1 (or a sub-flow consisting of tasks) after these tasks.

You can imagine adding more edges to create additional nodes in the flow graph, e.g.:

flow-definitions:
  - name: flow3
    edges:
      - from:
          - render1
          - render2
        to:
          - sum1
      - from:
          - sum1
          - render2
        to:
          - new_task

This is an extended example of flow2 - in this case the scheduling algorithm guarantees that new_task is executed after the sum1 and render2 tasks have successfully finished; records about them will be available in its state (see the task methods, for example, to obtain results).

@nerdvegas

Here's what I'm not getting. Consider the two flows:

flow-definitions:
  - name: flow3
    edges:
      - from:
          - render1
          - render2
        to:
          - sum1
      - from:
          - sum1
          - render2
        to:
          - new_task
flow-definitions:
  - name: flow4
    edges:
      - from:
          - render1
        to:
          - sum1
      - from:
          - render2
        to:
          - sum1

Are you saying that there will be one instance of sum1 run in flow3, but two in flow4?


fridex commented May 9, 2019

Are you saying that there will be one instance of sum1 run in flow3, but two in flow4?

Yes. In flow4, the first sum1 task will be run after render1 is finished and a second one after render2 is finished (these are separate edges).


fridex commented May 9, 2019

BTW you can also do something like:

flow-definitions:
  - name: flow5
    edges:
      - from:
          - render1
        to:
          - sum1
          - sum1
          - sum1

Which will run task sum1 3 times, each instance after render1 is finished. Or use foreach and create N instances of tasks/sub-flows (N is computed at runtime) - https://selinon.readthedocs.io/en/latest/patterns.html#variable-number-of-child-tasks-or-sub-flows
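
For reference, a foreach edge roughly takes the following shape per the linked docs (the iterator function and module names here are hypothetical):

flow-definitions:
  - name: flow_foreach
    edges:
      - from: render1
        to: sum1
        foreach:
          # iter_chunks/myapp.foreach are hypothetical names; the
          # function yields one item per sum1 instance to schedule.
          function: 'iter_chunks'
          import: 'myapp.foreach'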

@nerdvegas

Right, but in flow3, the multiple sum1 references are regarded as the same task? That is the ambiguity I'm referring to. It doesn't seem clear to me that a task is regarded as the same instance simply because it appears at the start and end of separate edges. It's confusing because in some places tasks are being regarded as definitions, but in others, as instances.

@nerdvegas

So what would this do?:

flow-definitions:
  - name: flow5
    edges:
      - from:
          - render1
        to:
          - sum1
          - sum1
          - sum1
      - from:
          - sum1
        to:
          - othertask


fridex commented May 9, 2019

Right, but in flow3, the multiple sum1 references are regarded as the same task? That is the ambiguity I'm referring to. It doesn't seem clear to me that a task is regarded as the same instance simply because it appears at the start and end of separate edges. It's confusing because in some places tasks are being regarded as definitions, but in others, as instances.

Read the configuration file as a transcript/recipe for how to run tasks in a distributed system.

So what would this do?:

flow-definitions:
  - name: flow5
    edges:
      - from:
          - render1
        to:
          - sum1
          - sum1
          - sum1
      - from:
          - sum1
        to:
          - othertask

Reading it:

Flow flow5 starts with task render1 (assuming all nodes in the flow are tasks, not sub-flows). Once task render1 finishes successfully, start the following tasks: sum1, sum1, sum1. Once a sum1 task finishes (there are 3 instances), run othertask for each successfully finished sum1 - so there will be three instances of othertask (if all sum1 instances finished successfully), each with a different parent sum1 task.

You can think of tasks like processes on Unix systems - you can have multiple bash processes which differ in PID (each one an instance of bash). These task instances in selinon will likewise each have a different task id (an instance of render1, sum1, othertask, ...).


fridex commented May 9, 2019

An example use case I can think of:

Task render1 pre-processes an image and stores it in a database/storage. The sum1 tasks perform some random operation on the image colors (e.g. creating a filter for some color), where each task generates a random number based on which the color filter is applied. Task othertask uploads results to some external storage, once for each result of a sum1 task instance run in the flow.
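
A sketch of othertask in that scenario (each instance has exactly one sum1 parent; names beyond those in the flow are hypothetical):

from selinon import SelinonTask

class OtherTask(SelinonTask):
    """Each othertask instance is scheduled for one finished sum1
    instance and sees only that parent's result."""

    def run(self, node_args):
        # Query the single sum1 parent of this particular instance.
        result = self.parent_task_result('sum1')
        # The return value is stored in this task's configured storage.
        return {'uploaded': result}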


nerdvegas commented May 9, 2019 via email


nerdvegas commented May 9, 2019 via email


fridex commented May 9, 2019

What if I only want othertask to run after one of the sum1 tasks
specifically, how would I express that?

It looks like you want to create a condition - run othertask after sum1 if ...:

flow-definitions:
  - name: flow5
    edges:
      - from:
        to: render1
      - from:
          - render1
        to:
          - sum1
          - sum1
          - sum1
      - from:
          - sum1
        to:
          - othertask
        condition:
          name: 'fieldEqual'
          args:
            key:
              - 'bar'
            value: 1

Similarly, how would I describe that I want one othertask to run, after
all three sum1 tasks have run?

You want to wait until all sum1 tasks finish - let's run them in a separate flow and wait for that flow to finish:

flow-definitions:
  - name: flow6
    edges:
      - from:
        to: render1
      - from:
          - render1
        to:
          - subflow1
      - from:
          - subflow1
        to:
          - othertask

  - name: subflow1
    edges:
      - from:
        to:
          - sum1
          - sum1
          - sum1


nerdvegas commented May 9, 2019 via email


fridex commented May 9, 2019

I don't want either of those things though.

I don't want a conditional, I just want othertask to be downstream of one
specific instance of a sum1 task.

Which specific task? To be deterministic, there needs to be a mechanism for specifying which the "specific" task is.

BTW, another approach is to make one task an alias - it shares its implementation with the other sum1 tasks but is present under a different name in the system:

flow-definitions:
  - name: flow5
    edges:
      - from:
        to: render1
      - from:
          - render1
        to:
          - sum1
          - sum1
          - sum1_aliased
      - from:
          - sum1_aliased
        to:
          - othertask
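
The corresponding tasks section could declare the alias roughly like this - both names import the same Python class (module and class names here are hypothetical):

tasks:
  - name: sum1
    import: myapp.tasks
    classname: Sum1
  - name: sum1_aliased
    import: myapp.tasks
    # Same implementation as sum1, different task name in the system;
    # myapp.tasks and Sum1 are hypothetical names.
    classname: Sum1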

I don't want a separate flow in the second example, because othertask might
be downstream from some other 'foo' task, as well as the three sum1 tasks
(ie it has 4 input tasks). If I had to split this into two separate flows,
then 'foo' would be delayed until flow2 ran - in other words I'd have the
'foo' task forced into serial rather than parallel execution.

That's why building blocks are provided which, if used appropriately, can serve different use cases. If I understand the described use case correctly, you want to run the sum1 tasks and the foo task in parallel and then run othertask (after all 4 finish) - the overhead of a sub-flow is really minimal if selinon is configured correctly. You will still benefit from parallel execution:

flow-definitions:
  - name: flow6
    edges:
      - from:
        to:
          - render1
          - foo
      - from:
          - render1
        to:
          - subflow1
      - from:
          - subflow1
          - foo
        to:
          - othertask

  - name: subflow1
    edges:
      - from:
        to:
          - sum1
          - sum1
          - sum1
