Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Gmm sampler #98

Merged
merged 9 commits into from
May 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
change typing
  • Loading branch information
mateusz-wozny committed Mar 15, 2023
commit 6e70b958bfeed4f000f7e2fd0137dccef7b94156
148 changes: 69 additions & 79 deletions multi_imbalance/resampling/gmm_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from copy import deepcopy
from typing import Optional, List, Dict, Tuple, Any, TypeVar

import logging
import numpy as np
from imblearn.over_sampling.base import BaseSampler
from imblearn.utils import Substitution
Expand All @@ -14,6 +15,12 @@

GMMS = TypeVar("GMMS", bound="GMMSampler")

logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s %(name)s %(message)s",
datefmt="%d.%m.%Y %H:%M:%S",
)


@Substitution(
random_state=_random_state_docstring,
Expand Down Expand Up @@ -161,51 +168,50 @@ def __init__(
random_state: Optional[int] = None,
):
super().__init__(sampling_strategy="auto")
self.likelihood_threshold: float = likelihood_threshold
self.k_neighbors: int = k_neighbors
self.undersample: bool = undersample
self.min_components: int = min_components
self.max_components: Optional[int] = max_components
self._minority_classes: Optional[List[int]] = minority_classes
self.valid_size: float = valid_size
self.filter_new: float = filter_new
self.add_after_filtration: bool = add_after_filtration
self.iterations_after_filtration: int = iterations_after_filtration
self.n_init: int = n_init
self.tol: float = tol
self.max_iter: int = max_iter
self.random_state: Optional[int] = random_state
self.likelihood_threshold = likelihood_threshold
self.k_neighbors = k_neighbors
self.undersample = undersample
self.min_components = min_components
self.max_components = max_components
self._minority_classes = minority_classes
self.valid_size = valid_size
self.filter_new = filter_new
self.add_after_filtration = add_after_filtration
self.iterations_after_filtration = iterations_after_filtration
self.n_init = n_init
self.tol = tol
self.max_iter = max_iter
self.random_state = random_state
np.random.seed(self.random_state)

assert strategy in ["average", "median"], f"strategy '{strategy}' is invalid."
self.strategy: str = strategy
self.strategy = strategy

assert covariance_type in [
"full",
"tied",
"diag",
"spherical",
], f"covariance_type '{covariance_type}' is invalid."
self.covariance_type: str = covariance_type

self.likelihoods: Dict = dict()
self.gaussian_mixtures: Dict = dict()
self.class_sizes: Optional[Counter] = None
self.neighborhood: Optional[Dict] = None
self.maj_int_min: OrderedDict = OrderedDict({"maj": list(), "int": list(), "min": list()})
self.size_to_align: Optional[np.ndarray] = None
self.covariance_type = covariance_type

self.likelihoods: Dict[int, float] = dict()
self.gaussian_mixtures: Dict[int, GaussianMixture] = dict()
self.class_sizes: Optional[Counter[int]] = None
self.neighborhood: Optional[Dict[int, float]] = None
self.maj_int_min = OrderedDict({"maj": list(), "int": list(), "min": list()})
self.size_to_align: Optional[int] = None
self.__x_subset: Optional[np.ndarray] = None
self.cdist_min_count: int = 10
self.cdist_min_count = 10
self.__logger = logging.getLogger("GMMSampler")

@property
def minority_classes(self) -> List:
def minority_classes(self) -> List[int]:
if (self.class_sizes is None) or (self._minority_classes is not None):
return self._minority_classes
return self.maj_int_min["min"]

def _fit_resample(self, X: Any, y: Any) -> Tuple[np.ndarray, np.ndarray]:
X_resample: np.ndarray
y_resample: np.ndarray
X_resample, y_resample = self._to_numpy(X, y)

X_resample, y_resample = self._fit(X_resample, y_resample)._resample(X_resample, y_resample)
Expand All @@ -215,7 +221,7 @@ def _fit_resample(self, X: Any, y: Any) -> Tuple[np.ndarray, np.ndarray]:
return X_resample[indices], y_resample[indices]

def _fit(self, X: Any, y: Any) -> GMMS:
self.class_sizes: Counter = Counter(y)
self.class_sizes = Counter(y)
self._construct_neighborhood(X, y)

self._construct_maj_int_min()
Expand All @@ -227,13 +233,9 @@ def _fit(self, X: Any, y: Any) -> GMMS:

@staticmethod
def _to_numpy(X: Any, y: Any) -> Tuple[np.ndarray, np.ndarray]:
try:
return np.array(X).copy(), np.array(y).copy()
except Exception as e:
raise e
return np.array(X).copy(), np.array(y).copy()

def _fit_each_minority_class(self, X: np.ndarray, y: np.ndarray) -> None:
minority_class: int
for minority_class in self.minority_classes:
self._fit_single_class(X, y, minority_class)

Expand All @@ -243,10 +245,10 @@ def _fit_single_class(self, X: np.ndarray, y: np.ndarray, minority_class: int) -
valid: np.ndarray
train, valid = train_test_split(self.__x_subset, test_size=self.valid_size, random_state=self.random_state)

current_component_count: int = self.min_components
current_component_count = self.min_components

gaussian_mixture_model: GaussianMixture = self._init_model(current_component_count)
gaussian_mixture_model_temp: Optional[GaussianMixture] = None
gaussian_mixture_model = self._init_model(current_component_count)
gaussian_mixture_model_temp = None
gaussian_mixture_model.fit(train)

likelihood = [float("-inf"), gaussian_mixture_model.score(valid)]
Expand All @@ -269,16 +271,13 @@ def _construct_neighborhood(self, X: np.ndarray, y: np.ndarray) -> None:
neigh_clf: NearestNeighbors = NearestNeighbors(n_neighbors=self.k_neighbors + 1).fit(X)
nearest_neighbor_idxs: np.ndarray = neigh_clf.kneighbors(X, return_distance=False)[:, 1:]
self.neighborhood = dict()
sample_idx: int
neigh_samples: np.ndarray
for sample_idx, neigh_samples in enumerate(nearest_neighbor_idxs):
neigh_counts: Counter = Counter(y[neigh_samples])
self.neighborhood[sample_idx] = self._check_sample_neighborhood(y[sample_idx], neigh_counts)

def _check_sample_neighborhood(self, sample_class: int, neigh_counts: Counter) -> float:
neighborhood: float = 0.0
neigh_class: int
count: int
def _check_sample_neighborhood(self, sample_class: int, neigh_counts: Counter[int]) -> float:
neighborhood = 0.0
for neigh_class, count in neigh_counts.items():
class_sizes: List = [
self.class_sizes[sample_class],
Expand All @@ -294,19 +293,16 @@ def _construct_maj_int_min(self) -> None:
middle_size = self._get_middle_size_based_on_strategy()
self._fill_maj_int_min(middle_size)

def _get_middle_size_based_on_strategy(self) -> np.ndarray:
middle_size: np.ndarray
def _get_middle_size_based_on_strategy(self) -> int:
if self.strategy == "median":
middle_size = np.median(list(self.class_sizes.values()))
middle_size = int(np.median(list(self.class_sizes.values())))
elif self.strategy == "average":
middle_size = np.mean(list(self.class_sizes.values()))
middle_size = np.mean(list(self.class_sizes.values()), dtype=int)
else:
raise ValueError(f'Unrecognized {self.strategy}. Only "median" and "average" are allowed.')
return middle_size

def _fill_maj_int_min(self, middle_size) -> None:
class_label: int
class_size: int
def _fill_maj_int_min(self, middle_size: int) -> None:
for class_label, class_size in self.class_sizes.items():
if class_size == middle_size:
class_group = "int"
Expand All @@ -318,9 +314,9 @@ def _fill_maj_int_min(self, middle_size) -> None:
self.maj_int_min[class_group].append(class_label)

def _set_size_to_align(self) -> None:
maj_q: List = [self.class_sizes[k] for k in self.maj_int_min["maj"]]
min_q: List = [self.class_sizes[k] for k in self.maj_int_min["min"]]
int_q: List = [self.class_sizes[k] for k in self.maj_int_min["int"]]
maj_q = [self.class_sizes[k] for k in self.maj_int_min["maj"]]
min_q = [self.class_sizes[k] for k in self.maj_int_min["min"]]
int_q = [self.class_sizes[k] for k in self.maj_int_min["int"]]

if len(maj_q) == 0 and len(min_q) > 0:
self.size_to_align = np.mean(min_q, dtype=int)
Expand All @@ -345,9 +341,9 @@ def _init_model(self, n_components: int) -> GaussianMixture:
)

def _perform_step(self, n_components: int, likelihood: float, num_samples: int) -> bool:
likelihood_condition: bool = likelihood >= self.likelihood_threshold
max_components_condition: bool = self.max_components is None or n_components <= self.max_components
num_samples_condition: bool = n_components < num_samples
likelihood_condition = likelihood >= self.likelihood_threshold
max_components_condition = self.max_components is None or n_components <= self.max_components
num_samples_condition = n_components < num_samples
return likelihood_condition and max_components_condition and num_samples_condition

def _resample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
Expand All @@ -357,34 +353,30 @@ def _resample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarra
return X, y

def _oversample_each_minority_class(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
X_copy: np.ndarray = X.copy()
y_copy: np.ndarray = y.copy()
minority_class: int
X_copy = X.copy()
y_copy = y.copy()
for minority_class in self.minority_classes:
self.__x_subset = X_copy[y_copy == minority_class]
X, y = self._oversample(X_copy, y_copy, minority_class)
self.__x_subset = None
return X, y

def _oversample(self, X: np.ndarray, y: np.ndarray, minority_class: int) -> Tuple[np.ndarray, np.ndarray]:
means: np.ndarray
covariances: np.ndarray
means, covariances = self._get_coefficients(self.gaussian_mixtures[minority_class])

probabilities = self._get_probas_for_samples_in_component(X, y, minority_class)

quantity_to_generate: int = self.size_to_align - self.__x_subset.shape[0]
quantity_to_generate = self.size_to_align - self.__x_subset.shape[0]
for component in range(self.gaussian_mixtures[minority_class].n_components):
Nk: np.ndarray = probabilities[component] * quantity_to_generate
x: np.ndarray = self._create_samples(means[component], covariances[component], int(Nk))
x = self._create_samples(means[component], covariances[component], int(Nk))
X = np.append(X, x, axis=0)
y = np.append(y, np.full((x.shape[0],), fill_value=minority_class), axis=0)

return X, y

def _get_probas_for_samples_in_component(self, X: np.ndarray, y: np.ndarray, minority_class: int) -> np.ndarray:
X_prob: np.ndarray = self.gaussian_mixtures[minority_class].predict_proba(X[y == minority_class])
ratios: np.ndarray = np.array([v for k, v in self.neighborhood.items() if y[k] == minority_class])
ratios = np.array([v for k, v in self.neighborhood.items() if y[k] == minority_class])
ratios = ratios[..., np.newaxis]
probabilities: np.ndarray = np.sum((1.0 - ratios) * X_prob, axis=0) + 1e-8
probabilities = probabilities / np.sum(probabilities, keepdims=True)
Expand All @@ -409,19 +401,19 @@ def _get_coefficients(self, gaussian_mixture: GaussianMixture) -> Tuple[np.ndarr
return means, covariances

def _create_samples(self, mean: np.ndarray, covariance: np.ndarray, target_size: int) -> np.ndarray:
result: np.ndarray = np.empty((0, self.__x_subset.shape[1]), float)
iterations: int = 0
threshold_dist: float = self.filter_new
result = np.empty((0, self.__x_subset.shape[1]), float)
iterations = 0
threshold_dist = self.filter_new
while (result.shape[0] != target_size) and (iterations < self.iterations_after_filtration):
iterations += 1
size: int = max(target_size - result.shape[0], result.shape[1] + 1)
x: np.ndarray = np.random.multivariate_normal(mean, covariance, size=size)
size = max(target_size - result.shape[0], result.shape[1] + 1)
x = np.random.multivariate_normal(mean, covariance, size=size)
if self.filter_new == -1.0:
result = np.append(result, x, axis=0)
break
elif self.filter_new == 0.0:
mdist: np.ndarray = self._compute_mdist(self.__x_subset, mean, covariance)
threshold_dist: float = float(np.mean(mdist))
mdist = self._compute_mdist(self.__x_subset, mean, covariance)
threshold_dist = float(np.mean(mdist))

mdist = self._compute_mdist(x, mean, covariance)[: x.shape[0]]
x = x[mdist < threshold_dist]
Expand All @@ -432,29 +424,27 @@ def _create_samples(self, mean: np.ndarray, covariance: np.ndarray, target_size:
return result

def _compute_mdist(self, in_data: np.ndarray, mean: np.ndarray, covariance: np.ndarray) -> np.ndarray:
mdist: np.ndarray
try:
data: np.ndarray = in_data
data = in_data
if data.shape[0] < self.cdist_min_count:
data: np.ndarray = np.concatenate((in_data, in_data), axis=0)
data = np.concatenate((in_data, in_data), axis=0)
mdist = cdist(data, [mean], metric="mahalanobis", VI=np.linalg.inv(covariance))[:, 0]
except Exception as e:
print(f"Can't compute 'cdist' function. Distance threshold is set to 2.0")
print(f"For more information, examine exception: {e}")
self.__logger.error("Can't compute 'cdist' function. Distance threshold is set to 2.0")
self.__logger.info(f"For more information, examine exception: {e}")
mdist = np.full_like(in_data, fill_value=2.0)[:, 0]
return mdist

def _undersample_majority_classes(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
maj_class: int
for maj_class in self.maj_int_min["maj"]:
X, y = self._undersample(X, y, maj_class)
return X, y

def _undersample(self, X: np.ndarray, y: np.ndarray, class_id: int) -> Tuple[np.ndarray, np.ndarray]:
class_idxs: np.ndarray = np.where(y == class_id)[0]
class_idxs = np.where(y == class_id)[0]
sorted_neigh = sorted(self.neighborhood.items(), key=lambda item: item[1])
class_idxs: List = [k for k, v in sorted_neigh if k in class_idxs]
size: int = max(0, int(self.class_sizes[class_id] - self.size_to_align))
class_idxs = [k for k, _ in sorted_neigh if k in class_idxs]
size = max(0, int(self.class_sizes[class_id] - self.size_to_align))
X = np.delete(X, class_idxs[:size], axis=0)
y = np.delete(y, class_idxs[:size], axis=0)

Expand Down