Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customize UserDefinedDagsterK8sConfig per op via tags #23053

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: more complex case + docs
  • Loading branch information
alekseik1 committed Jul 24, 2024
commit be36bd744c1f3acf460501d30ef8e2e568ea189a
49 changes: 44 additions & 5 deletions example_project/definitions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
"""Example of container_config overriding logic.

1. celery_k8s_job_executor is most important, it precedes everything else. Is specified, `run_config` from RunRequest is ignored.
alekseik1 marked this conversation as resolved.
Show resolved Hide resolved
Precedence order:
- at job-s definition (via executor_def=...)
- at Definitions (via executor=...)
2. after it goes run_config from request limit in schedule. If celery_k8s_job_executor is configured (via Definitions or via job), RunRequest config is ignored.
3. Then goes tag "dagster-k8s/config" from op.
"""

from dagster_celery_k8s import celery_k8s_job_executor
from dagster import Definitions, in_process_executor, op, job, schedule, RunRequest

Expand Down Expand Up @@ -34,7 +44,30 @@ def op2():
print(1)


@job
@job(
executor_def=celery_k8s_job_executor.configured(
{
"per_step_k8s_config": {
"op1": {
"container_config": {
"resources": {
"requests": {"cpu": "999m", "memory": "999Mi"},
"limits": {"cpu": "999m", "memory": "999Mi"},
}
}
},
"op2": {
"container_config": {
"resources": {
"requests": {"cpu": "1111m", "memory": "1111Mi"},
"limits": {"cpu": "1111m", "memory": "1111Mi"},
}
}
},
}
}
)
)
def job1():
op1()
op2()
Expand All @@ -54,7 +87,15 @@ def schedule1():
"limits": {"cpu": "888m", "memory": "888Mi"},
}
}
}
},
"op2": {
"container_config": {
"resources": {
"requests": {"cpu": "677m", "memory": "677Mi"},
"limits": {"cpu": "677m", "memory": "677Mi"},
}
}
},
}
}
}
Expand Down Expand Up @@ -87,7 +128,5 @@ def schedule1():
},
}
}
)
if True
else in_process_executor,
),
)