
Customize UserDefinedDagsterK8sConfig per op via tags #23053

Merged

Conversation

alekseik1 (Contributor)

Summary & Motivation

As stated in #22138, Dagster currently requires that op/job limits in k8s are defined statically; no dynamic override is allowed.
I'd like to resolve this issue for the celery-k8s launcher (we use it in production).
This PR tries to address the problem by introducing a special tag "dagster-k8s/config-per-op" that allows overriding user_defined_k8s_config.
I expect this to work as follows:

```python
from dagster import op, job, schedule, RunRequest


@op
def op1():
    print(1 + 2)


# op defines its own limits
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "1", "memory": "1Gi"},
                    "limits": {"cpu": "10", "memory": "10Gi"},
                }
            }
        }
    }
)
def op2():
    print(7)


# <------- one way to override op tags
@job(
    tags={
        "dagster-k8s/config-per-op": {
            "op2": {
                "container_config": {
                    # requests will be overwritten, limits will be taken from op definition
                    "resources": {
                        "requests": {"cpu": "2", "memory": "2Gi"},
                    }
                }
            },
            "op1": {
                "container_config": {
                    # op1 gets requests/limits even though it never defined them
                    "resources": {
                        "requests": {"cpu": "11", "memory": "11Gi"},
                        "limits": {"cpu": "22", "memory": "22Gi"},
                    }
                }
            },
        }
    }
)
def main_job():
    op1()
    op2()


# <------- another way to override op tags
@schedule(job=main_job, cron_schedule="* * * * *")
def main_job_schedule():
    return RunRequest(
        tags={
            "dagster-k8s/config-per-op": {
                "op2": {
                    "container_config": {
                        # requests will be overwritten at schedule level, limits will be taken from op definition
                        "resources": {
                            "requests": {"cpu": "8", "memory": "8Gi"},
                        }
                    }
                },
                "op1": {
                    "container_config": {
                        # op1 gets requests/limits from schedule definition
                        "resources": {
                            "requests": {"cpu": "88", "memory": "88Gi"},
                            "limits": {"cpu": "89", "memory": "89Gi"},
                        }
                    }
                },
            }
        }
    )
```

I could use this feature for jobs that vary in resource consumption (for instance, jobs that accept a SQL query and download data from a database).

Could you please take a look at the code and check it for broken corner cases or violated contracts?
This is my first dive into the Dagster codebase, so I'll appreciate any comments and suggestions.

How I Tested These Changes

I modified some unit tests in k8s-related modules in dagster. No integration testing was done.

@gibsondan gibsondan self-requested a review July 17, 2024 19:31
gibsondan (Member) left a comment

Hi @alekseik1 - thank you for sending this out!

The use case here makes sense to me, but I think this should go on config for the k8s_job_executor rather than using tags. I think the implementation here should be fairly similar, just pulled from a different place - maybe a "per_step_k8s_config" key to go alongside "step_k8s_config" here? https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-k8s/dagster_k8s/executor.py#L72-L76
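
For illustration, a minimal sketch of what that might look like - the "per_step_k8s_config" key and its shape are the proposal here, not an existing API:

```python
from dagster_k8s import k8s_job_executor

# Hypothetical sketch: a "per_step_k8s_config" key next to the existing
# "step_k8s_config" key. The field name and shape are assumptions.
executor = k8s_job_executor.configured(
    {
        # applied to every step launched by this executor
        "step_k8s_config": {
            "container_config": {
                "resources": {"limits": {"cpu": "1", "memory": "1Gi"}}
            }
        },
        # hypothetical: applied only to the named ops' steps
        "per_step_k8s_config": {
            "op1": {
                "container_config": {
                    "resources": {"limits": {"cpu": "4", "memory": "4Gi"}}
                }
            }
        },
    }
)
```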

Does that seem workable?

alekseik1 (Contributor, Author)

Thanks for the reply!
After looking into the source code of dagster_celery_k8s/executor.py, I think customizations made to k8s_job_executor won't carry over to the celery-k8s executor. So the same logic for these per-op limits would have to be written twice - in dagster-k8s and in dagster-celery-k8s - which is bad.
Besides, keeping all customizations in one place would result in a huge executor configuration. Imagine this in a setup with 10+ schedules:

Example of definitions.py
```python
from dagster_celery_k8s import celery_k8s_job_executor
from dagster import Definitions
from .schedules import ALL as ALL_SCHEDULES
from .jobs import ALL as ALL_JOBS
from .sensors import ALL as ALL_SENSORS

defs = Definitions(
    jobs=ALL_JOBS,
    schedules=ALL_SCHEDULES,
    sensors=ALL_SENSORS,
    resources=...,
    executor=celery_k8s_job_executor(per_step_k8s_config={
        "some_schedule": {
            "op1_in_this_schedule": {
                "container_config": ...
            },
            "op2_in_this_schedule": {
                "container_config": ...
            },
            # and so on...
        },
        "another_schedule": {
            "and_many_ops_in_it": ...,
            # ...
        },
    }),
)
```

I believe a better place to declare a schedule's op requirements is in the schedule itself - via tags, via args to @schedule, or in some other way. The same logic is already implemented for @job and @op - you can customize their k8s config via the dagster-k8s/config tag.

What would you say to that?

gibsondan (Member)

My recommendation for sharing code between the two executors would be to pull as much logic as possible into helper functions that both executors call.

To your point about setting configuration in schedules: I totally agree that this should be configurable at run launch time in the schedule. Schedules can specify run configuration as well as tags for the runs they launch, and run configuration can include executor configuration (in the same way that you can set executor configuration in the launchpad when launching a run). So I think both tags and executor configuration can handle that requirement.

alekseik1 (Contributor, Author)

So, is it okay to leave per-op resource configuration in run tags only for now? If so, could you check whether I missed something in the code?

I'll be debugging this on our staging environment with celery-k8s executors. Sorry, but I can't write proper integration tests for this feature right now - I'm too new to the codebase 👶

gibsondan (Member)

Given that you can still configure the ops at runtime in the schedule if you use executor config (I can include an example of how that would work if that would be helpful), my recommendation is still to use executor config rather than tags for this. You can certainly fork the executor to use tags for your own purposes, but I think executor config would be a requirement for this to go into the master branch that everybody uses.

alekseik1 (Contributor, Author)

I didn't think it was possible to specify limits for ops in a schedule. Could you please share an example?

gibsondan (Member)

It's not possible today, but once there was a config field on the executor, it would be. Here's an example of what it might look like:

```python
@schedule(job_name="foo_job", cron_schedule="* * * * *")
def my_schedule():
    return RunRequest(
        run_key=None,
        run_config={
            "execution": {
                "config": {
                    "celery-k8s": {
                        "per_step_k8s_config": {"op1": {"container_config": {"resources": {...}}}}
                    }
                }
            }
        },
    )
```

alekseik1 (Contributor, Author)

Wow, what a twisted way to configure an executor 😅
Just to make sure I understand you right, you propose to:

  1. Add an option on the executor for per-op configuration. This configuration will take precedence over the op's "dagster-k8s/config" tag.
  2. Add an option to override the executor's configuration in a schedule (that's the {"execution": {"config": {"celery-k8s": ... }}} part in your example).

Right?

gibsondan (Member)

gibsondan commented Jul 18, 2024 via email

@alekseik1 alekseik1 force-pushed the precise-k8s-customization-for-ops branch from b2828b6 to 554cb61 on July 18, 2024 17:35
alekseik1 (Contributor, Author)

@gibsondan, I've changed the implementation so that the celery-k8s executor can accept per-op settings.
It turns out that a schedule can override execution config out of the box - no additional code is needed.

I also included examples of the overrides, with comments - if you see fit, we can let them go to master (perhaps in another directory).

These changes were tested in my k8s deployment of Dagster: a docker image was built (see Dockerfile), then a helm chart was deployed with this image via argocd. After that, I manually ran the job/schedule and checked the op's container limits/requests in the pod's configuration.
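
For reference, a minimal sketch of how that manual check could be scripted with the official kubernetes Python client - the namespace and label selector below are illustrative assumptions, not values from this PR:

```python
from kubernetes import client, config

# Inspect the resources of the containers in the pods a run produced.
config.load_kube_config()
pods = client.CoreV1Api().list_namespaced_pod(
    namespace="dagster",  # assumed namespace
    label_selector="dagster/job=main_job",  # assumed label
)
for pod in pods.items:
    for container in pod.spec.containers:
        print(pod.metadata.name, container.name, container.resources)
```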

Could you please take a look?

alekseik1 (Contributor, Author)

@gibsondan ping for review

gibsondan (Member) left a comment

Nice, this plan makes sense to me. I left some suggested changes, but the overall plan looks sound.

I think we are also going to want some kind of automated test for this; I appreciate that the celery-k8s integration test suite is pretty cumbersome, though.

What I think we want is some kind of test like this one for the k8s job executor: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_executor.py#L632-L635

But I appreciate that that's a bit of a lift to write from scratch at the moment. Once everything but the test is in, we can see how exactly we want to proceed here.

alekseik1 (Contributor, Author)

@gibsondan I made all the fixes you proposed. Could you please take a look and hit the resolve button if everything is ok?
Speaking of tests - I could write some unit tests, something like:

  1. Given a job with a configured executor, when this job launches all its ops, then those ops get the new k8s resources config.
  2. Given Definitions with a configured executor, when one job from the Definitions is launched, then all the ops from this job get the k8s resources config from the configured executor.
  3. The same checks for schedules and RunRequest. I would also add tests for precedence: executor.configured > job def > RunRequest > op tags.

But I would need some help, because I can't figure out how to properly create a test executor/launcher and how to run its jobs and ops. Maybe you could point me to some existing "good for copy-paste" test funcs so I could write my own in a similar fashion?

gibsondan (Member)

It looks like "make pyright" is unhappy with some of the typing changes here.

I think a unit test on _execute_step_k8s_job that includes an example of setting both types of configuration and verifies that they are merged together as expected would probably be sufficient.

ultimately I think we are looking for a test kind of like this one: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s_tests/test_launcher.py#L460

that test creates a run and puts it through the run launcher and verifies that a mocked k8s api has the expected properties - here we would be doing something similar but with a test run and that _execute_step_k8s_job method - verifying that the mock k8s api was called with the expected configuration

(If you need to pull the body of _execute_step_k8s_job into a separate method because its more difficulty to test functions that are annotated with @celery_app.task and unit test that method instead, that's fine too)
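
To make the expected merge semantics concrete, here is a rough sketch of such a test. merge_k8s_configs below is a hypothetical stand-in for the real merge helper - the name and exact semantics are illustrative, not Dagster's actual API:

```python
# Hypothetical deep-merge helper standing in for the real one: values
# from per_op_config win; nested dicts are merged key by key.
def merge_k8s_configs(op_tag_config: dict, per_op_config: dict) -> dict:
    merged = dict(op_tag_config)
    for key, value in per_op_config.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_k8s_configs(merged[key], value)
        else:
            merged[key] = value
    return merged


def test_per_op_config_overrides_requests_but_keeps_limits():
    # config coming from the op's "dagster-k8s/config" tag
    op_tag_config = {
        "container_config": {
            "resources": {
                "requests": {"cpu": "1", "memory": "1Gi"},
                "limits": {"cpu": "10", "memory": "10Gi"},
            }
        }
    }
    # per-op config coming from the executor
    per_op_config = {
        "container_config": {
            "resources": {"requests": {"cpu": "2", "memory": "2Gi"}}
        }
    }
    merged = merge_k8s_configs(op_tag_config, per_op_config)
    resources = merged["container_config"]["resources"]
    # requests are overwritten; limits are taken from the op definition
    assert resources["requests"] == {"cpu": "2", "memory": "2Gi"}
    assert resources["limits"] == {"cpu": "10", "memory": "10Gi"}
```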

alekseik1 (Contributor, Author)

@gibsondan so, here is a test. It may look cumbersome (in fact it is), so your comments are welcome.
The thing is, no tests in dagster_celery_k8s actually launch the celery-k8s executor - I found only launcher tests. So I had to write it from scratch, by trial and error, and the code quality might not be the best.
I suggest you run the tests under a debugger if you'd like to check them thoroughly.

gibsondan (Member) left a comment

@alekseik1 I added a commit that did a pass on the test. Can you rebase and run make pyright one more time?

@alekseik1 alekseik1 force-pushed the precise-k8s-customization-for-ops branch from 1180614 to 33ef70f on July 24, 2024 08:18
alekseik1 (Contributor, Author)

Weird - I cannot reproduce the failing test in my local environment:

```
cd python_modules/libraries/dagster-celery-k8s
pyenv local 3.11
tox -vv -e py311
```

gives me passing tests. Maybe a simple rerun could do the trick?

@gibsondan gibsondan self-requested a review July 24, 2024 13:13
alekseik1 (Contributor, Author)

@gibsondan should I fix anything, or would you like to take it from here?

gibsondan (Member)

gibsondan commented Jul 24, 2024 via email

gibsondan (Member)

Oh, I think it also needs one more "make ruff" after the most recent change

gibsondan (Member) left a comment

That did it! Thanks for powering through this.

@gibsondan gibsondan merged commit b754b20 into dagster-io:master Jul 24, 2024
1 check passed
@alekseik1 alekseik1 deleted the precise-k8s-customization-for-ops branch July 24, 2024 19:44
sryza pushed a commit that referenced this pull request Jul 24, 2024
## Summary & Motivation

As stated in #22138 , Dagster currently requires that op/job limits in
k8s are defined statically - and no dynamic overload is allowed.
I'd like to resolve this issue for `celery-k8s` launcher (we use them in
our production).
This PR tries to address the problem by introducing a special tag
`"dagster-k8s/config-per-op"` that allows to override
`user_defined_k8s_config`.
I expect this to work as follows:

```python
from dagster import op, job, schedule, RunRequest


@op
def op1():
    print(1 + 2)


# op defines it's own limits
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "1", "memory": "1Gi"},
                    "limits": {"cpu": "10", "memory": "10Gi"},
                }
            }
        }
    }
)
def op2():
    print(7)


# <------- one way to override op tags
@job(
    tags={
        "dagster-k8s/config-per-op": {
            "op2": {
                "container_config": {
                    # requests will be overwritten, limits will be taken from op definition
                    "resources": {
                        "requests": {"cpu": "2", "memory": "2Gi"},
                    }
                }
            },
            "op1": {
                "container_config": {
                    # op1 gets requests/limits even though it never defined them
                    "resources": {
                        "requests": {"cpu": "11", "memory": "11Gi"},
                        "limits": {"cpu": "22", "memory": "22Gi"},
                    }
                }
            },
        }
    }
)
def main_job():
    op1()
    op2()


# <------- another way to override op tags
@schedule(job=main_job, cron_schedule="* * * * *")
def main_job():
    return RunRequest(
        tags={
            "dagster-k8s/config-per-op": {
                "op2": {
                    "container_config": {
                        # requests will be overwritten at schedule level, limits will be taken from op definition
                        "resources": {
                            "requests": {"cpu": "8", "memory": "8Gi"},
                        }
                    }
                },
                "op1": {
                    "container_config": {
                        # op1 gets requests/limits from schedule definition
                        "resources": {
                            "requests": {"cpu": "88", "memory": "88Gi"},
                            "limits": {"cpu": "89", "memory": "89Gi"},
                        }
                    }
                },
            }
        }
    )
```

I could use this feature for jobs that vary in resource consumption (for
instance, jobs that accept sql query and download data from db).

Could you please take a look at the code and check it for broken corner
cases or violated contracts?
This is my first dive into Dagster codebase so I'll appreciate any
comments and suggestions.

## How I Tested These Changes
I modified some unit-tests in k8s-related modules in dagster. No
integration testing was done.

---------

Co-authored-by: gibsondan <[email protected]>
smackesey pushed a commit that referenced this pull request Jul 25, 2024
## Summary & Motivation

As stated in #22138 , Dagster currently requires that op/job limits in
k8s are defined statically - and no dynamic overload is allowed.
I'd like to resolve this issue for `celery-k8s` launcher (we use them in
our production).
This PR tries to address the problem by introducing a special tag
`"dagster-k8s/config-per-op"` that allows to override
`user_defined_k8s_config`.
I expect this to work as follows:

```python
from dagster import op, job, schedule, RunRequest


@op
def op1():
    print(1 + 2)


# op defines it's own limits
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "1", "memory": "1Gi"},
                    "limits": {"cpu": "10", "memory": "10Gi"},
                }
            }
        }
    }
)
def op2():
    print(7)


# <------- one way to override op tags
@job(
    tags={
        "dagster-k8s/config-per-op": {
            "op2": {
                "container_config": {
                    # requests will be overwritten, limits will be taken from op definition
                    "resources": {
                        "requests": {"cpu": "2", "memory": "2Gi"},
                    }
                }
            },
            "op1": {
                "container_config": {
                    # op1 gets requests/limits even though it never defined them
                    "resources": {
                        "requests": {"cpu": "11", "memory": "11Gi"},
                        "limits": {"cpu": "22", "memory": "22Gi"},
                    }
                }
            },
        }
    }
)
def main_job():
    op1()
    op2()


# <------- another way to override op tags
@schedule(job=main_job, cron_schedule="* * * * *")
def main_job():
    return RunRequest(
        tags={
            "dagster-k8s/config-per-op": {
                "op2": {
                    "container_config": {
                        # requests will be overwritten at schedule level, limits will be taken from op definition
                        "resources": {
                            "requests": {"cpu": "8", "memory": "8Gi"},
                        }
                    }
                },
                "op1": {
                    "container_config": {
                        # op1 gets requests/limits from schedule definition
                        "resources": {
                            "requests": {"cpu": "88", "memory": "88Gi"},
                            "limits": {"cpu": "89", "memory": "89Gi"},
                        }
                    }
                },
            }
        }
    )
```

I could use this feature for jobs that vary in resource consumption (for
instance, jobs that accept sql query and download data from db).

Could you please take a look at the code and check it for broken corner
cases or violated contracts?
This is my first dive into Dagster codebase so I'll appreciate any
comments and suggestions.

## How I Tested These Changes
I modified some unit-tests in k8s-related modules in dagster. No
integration testing was done.

---------

Co-authored-by: gibsondan <[email protected]>
gibsondan added a commit that referenced this pull request Jul 26, 2024
Summary:
Resolves #23268.

#23053 added a new argument to the celery task signature and passed it in, even when it was empty. Since the celery workers might be running older versions of dagster, a new version of dagster running the executor might set this field and hit the error in that task.

This attempts to resolve that in two directions:
- Only set the new argument if it is non-empty.
- Add kwargs to the task signature, to hopefully make it more resilient to new arguments in the future.

Test Plan: BK, run celery with user code on master but the celery workers on 1.7.14, should no longer reproduce the linked error
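
For illustration, a minimal sketch of that backward-compatibility pattern - the task and argument names here are hypothetical, not the actual Dagster celery task definitions:

```python
from celery import Celery

app = Celery("example")
app.conf.task_always_eager = True  # run tasks locally for this demo


# **kwargs makes the task tolerant of arguments added by newer clients.
@app.task
def execute_step(step_key, per_op_k8s_config=None, **kwargs):
    config = per_op_k8s_config or {}
    print(f"executing {step_key} with config {config}")


def submit_step(step_key, per_op_k8s_config=None):
    task_kwargs = {"step_key": step_key}
    # Only send the new argument when it is non-empty, so older workers
    # whose task signature lacks it never receive an unexpected kwarg.
    if per_op_k8s_config:
        task_kwargs["per_op_k8s_config"] = per_op_k8s_config
    execute_step.apply_async(kwargs=task_kwargs)
```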