Skip to content

Commit

Permalink
fixes and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Jul 7, 2022
1 parent 32c3446 commit 978da85
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":
# Handle partitioned and sharded tables.
table_name: Optional[str] = None

# if table name ends in _* then we strip it as that represents a query on a sharded table
if self.table.endswith("_*"):
table_name = self.table[:-2]
logger.debug(
f"Found query on sharded table {self.table}. Using {table_name} as the table name."
)
return BigQueryTableRef(self.project, self.dataset, table_name)

matches = re.match(sharded_table_regex, self.table)
if matches:
table_name = matches.group(2)
Expand Down Expand Up @@ -322,7 +330,7 @@ def __str__(self) -> str:

def _table_ref_to_urn(ref: BigQueryTableRef, env: str) -> str:
return builder.make_dataset_urn(
"bigquery", f"{ref.project}.{ref.dataset}.{ref.table.rstrip('_*')}", env
"bigquery", f"{ref.project}.{ref.dataset}.{ref.table}", env
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from datahub.configuration.common import ConfigModel, ConfigurationError

_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX: str = "((.+)[_$])?(\\d{4,10})$"


class BigQueryBaseConfig(ConfigModel):
rate_limit: bool = pydantic.Field(
Expand All @@ -20,7 +22,7 @@ class BigQueryBaseConfig(ConfigModel):
)

sharded_table_pattern: str = pydantic.Field(
default="((.+)[_$])?(\\d{4,10})$",
default=_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX,
description="The regex pattern to match sharded tables and group as one table. This is a very low level config parameter, only change if you know what you are doing, ",
)

Expand Down
30 changes: 30 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_usage_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.usage.bigquery_usage import (
BQ_AUDIT_V1,
BigQueryTableRef,
BigQueryUsageConfig,
BigQueryUsageSource,
)
from datahub.ingestion.source_config.bigquery import (
_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX,
)

FROZEN_TIME = "2021-07-20 00:00:00"

Expand Down Expand Up @@ -164,3 +168,29 @@ def test_bigquery_filters_with_deny_filter():
source = BigQueryUsageSource.create(config, PipelineContext(run_id="bq-usage-test"))
filter: str = source._generate_filter(BQ_AUDIT_V1)
assert filter == expected_filter


def test_bigquery_ref_extra_removal():
table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_*")
new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX)
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset

table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_2022")
new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX)
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset

table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_20222110")
new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX)
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset

table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo")
new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX)
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset

0 comments on commit 978da85

Please sign in to comment.