Static filter (#373)
Static filters support

---------

Signed-off-by: s0nicboOm <[email protected]>
s0nicboOm committed May 2, 2024
1 parent f9ebfaf commit 98c5766
Showing 4 changed files with 40 additions and 5 deletions.
7 changes: 7 additions & 0 deletions numalogic/connectors/_config.py
@@ -36,9 +36,16 @@ class Pivot:
value: list[str] = field(default_factory=lambda: ["count"])


@dataclass
class FilterConf:
inclusion_filters: Optional[list[dict]] = None
exclusion_filters: Optional[list[dict]] = None


@dataclass
class DruidFetcherConf:
datasource: str
static_filters: Optional[FilterConf] = None
dimensions: list[str] = field(default_factory=list)
aggregations: dict = field(default_factory=dict)
group_by: list[str] = field(default_factory=list)
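
For illustration, the new config could be built like this; a minimal sketch, assuming the remaining DruidFetcherConf fields keep their defaults (the datasource and dimension values are hypothetical, and each filter dict is later expanded as keyword arguments to pydruid's Filter):

from numalogic.connectors._config import DruidFetcherConf, FilterConf

# Hypothetical example: keep only rows where env == "prod",
# and drop rows where instance == "canary".
static_filters = FilterConf(
    inclusion_filters=[{"type": "selector", "dimension": "env", "value": "prod"}],
    exclusion_filters=[{"type": "selector", "dimension": "instance", "value": "canary"}],
)
fetcher_conf = DruidFetcherConf(datasource="telemetry", static_filters=static_filters)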
35 changes: 31 additions & 4 deletions numalogic/connectors/druid/_druid.py
@@ -10,7 +10,7 @@
from pydruid.utils.filters import Filter

from numalogic.connectors._base import DataFetcher
from numalogic.connectors._config import Pivot
from numalogic.connectors._config import Pivot, FilterConf
from typing import Optional, Final

from numalogic.tools.exceptions import DruidFetcherError
@@ -34,13 +34,30 @@ def make_filter_pairs(filter_keys: list[str], filter_values: list[str]) -> dict[
return dict(zip(filter_keys, filter_values))


def _combine_in_filters(filters_list) -> Filter:
return Filter(type="and", fields=[Filter(**item) for item in filters_list])


def _combine_ex_filters(filters_list) -> Filter:
filters = _combine_in_filters(filters_list)
return Filter(type="not", field=filters)


def _make_static_filters(filters: FilterConf) -> Filter:
in_filters, ex_filters = _combine_in_filters(filters.inclusion_filters), _combine_ex_filters(
filters.exclusion_filters
)
return Filter(type="and", fields=[in_filters, ex_filters])
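
In plain terms: the inclusion filters are AND-ed together, the exclusion filters are AND-ed and then negated with a "not" wrapper, and the two groups are finally AND-ed with each other. A minimal sketch of the equivalent pydruid filter tree, assuming one inclusion and one exclusion filter (dimension names are hypothetical; note that the helper as written expects both lists to be populated):

conf = FilterConf(
    inclusion_filters=[{"type": "selector", "dimension": "env", "value": "prod"}],
    exclusion_filters=[{"type": "selector", "dimension": "instance", "value": "canary"}],
)
# _make_static_filters(conf) builds a filter equivalent to:
Filter(
    type="and",
    fields=[
        # inclusion group: AND of all inclusion filters
        Filter(type="and", fields=[Filter(type="selector", dimension="env", value="prod")]),
        # exclusion group: NOT (AND of all exclusion filters)
        Filter(
            type="not",
            field=Filter(type="and", fields=[Filter(type="selector", dimension="instance", value="canary")]),
        ),
    ],
)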


def build_params(
datasource: str,
dimensions: list[str],
filter_pairs: dict,
granularity: str,
hours: float,
delay: float,
static_filters: Optional[FilterConf] = None,
aggregations: Optional[list[str]] = None,
post_aggregations: Optional[list[str]] = None,
reference_dt: Optional[datetime] = None,
@@ -52,6 +69,7 @@ def build_params(
dimensions: The dimensions to group by
filter_pairs: Indicates which rows of
data to include in the query
static_filters: Static filters passed from config
granularity: Time bucket to aggregate data by hour, day, minute, etc.,
hours: Hours from now to skip training.
delay: Added delay to the fetch query from current time.
@@ -69,6 +87,11 @@
type="and",
fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()],
)
if static_filters:
_LOGGER.debug("Static Filters are present!")
_static_filters = _make_static_filters(static_filters)
_filter = Filter(type="and", fields=[_static_filters, _filter])

reference_dt = reference_dt or datetime.now(pytz.utc)
end_dt = reference_dt - timedelta(hours=delay)
_LOGGER.debug("Querying with end_dt: %s, that is with delay of %s hrs", end_dt, delay)
@@ -118,6 +141,7 @@ def fetch(
dimensions: list[str],
delay: float = 3.0,
granularity: str = "minute",
static_filters: Optional[FilterConf] = None,
aggregations: Optional[dict] = None,
post_aggregations: Optional[dict] = None,
group_by: Optional[list[str]] = None,
@@ -135,6 +159,7 @@
dimensions: The dimensions to group by
delay: Added delay to the fetch query from current time.
granularity: Time bucket to aggregate data by hour, day, minute, etc.
static_filters: User-defined static filters from the connector config
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: postaggregations map
@@ -152,6 +177,7 @@
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
static_filters=static_filters,
granularity=granularity,
hours=hours,
delay=delay,
@@ -193,6 +219,7 @@ def chunked_fetch(
dimensions: list[str],
delay: float = 3.0,
granularity: str = "minute",
static_filters: Optional[FilterConf] = None,
aggregations: Optional[dict] = None,
post_aggregations: Optional[dict] = None,
group_by: Optional[list[str]] = None,
@@ -213,6 +240,7 @@
granularity: Time bucket to aggregate data by hour, day, minute, etc.
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
static_filters: User-defined static filters from the connector config
post_aggregations: postaggregations map
group_by: List of columns to group by
pivot: Pivot configuration
@@ -245,6 +273,7 @@ def chunked_fetch(
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
static_filters=static_filters,
granularity=granularity,
hours=min(chunked_hours, hours - hours_elapsed),
delay=delay,
@@ -259,9 +288,7 @@ def chunked_fetch(
_LOGGER.debug("Fetching data concurrently with %s threads", max_threads)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = [executor.submit(self._fetch, **params) for params in qparams]
for future in futures:
chunked_dfs.append(future.result())

chunked_dfs.extend(future.result() for future in futures)
df = pd.concat(chunked_dfs, axis=0, ignore_index=True)
df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

1 change: 1 addition & 0 deletions numalogic/udfs/trainer/_druid.py
@@ -120,6 +120,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
datasource=_fetcher_conf.datasource,
filter_keys=_stream_conf.composite_keys,
filter_values=payload.composite_keys,
static_filters=_fetcher_conf.static_filters,
dimensions=list(_fetcher_conf.dimensions),
delay=self.dataconn_conf.delay_hrs,
granularity=_fetcher_conf.granularity,
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numalogic"
version = "0.9.1a7"
version = "0.9.1a8"
description = "Collection of operational Machine Learning models and tools."
authors = ["Numalogic Developers"]
packages = [{ include = "numalogic" }]
