fix druid connector with tests (#296)
- decouple method to increase testability
- remove extraneous subquery

---------

Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: shrivardhan <[email protected]>
Co-authored-by: Shrivardhan Rao <[email protected]>
Co-authored-by: Avik Basu <[email protected]>
3 people committed Sep 22, 2023
1 parent 98e376a commit d249942
Showing 4 changed files with 226 additions and 110 deletions.
101 changes: 72 additions & 29 deletions numalogic/connectors/druid.py
@@ -15,6 +15,70 @@
_LOGGER = logging.getLogger(__name__)


# TODO: pass dictionary of keys and values as dict
def make_filter_pairs(filter_keys: list[str], filter_values: list[str]) -> dict[str, str]:
    """
    Args:
        filter_keys: list of filter dimension names
        filter_values: corresponding list of filter values.

    Returns: a dict of filter key-value pairs
    """
    return dict(zip(filter_keys, filter_values))
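# Illustrative usage (hypothetical keys and values, not part of this commit):
#     make_filter_pairs(["namespace", "app"], ["sandbox", "demo"])
# returns {"namespace": "sandbox", "app": "demo"}; note that zip() silently drops
# any unpaired trailing keys or values.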


def build_params(
    aggregations: list[str],
    datasource: str,
    dimensions: list[str],
    filter_pairs: dict,
    granularity: str,
    hours: float,
) -> dict:
    """
    Args:
        aggregations: A map from aggregator name to one of the
            ``pydruid.utils.aggregators`` e.g., ``doublesum``
        datasource: Data source to query
        dimensions: The dimensions to group by
        filter_pairs: Indicates which rows of
            data to include in the query
        granularity: Time bucket to aggregate data by (hour, day, minute, etc.)
        hours: How many hours of data to fetch, counting back from now.

    Returns: a dict of query parameters
    """
    _filter = Filter(
        type="and",
        fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()],
    )
    end_dt = datetime.now(pytz.utc)
    start_dt = end_dt - timedelta(hours=hours)

    intervals = [f"{start_dt.isoformat()}/{end_dt.isoformat()}"]
    dimension_specs = map(lambda d: DimensionSpec(dimension=d, output_name=d), dimensions)

    params = {
        "datasource": datasource,
        "granularity": granularity,
        "intervals": intervals,
        "aggregations": aggregations,
        "filter": _filter,
        "dimensions": dimension_specs,
    }

    _LOGGER.debug(
        "Druid query params: %s",
        params,
    )

    return params
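# Illustrative call (hypothetical values, not part of this commit), using the
# ``pydruid.utils.aggregators.doublesum`` aggregator mentioned in the docstring:
#     build_params(
#         aggregations={"count": doublesum("count")},
#         datasource="druid_metrics",
#         dimensions=["ciStatus"],
#         filter_pairs={"namespace": "sandbox"},
#         granularity="minute",
#         hours=24.0,
#     )
# The returned dict is later unpacked as keyword arguments into the pydruid
# ``groupby`` query inside ``DruidFetcher.fetch``.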


class DruidFetcher(DataFetcher):
"""
Class for fetching data as a dataframe from Druid.
@@ -40,37 +40,13 @@ def fetch(
        pivot: Optional[Pivot] = None,
        hours: float = 24,
    ) -> pd.DataFrame:
        _start_time = time.time()
        filter_pairs = {}
        for k, v in zip(filter_keys, filter_values):
            filter_pairs[k] = v

        _filter = Filter(
            type="and",
            fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()],
        _start_time = time.perf_counter()
        filter_pairs = make_filter_pairs(filter_keys, filter_values)
        query_params = build_params(
            aggregations, datasource, dimensions, filter_pairs, granularity, hours
        )

        end_dt = datetime.now(pytz.utc)
        start_dt = end_dt - timedelta(hours=hours)
        intervals = [f"{start_dt.isoformat()}/{end_dt.isoformat()}"]

        dimension_specs = map(lambda d: DimensionSpec(dimension=d, output_name=d), dimensions)

        params = {
            "datasource": datasource,
            "granularity": granularity,
            "intervals": intervals,
            "aggregations": aggregations,
            "filter": _filter,
            "dimensions": dimension_specs,
        }

        _LOGGER.debug(
            "Fetching data with params: %s",
            params,
        )
        self.client.sub_query(**params)
        response = self.client.groupby(**params)
        response = self.client.groupby(**query_params)
        df = response.export_pandas()

        if df is None or df.shape[0] == 0:
@@ -90,6 +130,9 @@ def fetch(
            )
            df.columns = df.columns.map("{0[1]}".format)
            df.reset_index(inplace=True)

        _end_time = time.perf_counter() - _start_time
        _LOGGER.debug("Druid query latency: %.6fs", _end_time)
        return df
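    # Illustrative usage (not part of this commit; the constructor arguments are
    # assumed here to be the Druid broker url and endpoint used by the pydruid client):
    #     fetcher = DruidFetcher(url="http://localhost:8888", endpoint="druid/v2/")
    #     df = fetcher.fetch(
    #         datasource="druid_metrics",
    #         filter_keys=["namespace"],
    #         filter_values=["sandbox"],
    #         dimensions=["ciStatus"],
    #         aggregations={"count": doublesum("count")},
    #         granularity="minute",
    #         hours=24,
    #     )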

    def raw_fetch(self, *args, **kwargs) -> pd.DataFrame:
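The "decouple method to increase testability" note in the commit message refers to the new module-level helpers make_filter_pairs and build_params, which construct the query dict without touching a Druid client and can therefore be unit-tested in isolation. A minimal, hypothetical test sketch (not necessarily the tests added in this commit's other changed files), assuming a standard unittest setup and pydruid's doublesum aggregator:

import unittest

from pydruid.utils.aggregators import doublesum

from numalogic.connectors.druid import build_params, make_filter_pairs


class TestBuildParams(unittest.TestCase):
    def test_build_params(self):
        # The helper builds the query parameters without any network call.
        filter_pairs = make_filter_pairs(["namespace"], ["sandbox"])
        params = build_params(
            aggregations={"count": doublesum("count")},
            datasource="druid_metrics",
            dimensions=["ciStatus"],
            filter_pairs=filter_pairs,
            granularity="minute",
            hours=24.0,
        )
        self.assertEqual(params["datasource"], "druid_metrics")
        self.assertEqual(params["granularity"], "minute")
        self.assertEqual(len(params["intervals"]), 1)


if __name__ == "__main__":
    unittest.main()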
