feat: initial support for flattened vector in backtest (#361)
Signed-off-by: Avik Basu <[email protected]>
ab93 committed Apr 10, 2024
1 parent 0cbaa52 commit 61f6598
Showing 4 changed files with 92 additions and 31 deletions.
4 changes: 2 additions & 2 deletions numalogic/backtest/__init__.py
@@ -1,5 +1,5 @@
 from importlib.util import find_spec
-from numalogic.backtest._prom import PromBacktester
+from numalogic.backtest._prom import PromBacktester, OutDataFrames


 def _validate_req_pkgs():
@@ -12,4 +12,4 @@ def _validate_req_pkgs():
 _validate_req_pkgs()


-__all__ = ["PromBacktester"]
+__all__ = ["PromBacktester", "OutDataFrames"]
106 changes: 81 additions & 25 deletions numalogic/backtest/_prom.py
@@ -11,6 +11,7 @@

 import logging
 import os.path
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional, Union
@@ -40,6 +41,16 @@
 LOGGER = logging.getLogger(__name__)


+@dataclass
+class OutDataFrames:
+    input: pd.DataFrame
+    preproc_out: pd.DataFrame
+    model_out: pd.DataFrame
+    thresh_out: pd.DataFrame
+    postproc_out: pd.DataFrame
+    unified_out: pd.DataFrame
+
+
 class PromBacktester:
     def __init__(
         self,
@@ -71,6 +82,7 @@ def __init__(
         self.metrics = metrics or []
         self.conf: StreamConf = self._init_conf(metrics, numalogic_cfg, load_saved_conf)
         self.nlconf: NumalogicConf = self.conf.get_numalogic_conf()
+        self.seq_len = self.conf.window_size

     def _init_conf(self, metrics: list[str], nl_conf: dict, load_saved_conf: bool) -> StreamConf:
         if load_saved_conf:
@@ -144,12 +156,44 @@ def train_models(
         LOGGER.info("Models saved in %s", self._modelpath)
         return artifacts_dict

+    def window_inverse(self, x: NDArray[float]) -> NDArray[float]:
+        """
+        Perform inverse windowing on the given data.
+
+        If stride is 1, return the inverse-windowed data as is.
+        If stride is 2, de-interleave each window into its two feature
+        streams and stack them column-wise to recover the features.
+        Strides greater than 2 are not supported yet.
+
+        Args:
+        ----
+        x: Input data
+
+        Returns
+        -------
+        Inverse-windowed data, with the features recovered if stride is 2
+        """
+        x = torch.from_numpy(x)
+        stride = self.nlconf.trainer.ds_stride
+
+        if stride == 1:
+            return inverse_window(x).numpy()
+
+        # TODO: support stride > 2
+        if stride > 2:
+            raise NotImplementedError("Stride > 2 not supported!")
+
+        # Recover the features by de-interleaving the window axis
+        x1, x2 = x[:, ::stride], x[:, 1::stride]
+        x1 = inverse_window(x1).numpy()
+        x2 = inverse_window(x2).numpy()
+        return np.hstack([x1, x2])
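To make the stride-2 branch concrete, here is a standalone NumPy sketch of the same de-interleave-and-stack recovery. The `keep_first` helper below is a simplified stand-in for numalogic's `inverse_window`, an assumption for illustration rather than the library's implementation:

    import numpy as np

    def keep_first(windows: np.ndarray) -> np.ndarray:
        # Simplified inverse windowing: keep the first timestep of every
        # window, then append the tail of the last window.
        return np.concatenate([windows[:, 0, :], windows[-1, 1:, :]], axis=0)

    # 3 overlapping windows, 4 timesteps each, over 2 interleaved streams.
    x = np.random.randn(3, 4, 2).astype(np.float32)

    x1, x2 = x[:, ::2], x[:, 1::2]  # de-interleave along the window axis
    recovered = np.hstack([keep_first(x1), keep_first(x2)])
    print(recovered.shape)  # (4, 4): time axis restored, feature count doubled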

     def generate_scores(
         self,
         df: pd.DataFrame,
         model_path: Optional[str] = None,
         use_full_data: bool = False,
-    ) -> pd.DataFrame:
+    ) -> OutDataFrames:
         """
         Generate scores for the given data.
@@ -161,7 +205,7 @@
         Returns
         -------
-            Dataframe with timestamp and metric values
+            Dataframes with timestamp and metric values for each step,
+            wrapped in an OutDataFrames dataclass

         Raises
         ------
@@ -188,11 +232,13 @@
         # Preprocess
         x_scaled = preproc_udf.compute(model=artifacts["preproc_clf"], input_=x_test)

-        ds = StreamingDataset(x_scaled, seq_len=self.conf.window_size)
+        n_feat = x_scaled.shape[1]

-        x_recon = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)
-        raw_scores = np.zeros((len(ds), self.conf.window_size, len(self.metrics)), dtype=np.float32)
-        feature_scores = np.zeros((len(ds), len(self.metrics)), dtype=np.float32)
+        ds = StreamingDataset(x_scaled, seq_len=self.seq_len, stride=self.nlconf.trainer.ds_stride)
+
+        x_recon = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32)
+        raw_scores = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32)
+        feature_scores = np.zeros((len(ds), n_feat), dtype=np.float32)
         unified_scores = np.zeros((len(ds), 1), dtype=np.float32)

         postproc_func = PostprocessFactory().get_instance(self.nlconf.postprocess)
@@ -215,16 +261,17 @@
                 feature_scores[idx], self.nlconf.score.feature_agg
             )

-        x_recon = inverse_window(torch.from_numpy(x_recon), method="keep_first").numpy()
-        raw_scores = inverse_window(torch.from_numpy(raw_scores), method="keep_first").numpy()
+        x_recon = self.window_inverse(x_recon)
+        raw_scores = self.window_inverse(raw_scores)

         feature_scores = np.vstack(
             [
-                np.full((self.conf.window_size - 1, len(self.metrics)), fill_value=np.nan),
+                np.full((len(x_test) - len(ds), n_feat), fill_value=np.nan),
                 feature_scores,
             ]
         )
         unified_scores = np.vstack(
-            [np.full((self.conf.window_size - 1, 1), fill_value=np.nan), unified_scores]
+            [np.full((len(x_test) - len(ds), 1), fill_value=np.nan), unified_scores]
         )

         return self._construct_output(
@@ -244,7 +291,7 @@ def generate_static_scores(self, df: pd.DataFrame) -> pd.DataFrame:
         x_test = df[metrics].to_numpy(dtype=np.float32)

         postproc_udf = UDFFactory.get_udf_cls("postprocess")
-        ds = StreamingDataset(x_test, seq_len=self.conf.window_size)
+        ds = StreamingDataset(x_test, seq_len=self.seq_len)

         feature_scores = np.zeros((len(ds), len(metrics)), dtype=np.float32)
         unified_scores = np.zeros((len(ds), 1), dtype=np.float32)
@@ -258,12 +305,12 @@
             )
         feature_scores = np.vstack(
             [
-                np.full((self.conf.window_size - 1, len(metrics)), fill_value=np.nan),
+                np.full((self.seq_len - 1, len(metrics)), fill_value=np.nan),
                 feature_scores,
             ]
         )
         unified_scores = np.vstack(
-            [np.full((self.conf.window_size - 1, 1), fill_value=np.nan), unified_scores]
+            [np.full((self.seq_len - 1, 1), fill_value=np.nan), unified_scores]
         )
         dfs = {
             "input": df,
@@ -345,9 +392,13 @@ def _construct_output(
         thresh_out: NDArray[float],
         postproc_out: NDArray[float],
         unified_out: NDArray[float],
-    ) -> pd.DataFrame:
+    ) -> OutDataFrames:
         ts_idx = input_df.index

+        LOGGER.debug(
+            "Output shapes: preproc: %s, nn: %s, thresh: %s, postproc: %s, unified: %s",
+            preproc_out.shape,
+            nn_out.shape,
+            thresh_out.shape,
+            postproc_out.shape,
+            unified_out.shape,
+        )

         if thresh_out.shape[1] > 1:
             thresh_df = pd.DataFrame(
                 thresh_out,
@@ -364,7 +415,6 @@
         if postproc_out.shape[1] > 1:
             postproc_df = pd.DataFrame(
                 postproc_out,
-                # columns=["unified_score"],
                 columns=self.metrics,
                 index=ts_idx,
             )
@@ -375,24 +425,30 @@
index=ts_idx,
)

-        dfs = {
-            "input": input_df,
-            "preproc_out": pd.DataFrame(
+        if len(preproc_out) == len(ts_idx) and preproc_out.shape[1] == len(self.metrics):
+            preproc_df = pd.DataFrame(
                 preproc_out,
                 columns=self.metrics,
                 index=ts_idx,
-            ),
-            "model_out": pd.DataFrame(
+            )
+        else:
+            preproc_df = pd.DataFrame(
+                preproc_out,
+            )
+
+        return OutDataFrames(
+            input=input_df,
+            preproc_out=preproc_df,
+            model_out=pd.DataFrame(
                 nn_out,
                 columns=self.metrics,
                 index=ts_idx,
             ),
-            "thresh_out": thresh_df,
-            "postproc_out": postproc_df,
-            "unified_out": pd.DataFrame(
+            thresh_out=thresh_df,
+            postproc_out=postproc_df,
+            unified_out=pd.DataFrame(
                 unified_out,
                 columns=["unified"],
                 index=ts_idx,
             ),
-        }
-        return pd.concat(dfs, axis=1)
+        )
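For callers migrating across this change: generate_scores now returns the OutDataFrames dataclass rather than one concatenated frame. A sketch under stated assumptions (a constructed `backtester` and a metrics dataframe `df` are given; the old single-frame view is rebuilt with pd.concat, mirroring the removed return statement):

    import pandas as pd

    # Assumes `backtester` is a PromBacktester with trained models and
    # `df` is a metrics dataframe; their setup is omitted here.
    out = backtester.generate_scores(df)
    print(out.unified_out.head())   # per-timestamp unified scores
    print(out.postproc_out.head())  # per-metric postprocessed scores

    # Approximate the pre-change return value (a single wide dataframe):
    legacy_df = pd.concat(
        {
            "input": out.input,
            "preproc_out": out.preproc_out,
            "model_out": out.model_out,
            "thresh_out": out.thresh_out,
            "postproc_out": out.postproc_out,
            "unified_out": out.unified_out,
        },
        axis=1,
    )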
3 changes: 3 additions & 0 deletions numalogic/tools/data.py
@@ -127,6 +127,9 @@ def __init__(self, data: npt.NDArray[float], seq_len: int, stride: int = 1):
         self._seq_len = seq_len
         self._data = data.astype(np.float32)
         self._stride = stride
+        _LOGGER.info(
+            "StreamingDataset initialized with seq_len: %s, stride: %s", self._seq_len, self._stride
+        )

     @property
     def data(self) -> npt.NDArray[float]:
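The new log line makes the configured stride visible at runtime. A small sketch of what `stride` changes: window starts advance in steps of `stride`, so a larger stride yields fewer windows, which is what the `len(x_test) - len(ds)` NaN padding in generate_scores accounts for (exact counts depend on the dataset implementation):

    import numpy as np
    from numalogic.tools.data import StreamingDataset

    data = np.arange(20, dtype=np.float32).reshape(10, 2)  # 10 rows, 2 features

    ds_dense = StreamingDataset(data, seq_len=4)              # stride defaults to 1
    ds_strided = StreamingDataset(data, seq_len=4, stride=2)  # every other start

    # Expect fewer windows from the strided dataset.
    print(len(ds_dense), len(ds_strided))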
10 changes: 6 additions & 4 deletions tests/test_backtest.py
@@ -5,7 +5,7 @@
 from omegaconf import OmegaConf

 from numalogic._constants import TESTS_DIR
-from numalogic.backtest import PromBacktester
+from numalogic.backtest import PromBacktester, OutDataFrames
 from numalogic.config import (
     NumalogicConf,
     ModelInfo,
@@ -64,9 +64,11 @@ def test_train(backtester, read_data):


 def test_scores(backtester, read_data):
-    out_df = backtester.generate_scores(read_data)
-    assert isinstance(out_df, pd.DataFrame)
-    assert out_df.shape[0] == int(backtester.test_ratio * read_data.shape[0])
+    out_dfs = backtester.generate_scores(read_data)
+    assert isinstance(out_dfs, OutDataFrames)
+    assert out_dfs.input.shape[0] == int(backtester.test_ratio * read_data.shape[0])
+    assert out_dfs.postproc_out.shape[0] == int(backtester.test_ratio * read_data.shape[0])
+    assert out_dfs.unified_out.shape[0] == int(backtester.test_ratio * read_data.shape[0])


 def test_static_scores(backtester, read_data):
