Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental flag to attach row count metadata as part of dagster-dbt execution #21542

Merged
merged 6 commits into from
May 9, 2024

Conversation

benpankow
Copy link
Member

@benpankow benpankow commented Apr 30, 2024

Summary

Adds a new fetch_table_metadata experimental flag to DbtCliResource.cli which allows us to fetch dagster/total_row_count (introduced in #21524) to dbt-built tables:

@dbt_assets(manifest=dbt_manifest)
def jaffle_shop_dbt_assets(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(
        ["build"],
        context=context,
        fetch_table_metadata=True,
    ).stream()
Screenshot 2024-05-03 at 11 03 19 AM

Under the hood, this PR uses dbt's dbt.adapters.base.impl.BaseAdapter abstraction to let Dagster connect to the user's warehouse using the dbt-provided credentials. Right now, we just run a simple select count(*) on the tables specified in each AssetMaterialization and Output, but this lays some groundwork we could use for fetching other data as well.

There are a few caveats:

  • When using duckdb, we wait for the dbt run to conclude, since duckdb does not allow simultaneous connections when a write connection is open (e.g. when dbt is running)
  • We don't query row counts on views, since they may include non-trivial sql which could be expensive to query

Test Plan

Tested locally w/ duckdb, bigquery, and snowflake. Introduced basic pytest test to test against duckdb.

@benpankow benpankow changed the base branch from master to benpankow/row-count-meta May 1, 2024 20:17
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from 8e18c51 to 4329f78 Compare May 1, 2024 20:17
@benpankow benpankow changed the title test [wip] Pull row count as part of dagster-dbt execution May 1, 2024
@benpankow benpankow changed the base branch from benpankow/row-count-meta to persist-run-for-production-2-schedule May 1, 2024 20:21
@benpankow benpankow changed the base branch from persist-run-for-production-2-schedule to benpankow/row-count-meta May 1, 2024 20:21
@benpankow benpankow requested review from rexledesma and sryza May 1, 2024 21:35
Copy link
Contributor

@rexledesma rexledesma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main thing here is that I'm not sold on this PostprocessingTask abstraction. Let's just define a single stateful transformation function.

Comment on lines 1294 to 1260
from dbt.adapters.duckdb.credentials import DuckDBCredentials

if isinstance(config.credentials, DuckDBCredentials):
if not config.credentials.config_options:
config.credentials.config_options = {}
config.credentials.config_options["access_mode"] = "READ_ONLY"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what happening here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attempts to get duckdb adapter to play nice - concurrently, duckdb only accepts either:
a) one read-write connection
b) many read-only connections

this lets us parallelize our accesses for postprocessing in the duckdb case - sadly don't think we can run any of these tasks simultaneously to the dbt build since that's read-write

@@ -282,3 +283,20 @@ def parse_manifest(path: str, target_path: str = DEFAULT_DBT_TARGET_PATH) -> Map
return json.load(file)
except FileNotFoundError:
raise DagsterDbtCliOutputsNotFoundError(path=manifest_path)


def get_future_completion_state_or_err(futures: List[Union[Future, Any]]) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we instead use https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait with some timeout for similar functionality?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, but I think we'd still have to do some custom logic around bookkeeping on event log/Future order. Right now the waiting/timeout is handled in the main loop in stream since we just want to wait on the next incomplete event log entry's Future (so we can yield it to user code). This way we don't have to worry about yielding things in the wrong order.

# they are emitted by dbt.
output_events_and_futures: List[Union[Future, DbtDagsterEventType]] = []

with pushd(str(self.project_dir)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the pushd here?

Copy link
Member Author

@benpankow benpankow May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed to get the duckdb adapter to work correctly - it tries to access the db file assuming the working directory is the project dir - seems like the DBT cli does this in its process, but we need to do the same if we want to use the adapter ourselves. Added comment.

@@ -543,6 +613,7 @@ def run(
raise_on_error: bool,
context: Optional[OpExecutionContext],
adapter: Optional[BaseAdapter],
postprocessing_tasks: Optional[List[DbtPostprocessingAsyncTask]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so we have maximum flexibility, we shouldn't expose this API to the user yet. Once we have confidence in this scheme working for out-of-the-box metadata (e.g. row count), then we can consider exposing it to the user so they also have more flexibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. We still expose this functionality to the user, but now it's an @experimental_param flag

@benpankow benpankow force-pushed the benpankow/threadpool-thing branch 2 times, most recently from 4befca2 to 46ffcb3 Compare May 2, 2024 23:16
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from 2a58b25 to 7b14290 Compare May 3, 2024 18:13
@benpankow benpankow marked this pull request as ready for review May 3, 2024 18:17
@benpankow benpankow changed the title [wip] Pull row count as part of dagster-dbt execution Experimental flag to attach row count metadata as part of dagster-dbt execution May 3, 2024
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch 2 times, most recently from a35aa1b to 0a44fe7 Compare May 3, 2024 21:29
@benpankow benpankow requested a review from rexledesma May 3, 2024 21:40
Copy link
Contributor

@rexledesma rexledesma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main comments:

  • Divest from the boolean pattern, to more of a composable builder pattern
  • The with_metadata change should be a separate PR. There should be systematic refactoring of our event inheritance to ensure metadata can be overridden in an ergonomic way.

@@ -149,6 +149,17 @@ def __eq__(self, other: object) -> bool:
and self.tags == other.tags
)

def with_metadata(self, metadata: Optional[Mapping[str, RawMetadataValue]]) -> "Output":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should separate this out into its own PR, since this is a core lib change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +714 to +718
if isinstance(event, Output):
return event.with_metadata(metadata={**event.metadata, **additional_metadata})
else:
return event._replace(metadata={**event.metadata, **additional_metadata})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This signals that the core Dagster XXXEvent classes should have a more unified inheritance pattern.

We should file an issue and mark this as one code path to consolidate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if all_work_complete and event_to_emit_idx >= len(output_events_and_futures):
break

if event_to_emit_idx < len(output_events_and_futures):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: invert conditional logic, there's a lot of nesting here

@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from ff9b5e9 to 9a27b13 Compare May 6, 2024 18:54
@benpankow benpankow changed the base branch from benpankow/row-count-meta to benpankow/output-with-metadata May 6, 2024 18:57
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from 9a27b13 to 596b0bf Compare May 6, 2024 18:57
@benpankow
Copy link
Member Author

Updated to builder pattern

@benpankow benpankow requested a review from rexledesma May 7, 2024 17:21
Copy link
Contributor

@rexledesma rexledesma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume docs + API docstring examples are coming in a follow up PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be in the dbt_packages/ directory since this functionality doesn't rely on the dagster dbt package.

@benpankow benpankow force-pushed the benpankow/output-with-metadata branch from ac37a1b to 58bcadb Compare May 7, 2024 20:29
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from 4d12f63 to 437bdc3 Compare May 7, 2024 20:29
@benpankow benpankow force-pushed the benpankow/output-with-metadata branch from 58bcadb to b9d8a75 Compare May 7, 2024 22:11
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from 437bdc3 to ee88d8c Compare May 7, 2024 22:11
@benpankow benpankow force-pushed the benpankow/output-with-metadata branch from b9d8a75 to 5e94d59 Compare May 8, 2024 23:17
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from d616ef9 to 46998df Compare May 8, 2024 23:18
Base automatically changed from benpankow/output-with-metadata to master May 8, 2024 23:44
benpankow added a commit that referenced this pull request May 8, 2024
## Summary

Adds a small util method to recreate an `Output` object with a new set
of metadata keys and values. Used in #21542.

## Test Plan

New unit test.
@benpankow benpankow force-pushed the benpankow/threadpool-thing branch from 46998df to 9b3a0e2 Compare May 9, 2024 17:37
@benpankow benpankow merged commit 536685d into master May 9, 2024
1 check was pending
@benpankow benpankow deleted the benpankow/threadpool-thing branch May 9, 2024 20:15
danielgafni pushed a commit to danielgafni/dagster that referenced this pull request Jun 18, 2024
## Summary

Adds a small util method to recreate an `Output` object with a new set
of metadata keys and values. Used in dagster-io#21542.

## Test Plan

New unit test.
danielgafni pushed a commit to danielgafni/dagster that referenced this pull request Jun 18, 2024
…t` execution (dagster-io#21542)

## Summary

Adds a new `fetch_table_metadata` experimental flag to
`DbtCliResource.cli` which allows us to fetch `dagster/total_row_count`
(introduced in dagster-io#21524) to dbt-built tables:

```python
@dbt_assets(manifest=dbt_manifest)
def jaffle_shop_dbt_assets(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(
        ["build"],
        context=context,
        fetch_table_metadata=True,
    ).stream()
```

<img width="534" alt="Screenshot 2024-05-03 at 11 03 19 AM"
src="https://github.com/dagster-io/dagster/assets/10215173/c3e64633-5fc3-44e4-99e3-601f0c7a0856">

Under the hood, this PR uses dbt's `dbt.adapters.base.impl.BaseAdapter`
abstraction to let Dagster connect to the user's warehouse using the
dbt-provided credentials. Right now, we just run a simple `select
count(*)` on the tables specified in each `AssetMaterialization` and
`Output`, but this lays some groundwork we could use for fetching other
data as well.

There are a few caveats:
- When using duckdb, we wait for the dbt run to conclude, since duckdb
does not allow simultaneous connections when a write connection is open
(e.g. when dbt is running)
- We don't query row counts on views, since they may include non-trivial
sql which could be expensive to query

## Test Plan

Tested locally w/ duckdb, bigquery, and snowflake. Introduced basic
pytest test to test against duckdb.
gibsondan added a commit that referenced this pull request Sep 10, 2024
…n't require an import of a module that might not be present

Summary:
Attempt to resolve #23952: Instead of attempting to import duckdb and logging a warning on importerror, do a bit of introspection on the relevant class and only try to import it if the class name matches.

Test Plan: could use some help with this from reviewers, I don't see any duckdb-specific tests in #21542
gibsondan added a commit that referenced this pull request Sep 10, 2024
…n't require an import of a module that might not be present (#24346)

Summary:
Attempt to resolve #23952:
Instead of attempting to import duckdb and logging a warning on
importerror, do a bit of introspection on the relevant class and only
try to import it if the class name matches.

Test Plan: could use some help with this from reviewers, I don't see any
duckdb-specific tests in
#21542

## Summary & Motivation

## How I Tested These Changes

## Changelog

Insert changelog entry or "NOCHANGELOG" here.

- [ ] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants