Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/bigquery): support column-level lineage #8382

Merged
merged 16 commits into from
Jul 11, 2023
Prev Previous commit
Next Next commit
make view storage more efficient
  • Loading branch information
hsheth2 committed Jul 10, 2023
commit cd2015e5559dcab241eb27e91112bbbd707fb697
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ def parse_sql_lineage(
default_schema=default_schema,
)

def close(self) -> None:
self._make_schema_resolver.cache_clear()
super().close()


def get_default_graph() -> DataHubGraph:
(url, token) = get_url_and_token()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import hashlib
import logging
import os
import re
Expand Down Expand Up @@ -112,6 +113,7 @@
TagAssociationClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.hive_schema_to_avro import (
HiveColumnToAvroConverter,
get_schema_fields_for_hive_column,
Expand All @@ -138,6 +140,10 @@ def cleanup(config: BigQueryV2Config) -> None:
os.unlink(config._credentials_path)


def _generate_sql_id(sql: str) -> str:
return hashlib.md5(sql.encode("utf-8")).hexdigest()


@platform_name("BigQuery", doc_order=1)
@config_class(BigQueryV2Config)
@support_status(SupportStatus.CERTIFIED)
Expand Down Expand Up @@ -254,8 +260,12 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):

# Global store of table identifiers for lineage filtering
self.table_refs: Set[str] = set()
# Maps project -> view_ref -> view definition (will be used when generating lineage)
self.view_definitions: Dict[str, Dict[str, str]] = defaultdict(dict)

# We do this so that the SQL is stored in a file-backed dict, but the sql IDs are stored in memory.
# Maps project -> view_ref -> sql ID (will be used when generating lineage)
self.view_definition_ids: Dict[str, Dict[str, str]] = defaultdict(dict)
# Maps sql ID -> actual sql
self.view_definitions: FileBackedDict[str] = FileBackedDict()
Comment on lines +265 to +268
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we do view_definitions: view ref -> sql and then a map project_id -> view ref instead to avoid the hashing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense - will do in a follow up PR


self.sql_parser_schema_resolver = SchemaResolver(
platform=self.platform, env=self.config.env
Expand Down Expand Up @@ -666,7 +676,10 @@ def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
)

if self.config.lineage_parse_view_ddl:
for view, view_definition in self.view_definitions[project_id].items():
for view, view_definition_id in self.view_definition_ids[
project_id
].items():
view_definition = self.view_definitions[view_definition_id]
raw_view_lineage = sqlglot_lineage(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we think this sql parser is strictly better than the old one? A fallback to the old one might be safer, but honestly I would like to err on the side of code velocity over safety (in cases like these) so I am fine with this as is

EDIT: That being said, a function like below might be nice, in case we support multiple parsers or adjust the call signature of sqlglot_lineage:

def parse_lineage(self, query: str, project_id: str):
    return sqlglot_lineage(query, self.platform, self.sql_parser_schema_resolver, project_id)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially wanted to do that

However the sql_parser_schema_resolver is only available in the main BigquerySource, and not in the BigqueryLineage class.

Ideally we create a BigqueryContext object with the schema resolver, urn generation utils, etc and pass that around everywhere

view_definition,
platform=self.platform,
Expand Down Expand Up @@ -871,7 +884,9 @@ def _process_view(
)
self.table_refs.add(table_ref)
if self.config.lineage_parse_view_ddl:
self.view_definitions[project_id][table_ref] = view.view_definition
view_definition_id = _generate_sql_id(view.view_definition)
self.view_definition_ids[project_id][table_ref] = view_definition_id
self.view_definitions[view_definition_id] = view.view_definition

view.column_count = len(columns)
if not view.column_count:
Expand Down