Skip to content

Commit

Permalink
[dagster-dbt] add with_snowflake/bigquery_insights chainable method (
Browse files Browse the repository at this point in the history
…dagster-io#23183)

## Summary

Adds `with_snowflake_insights` and `with_bigquery_insights` methods
that can be chained onto a `.stream()` call, making it simpler to use
insights alongside other deferred fetches:

Before:

```python
@dbt_assets(manifest=snowflake_manifest_path)
def jaffle_shop_dbt_assets(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    dbt_cli_invocation = dbt.cli(["build", "--profile", "unittest"], context=context)
    yield from dbt_with_snowflake_insights(context, dbt_cli_invocation)

```


Now:

```python
@dbt_assets(manifest=snowflake_manifest_path)
def jaffle_shop_dbt_assets(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(
        ["build", "--profile", "unittest"], context=context
    ).stream().with_snowflake_insights()
```

## Test Plan

dagster-io/internal#10731
  • Loading branch information
benpankow committed Jul 24, 2024
1 parent e298409 commit 9fbdd2f
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
81 changes: 81 additions & 0 deletions python_modules/libraries/dagster-dbt/dagster_dbt/core/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,87 @@ def _threadpool_wrap_map_fn() -> (
dbt_cli_invocation=self._dbt_cli_invocation,
)

@public
@experimental
def with_insights(
    self,
    skip_config_check: bool = False,
    record_observation_usage: bool = True,
) -> (
    "DbtEventIterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]"
):
    """Associate each warehouse query with the produced asset materializations for use in
    Dagster Plus Insights. Currently supports Snowflake and BigQuery.

    The adapter type is read from the ``metadata.adapter_type`` entry of the invocation's
    manifest and selects which ``dagster_cloud.dagster_insights`` helper wraps this iterator.

    For more information, see the documentation for
    `dagster_cloud.dagster_insights.dbt_with_snowflake_insights` and
    `dagster_cloud.dagster_insights.dbt_with_bigquery_insights`.

    Args:
        skip_config_check (bool): If true, skips the check that the dbt project config is set up
            correctly. Defaults to False.
        record_observation_usage (bool): If True, associates the usage associated with
            asset observations with that asset. Default is True.

    Returns:
        A new ``DbtEventIterator`` over the events produced by the selected insights helper,
        bound to the same dbt CLI invocation as this iterator.

    Raises:
        DagsterInvalidPropertyError: If the `dagster_cloud` insights helper cannot be imported.
        CheckError: Via ``check.failed`` when the manifest's adapter type is neither
            ``snowflake`` nor ``bigquery``.

    **Example:**

    .. code-block:: python

        @dbt_assets(manifest=DBT_MANIFEST_PATH)
        def jaffle_shop_dbt_assets(
            context: AssetExecutionContext,
            dbt: DbtCliResource,
        ):
            yield from dbt.cli(["build"], context=context).stream().with_insights()
    """
    invocation = self._dbt_cli_invocation
    adapter_type = invocation.manifest.get("metadata", {}).get("adapter_type")

    # Reject unsupported adapters up front, before attempting any dagster_cloud import.
    if adapter_type not in ("snowflake", "bigquery"):
        check.failed(
            f"The `with_insights` method is only supported for Snowflake and BigQuery and is not supported for adapter type `{adapter_type}`"
        )

    # Both helpers take the same keyword arguments, so only the imported name differs.
    try:
        if adapter_type == "snowflake":
            from dagster_cloud.dagster_insights import (  # pyright: ignore[reportMissingImports]
                dbt_with_snowflake_insights as attach_insights,
            )
        else:
            from dagster_cloud.dagster_insights import (  # pyright: ignore[reportMissingImports]
                dbt_with_bigquery_insights as attach_insights,
            )
    except ImportError as e:
        raise DagsterInvalidPropertyError(
            "The `dagster_cloud` library is required to use the `with_insights`"
            " method. Install the library with `pip install dagster-cloud`."
        ) from e

    insights_events = attach_insights(
        context=invocation.context,
        dbt_cli_invocation=invocation,
        dagster_events=self,
        skip_config_check=skip_config_check,
        record_observation_usage=record_observation_usage,
    )
    return DbtEventIterator(events=insights_events, dbt_cli_invocation=invocation)


def _dbt_packages_has_dagster_dbt(packages_file: Path) -> bool:
"""Checks whether any package in the passed yaml file is the Dagster dbt package."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
_check as check,
materialize,
)
from dagster._check.functions import CheckError
from dagster._core.definitions.events import AssetMaterialization, Output
from dagster._core.definitions.metadata.metadata_value import MetadataValue, TableMetadataValue
from dagster._core.definitions.metadata.table import TableRecord
Expand Down Expand Up @@ -140,6 +141,23 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
), str(metadata_by_asset_key)


def test_insights_err_not_snowflake_or_bq(
    test_jaffle_shop_manifest_standalone_duckdb_dbfile: Dict[str, Any],
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Check that `with_insights` fails with a `CheckError` for a DuckDB-backed project.

    Materializes a dbt asset that chains `.with_insights()` onto the event stream and
    asserts that the raised error message names the unsupported `duckdb` adapter.
    """

    @dbt_assets(manifest=test_jaffle_shop_manifest_standalone_duckdb_dbfile)
    def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
        insights_events = dbt.cli(["build"], context=context).stream().with_insights()
        yield from insights_events

    with pytest.raises(CheckError) as raised:
        dbt_resource = DbtCliResource(project_dir=os.fspath(test_jaffle_shop_path))
        materialize([my_dbt_assets], resources={"dbt": dbt_resource})

    expected_fragment = "is not supported for adapter type `duckdb`"
    assert expected_fragment in str(raised.value)


@pytest.mark.parametrize(
"target, manifest_fixture_name",
[
Expand Down

0 comments on commit 9fbdd2f

Please sign in to comment.