Experimental flag to attach row count metadata as part of dagster-dbt execution (#21542)
Main thing here is that I'm not sold on this PostprocessingTask abstraction. Let's just define a single stateful transformation function.
```python
from dbt.adapters.duckdb.credentials import DuckDBCredentials

if isinstance(config.credentials, DuckDBCredentials):
    if not config.credentials.config_options:
        config.credentials.config_options = {}
    config.credentials.config_options["access_mode"] = "READ_ONLY"
```
Could you explain what's happening here?
This attempts to get the duckdb adapter to play nice. Concurrently, duckdb only accepts either:

a) one read-write connection, or
b) many read-only connections.

This lets us parallelize our accesses for postprocessing in the duckdb case. Sadly, I don't think we can run any of these tasks simultaneously with the dbt build, since that connection is read-write.
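As a sketch of the parallelization this enables (the names below are hypothetical stand-ins, not the actual dagster-dbt internals): once the read-write dbt build has finished, each table's row-count query can run on its own read-only connection, fanned out across a thread pool.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a read-only warehouse query against one table.
FAKE_WAREHOUSE = {"users": 3, "orders": 5}

def fetch_row_count(table: str) -> int:
    return FAKE_WAREHOUSE[table]

def fetch_all_row_counts(tables):
    # Read-only queries are safe to run concurrently, so fan out across threads.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return dict(zip(tables, pool.map(fetch_row_count, tables)))
```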
```python
# @@ -282,3 +283,20 @@ def parse_manifest(path: str, target_path: str = DEFAULT_DBT_TARGET_PATH) -> Map
            return json.load(file)
        except FileNotFoundError:
            raise DagsterDbtCliOutputsNotFoundError(path=manifest_path)


def get_future_completion_state_or_err(futures: List[Union[Future, Any]]) -> bool:
```
Could we instead use https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait with some timeout for similar functionality?
Probably, but I think we'd still have to do some custom bookkeeping around event log/Future order. Right now the waiting/timeout is handled in the main loop in stream, since we just want to wait on the next incomplete event log entry's Future (so we can yield it to user code). This way we don't have to worry about yielding things in the wrong order.
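A minimal sketch of that ordered-yield pattern (simplified names, not the real stream loop): the consumer blocks only on the next entry in emission order, so a later Future finishing early can never cause out-of-order yields.

```python
from concurrent.futures import Future, ThreadPoolExecutor

def drain_in_order(items):
    # items is a mix of plain events and Futures; block on each Future in
    # turn, so nothing is yielded ahead of an earlier, still-pending entry.
    for item in items:
        yield item.result() if isinstance(item, Future) else item

with ThreadPoolExecutor() as pool:
    mixed = ["event-a", pool.submit(lambda: "event-b"), "event-c"]
    ordered = list(drain_in_order(mixed))
```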
```python
# they are emitted by dbt.
output_events_and_futures: List[Union[Future, DbtDagsterEventType]] = []

with pushd(str(self.project_dir)):
```
Could you explain the `pushd` here?
Needed to get the duckdb adapter to work correctly: it tries to access the db file assuming the working directory is the project dir. It seems like the dbt CLI does this in its own process, but we need to do the same if we want to use the adapter ourselves. Added a comment.
```python
# @@ -543,6 +613,7 @@ def run(
    raise_on_error: bool,
    context: Optional[OpExecutionContext],
    adapter: Optional[BaseAdapter],
    postprocessing_tasks: Optional[List[DbtPostprocessingAsyncTask]],
```
Just so we have maximum flexibility, we shouldn't expose this API to the user yet. Once we have confidence in this scheme working for out-of-the-box metadata (e.g. row count), then we can consider exposing it to the user so they also have more flexibility.
Updated. We still expose this functionality to the user, but now it's an `@experimental_param` flag.
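A simplified sketch of what an experimental-parameter decorator does (Dagster ships its own implementation; this illustrative version just warns when the flagged keyword is passed):

```python
import warnings
from functools import wraps

def experimental_param(param: str):
    # Simplified stand-in for Dagster's @experimental_param decorator: warn
    # when callers pass the flagged keyword argument. (Positional use of the
    # parameter is not detected in this sketch.)
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            if kwargs.get(param) is not None:
                warnings.warn(f"Parameter `{param}` is experimental", stacklevel=2)
            return fn(*args, **kwargs)
        return wrapper
    return decorator
```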
Main comments:
- Divest from the boolean pattern, to more of a composable builder pattern.
- The `with_metadata` change should be a separate PR. There should be systematic refactoring of our event inheritance to ensure metadata can be overridden in an ergonomic way.
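The composable builder pattern suggested here could look roughly like this (hypothetical names, purely illustrative): each call returns a new configured object rather than adding another boolean to the call signature.

```python
from dataclasses import dataclass, replace
from typing import Tuple

@dataclass(frozen=True)
class EventStream:
    # Hypothetical sketch of a composable builder over the dbt event stream.
    transforms: Tuple[str, ...] = ()

    def fetch_row_counts(self) -> "EventStream":
        # Return a new stream with the transform appended; the original is
        # untouched, so configurations compose without shared state.
        return replace(self, transforms=self.transforms + ("row_counts",))

    def fetch_column_schema(self) -> "EventStream":
        return replace(self, transforms=self.transforms + ("column_schema",))
```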
```python
# @@ -149,6 +149,17 @@ def __eq__(self, other: object) -> bool:
        and self.tags == other.tags
    )

    def with_metadata(self, metadata: Optional[Mapping[str, RawMetadataValue]]) -> "Output":
```
We should separate this out into its own PR, since this is a core lib change.
```python
if isinstance(event, Output):
    return event.with_metadata(metadata={**event.metadata, **additional_metadata})
else:
    return event._replace(metadata={**event.metadata, **additional_metadata})
```
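The `with_metadata` helper can be sketched as follows (a minimal stand-in, not the real `dagster.Output`): it recreates the object with replacement metadata rather than mutating in place.

```python
from typing import Any, Mapping, Optional

class Output:
    # Minimal stand-in for dagster's Output, for illustration only.
    def __init__(self, value: Any, metadata: Optional[Mapping[str, Any]] = None):
        self.value = value
        self.metadata = dict(metadata or {})

    def with_metadata(self, metadata: Mapping[str, Any]) -> "Output":
        # Return a new Output carrying the replacement metadata mapping;
        # the original object is left untouched.
        return Output(self.value, metadata)
```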
This signals that the core Dagster `XXXEvent` classes should have a more unified inheritance pattern. We should file an issue and mark this as one code path to consolidate.
```python
if all_work_complete and event_to_emit_idx >= len(output_events_and_futures):
    break

if event_to_emit_idx < len(output_events_and_futures):
```
nit: invert the conditional logic; there's a lot of nesting here.
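The suggested inversion is the standard guard-clause refactor: handle the early-exit cases up front so the main body doesn't nest (a sketch with assumed names, not the actual loop):

```python
def next_action(events, idx, all_work_complete):
    # Guard clauses first: the loop is finished, or nothing is ready yet.
    if idx >= len(events):
        return ("done", None) if all_work_complete else ("wait", None)
    # Main body stays at the top indentation level.
    return ("emit", events[idx])
```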
Updated to builder pattern.
I assume docs + API docstring examples are coming in a follow-up PR.
I don't think this should be in the `dbt_packages/` directory, since this functionality doesn't rely on the dagster dbt package.
## Summary

Adds a small util method to recreate an `Output` object with a new set of metadata keys and values. Used in #21542.

## Test Plan

New unit test.
…t` execution (dagster-io#21542)

## Summary

Adds a new `fetch_table_metadata` experimental flag to `DbtCliResource.cli` which allows us to attach `dagster/total_row_count` (introduced in dagster-io#21524) to dbt-built tables:

```python
@dbt_assets(manifest=dbt_manifest)
def jaffle_shop_dbt_assets(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(
        ["build"],
        context=context,
        fetch_table_metadata=True,
    ).stream()
```

<img width="534" alt="Screenshot 2024-05-03 at 11 03 19 AM" src="https://github.com/dagster-io/dagster/assets/10215173/c3e64633-5fc3-44e4-99e3-601f0c7a0856">

Under the hood, this PR uses dbt's `dbt.adapters.base.impl.BaseAdapter` abstraction to let Dagster connect to the user's warehouse using the dbt-provided credentials. Right now, we just run a simple `select count(*)` on the tables specified in each `AssetMaterialization` and `Output`, but this lays some groundwork we could use for fetching other data as well.

There are a few caveats:

- When using duckdb, we wait for the dbt run to conclude, since duckdb does not allow simultaneous connections when a write connection is open (e.g. when dbt is running).
- We don't query row counts on views, since they may include non-trivial sql which could be expensive to query.

## Test Plan

Tested locally w/ duckdb, bigquery, and snowflake. Introduced basic pytest test to test against duckdb.
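The row-count fetch described above can be sketched like this (`StubAdapter` and the helper name are illustrative assumptions; the real implementation goes through dbt's `BaseAdapter`):

```python
def fetch_row_count_metadata(adapter, relation_name: str) -> dict:
    # Run a simple count query through an adapter-like object and package
    # the result as Dagster-style metadata.
    row_count = adapter.execute(f"select count(*) from {relation_name}")
    return {"dagster/total_row_count": row_count}
```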
…n't require an import of a module that might not be present (#24346)

Summary: Attempt to resolve #23952. Instead of attempting to import duckdb and logging a warning on ImportError, do a bit of introspection on the relevant class and only try to import it if the class name matches.

Test Plan: could use some help with this from reviewers; I don't see any duckdb-specific tests in #21542.