Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change: Changed run tag filtering and added additional run tag filter for better performance. #22833

Merged
merged 4 commits into from
Jul 24, 2024

Conversation

egordm
Copy link
Contributor

@egordm egordm commented Jul 3, 2024

Summary & Motivation

While running jobs on frequent schedules we have noticed that as the amount of runs grows some ui operations become very slow. Looking at AWS monitoring we see that one query in particular seems to be very slow:
Screenshot_20240703_151316

By analyzing the query we have noticed that it is related to run tag filtering. Here is an example of such a query with filled parameters:

EXPLAIN (ANALYSE, BUFFERS) SELECT runs.id,
       runs.run_body,
       runs.status,
       runs.create_timestamp,
       runs.update_timestamp,
       runs.start_time,
       runs.end_time
FROM runs
WHERE runs.run_id IN (SELECT run_tags.run_id
                      FROM run_tags
                      WHERE
                          run_tags.key = 'dagster/schedule_name' AND run_tags.value = 'quick_partitioned_job_schedule'
                         OR run_tags.key = '.dagster/repository' AND
                            run_tags.value = '__repository__@example-code'
                      GROUP BY run_tags.run_id
                      HAVING count(DISTINCT run_tags.key) = 2)
ORDER BY runs.id DESC
LIMIT 1;

I believe there are multiple instances discussing this:

Looking at the query plan: https://explain.dalibo.com/plan/2c1bga585e8ca45f
image

We notice that the subquery scans a lot of rows (which is correct as we have a lot of runs with same tags), but afterwards, the filter on runs is very slow and filters away a lot of rows. A lot of work is done to retrieve only one row with highest matching run id which feels like it can be much more efficient.

To improve performance of these type of queries I would like to propose two changes:

  1. Replace the subquery by multiple joins. I would expect that this would make a much flatter execution plan and thus a potential for earlier filtering.

The example query would result in something like this:

EXPLAIN (ANALYSE, BUFFERS)
SELECT runs.id,
       runs.run_body,
       runs.status,
       runs.create_timestamp,
       runs.update_timestamp,
       runs.start_time,
       runs.end_time
FROM runs
JOIN public.run_tags r on runs.run_id = r.run_id AND r.key = 'dagster/schedule_name' AND r.value = 'quick_partitioned_job_schedule'
JOIN public.run_tags r2 on runs.run_id = r2.run_id AND r2.key = '.dagster/repository' AND r2.value = '__repository__@example-code'
ORDER BY runs.id DESC
LIMIT 1;
  1. As mentioned in one of the referenced threads, add an index on run_id for run tags. This would make joins in (1) much faster.
CREATE UNIQUE INDEX run_tags_run_idx ON public.run_tags USING btree (run_id, id);

The changes in the PR implement both changes in Dagster.

How I Tested These Changes

I have tested these change by first running tests to make sure they don't break dagster. Then I have set up a local benchmark to test the changes. I have populated the dagster instace with 5.4 million runs and 10.7 million related run tags.

Afterwards I have applied the proposed changes and measured their performance. Each query was run five times and the performance of the fifth run was used. This is, to make sure all the data was in shared buffers to make the comparison fair.

Results

Experiment Query Plan Analysis Runtime
Baseline 10.7 run tags, unoptimized - explain.dalibo.com 15.579s
Query Optimization 10.7 run tags, optimized query, no index - explain.dalibo.com 7.560s
Query Optimization + Custom Index 10.7 run tags, optimized query, with index - explain.dalibo.com 0.076s

Interestingly "Query Optimization" alone results in a more complex but faster query plan. The "Query Optimization + Custom Index" results in a desired much simpler query plan that doesn't do as many reads.

Overall the changes improve the performance almost 200x. The addition of the index shouldn't provide much overhead.

I have also found that if run_tags would use runs.id as foreign key and not runs.run_id the query performance would be much faster 0.018s. But this change would be too large and possibly break things.

@gibsondan gibsondan requested a review from prha July 3, 2024 19:17
@prha
Copy link
Member

prha commented Jul 9, 2024

Yeah, we've known for a little while now that we need to join on runs.id... We've punted on it because of the complexity involved in managing the opt-in migrations. But I think it is probably worth doing because of how the performance degrades over time.

I think this PR makes sense, but I might spend some time this week scoping out what the foreign-key change would look like (involving both a schema and data migration).

This change probably makes sense only if they've created the index. That tells me that this should probably be two separate PRs, one which creates the schema change with the additional index, and another change that switches the join in the query, which we'd want to coordinate with a major release (e.g. 1.8.0).

@@ -148,6 +148,7 @@
)

db.Index("idx_run_tags", RunTagsTable.c.key, RunTagsTable.c.value, mysql_length=64)
db.Index("idx_run_tags_run_idx", RunTagsTable.c.run_id, RunTagsTable.c.id, mysql_length=64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be accompanied with an alembic migration to add the index.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find. I have added migrations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be:

db.Index("idx_run_tags_run_idx", RunTagsTable.c.run_id, RunTagsTable.c.id, mysql_length={"run_id": 255})

@egordm
Copy link
Contributor Author

egordm commented Jul 11, 2024

Sounds like a good plan. I will split this PR into two, to start with that.

Combining the new query with a change in foreign key in the future would be the best.

@egordm
Copy link
Contributor Author

egordm commented Jul 17, 2024

Hey, I have changed this PR to only add an index and a migration. As described I have also created a second PR #23050 which changes the query itself. Let me know if any other changes need to be made.

@egordm egordm requested a review from prha July 17, 2024 12:21
@@ -148,6 +148,7 @@
)

db.Index("idx_run_tags", RunTagsTable.c.key, RunTagsTable.c.value, mysql_length=64)
db.Index("idx_run_tags_run_idx", RunTagsTable.c.run_id, RunTagsTable.c.id, mysql_length=64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be:

db.Index("idx_run_tags_run_idx", RunTagsTable.c.run_id, RunTagsTable.c.id, mysql_length={"run_id": 255})

unique=False,
postgresql_concurrently=True,
mysql_length={
"pipeline_name": 512,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"run_id": 255

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I completely missed that one. Looking into it, it makes sense. I have applied the suggested changes.

@prha
Copy link
Member

prha commented Jul 23, 2024

@egordm can you run make ruff to fix the lint/format tests? After that, I think this is good to merge!

Signed-off-by: Egor Dmitriev <[email protected]>
@egordm
Copy link
Contributor Author

egordm commented Jul 24, 2024

Awesome. The lint errors should be fixed. Thanks!

@prha prha merged commit 072f762 into dagster-io:master Jul 24, 2024
1 check passed
prha pushed a commit that referenced this pull request Jul 24, 2024
A followup to #22833 adding an optimized query for tag filtering.

## Summary & Motivation
While running jobs on frequent schedules we have noticed that as the
amount of runs grows some ui operations become very slow. Looking at AWS
monitoring we see that one query in particular seems to be very slow:

![Screenshot_20240703_151316](https://github.com/dagster-io/dagster/assets/4254771/23040062-030b-4161-a43a-8667cbef4d56)

By analyzing the query we have noticed that it is related to run tag
filtering. Here is an example of such a query with filled parameters:
```sql
EXPLAIN (ANALYSE, BUFFERS) SELECT runs.id,
       runs.run_body,
       runs.status,
       runs.create_timestamp,
       runs.update_timestamp,
       runs.start_time,
       runs.end_time
FROM runs
WHERE runs.run_id IN (SELECT run_tags.run_id
                      FROM run_tags
                      WHERE
                          run_tags.key = 'dagster/schedule_name' AND run_tags.value = 'quick_partitioned_job_schedule'
                         OR run_tags.key = '.dagster/repository' AND
                            run_tags.value = '__repository__@example-code'
                      GROUP BY run_tags.run_id
                      HAVING count(DISTINCT run_tags.key) = 2)
ORDER BY runs.id DESC
LIMIT 1;
```

I believe there are multiple instances discussing this:
* #18269
* #19003

Looking at the query plan:
https://explain.dalibo.com/plan/2c1bga585e8ca45f

![image](https://github.com/dagster-io/dagster/assets/4254771/0d8fc125-ac67-4a1b-8f37-7ff69cbfa81f)

We notice that the subquery scans a lot of rows (which is correct as we
have a lot of runs with same tags), but afterwards, the filter on runs
is very slow and filters away a lot of rows. A lot of work is done to
retrieve only one row with highest matching run id which feels like it
can be much more efficient.

To improve performance of these type of queries I would like to propose
two changes:

1. Replace the subquery by multiple joins. I would expect that this
would make a much flatter execution plan and thus a potential for
earlier filtering.

The example query would result in something like this:
```sql
EXPLAIN (ANALYSE, BUFFERS)
SELECT runs.id,
       runs.run_body,
       runs.status,
       runs.create_timestamp,
       runs.update_timestamp,
       runs.start_time,
       runs.end_time
FROM runs
JOIN public.run_tags r on runs.run_id = r.run_id AND r.key = 'dagster/schedule_name' AND r.value = 'quick_partitioned_job_schedule'
JOIN public.run_tags r2 on runs.run_id = r2.run_id AND r2.key = '.dagster/repository' AND r2.value = '__repository__@example-code'
ORDER BY runs.id DESC
LIMIT 1;
```

2. As mentioned in one of the referenced threads, add an index on run_id
for run tags. This would make joins in (1) much faster.

```sql
CREATE UNIQUE INDEX run_tags_run_idx ON public.run_tags USING btree (run_id, id);
```

The changes in the PR implement both changes in Dagster.

## How I Tested These Changes

I have tested these change by first running tests to make sure they
don't break dagster. Then I have set up a local benchmark to test the
changes. I have populated the dagster instace with 5.4 million runs and
10.7 million related run tags.

Afterwards I have applied the proposed changes and measured their
performance. Each query was run five times and the performance of the
fifth run was used. This is, to make sure all the data was in shared
buffers to make the comparison fair.

Results
| Experiment | Query Plan Analysis | Runtime | 
| ----------------- | ----------------------------- | ------------ | 
| Baseline | [10.7 run tags, unoptimized -
explain.dalibo.com](https://explain.dalibo.com/plan/2c1bga585e8ca45f) |
15.579s |
| Query Optimization | [10.7 run tags, optimized query, no index -
explain.dalibo.com](https://explain.dalibo.com/plan/agf3fabc77b5ce58) |
7.560s |
| Query Optimization + Custom Index | [10.7 run tags, optimized query,
with index -
explain.dalibo.com](https://explain.dalibo.com/plan/dgf924f51g585ab5) |
0.076s |

Interestingly "Query Optimization" alone results in a more complex but
faster query plan. The "Query Optimization + Custom Index" results in a
desired much simpler query plan that doesn't do as many reads.

Overall the changes improve the performance almost 200x. The addition of
the index shouldn't provide much overhead.

I have also found that if run_tags would use `runs.id` as foreign key
and not `runs.run_id` the query performance would be much faster 0.018s.
But this change would be too large and possibly break things.

---------

Signed-off-by: Egor Dmitriev <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants