Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MC-CCR implementation #102

Merged
merged 22 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 291 additions & 0 deletions examples/resampling/CCR.ipynb

Large diffs are not rendered by default.

227 changes: 227 additions & 0 deletions multi_imbalance/resampling/ccr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from collections import Counter
from typing import Callable, Optional, Tuple

import numpy as np
from imblearn.base import BaseSampler


class CCR(BaseSampler):
    """
    CCR is a combined cleaning and resampling energy-based algorithm.

    Each minority example has an associated energy budget that is used to expand a sphere around it. With each
    majority example within the sphere, the cost of further expansion increases. When energy is used up,
    majority examples are pushed out of the spheres and synthetic minority examples are generated inside the spheres.
    Synthetic examples are generated until the count of minority examples is approximately equal to the count of
    majority examples. Smaller spheres generate more synthetic examples than big ones to force the classification
    algorithm to focus on the most difficult examples.

    The least frequent label in ``y`` is treated as the minority class; every other label is treated as majority.

    Reference:
    Koziarski, M., Wozniak, M.: CCR: A combined cleaning and resampling algorithm for imbalanced data classification.
    International Journal of Applied Mathematics and Computer Science 2017
    """

    def __init__(self, energy: float,
                 distance_function: Callable[[np.ndarray, np.ndarray], np.ndarray] =
                 lambda x, y: np.linalg.norm(x - y, ord=1, axis=1)) -> None:
        """
        :param energy:
            initial energy budget for each minority example to use for sphere expansion
        :param distance_function:
            function to calculate distance between minority example and array of majority examples, defaults to L1 norm
        """
        super().__init__()
        self.energy = energy
        self._sampling_type = "over-sampling"
        self.distance_function = distance_function

    def _fit_resample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        :param X:
            two-dimensional numpy array (number of samples x number of features) with float numbers
        :param y:
            one-dimensional numpy array with labels for rows in X
        :return:
            resampled X, resampled y; rows are ordered as original minority examples, translated majority examples,
            synthetic minority examples
        """
        class_counts = Counter(y)
        # On ties the label encountered first in y wins (dicts iterate in insertion order).
        minority_class = min(class_counts, key=class_counts.get)
        is_minority = y == minority_class

        # Boolean indexing already returns fresh arrays, so the input X is never modified.
        minority_examples = X[is_minority]
        majority_examples = X[~is_minority]

        clean_majority, synthetic_minority = self._clean_and_generate(minority_examples, majority_examples)

        resampled_X = np.vstack([minority_examples, clean_majority, synthetic_minority])
        resampled_y = np.hstack([
            np.full((minority_examples.shape[0],), minority_class),
            y[~is_minority],
            np.full((synthetic_minority.shape[0],), minority_class),
        ])
        return resampled_X, resampled_y

    def _clean_and_generate(self, minority_examples: np.ndarray, majority_examples: np.ndarray,
                            synthetic_examples_total: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        :param minority_examples:
            two-dimensional numpy array (number of samples x number of features) with float numbers of minority class
        :param majority_examples:
            two-dimensional numpy array (number of samples x number of features) with float numbers of majority class
        :param synthetic_examples_total:
            number of synthetic examples to be generated, if left as None it is calculated as difference of class counts
        :return:
            clean majority X, synthetic minority X
        """
        r, t = self._calculate_radius_and_translations(minority_examples, majority_examples)
        translated_majority_examples = majority_examples + t
        synthetic_examples = self._generate_synthetic_examples(minority_examples, majority_examples, r,
                                                               synthetic_examples_total)

        return translated_majority_examples, synthetic_examples

    def _calculate_radius_and_translations(self, minority_examples: np.ndarray,
                                           majority_examples: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Expand a sphere around every minority example and compute how far majority examples must be pushed.

        :param minority_examples:
            two-dimensional numpy array with minority class examples
        :param majority_examples:
            two-dimensional numpy array with majority class examples
        :return:
            radius of the sphere of each minority example, accumulated translation vector of each majority example
        """
        radius = np.zeros(minority_examples.shape[0])
        translations = np.zeros(majority_examples.shape)
        majority_count = len(majority_examples)

        for i, minority_example in enumerate(minority_examples):
            distances = np.asarray(self.distance_function(minority_example, majority_examples))
            sorted_distances_index = np.argsort(distances)
            energy = self.energy
            inside = 0  # number of majority examples already enclosed by the sphere

            while inside < majority_count:
                next_distance = distances[sorted_distances_index[inside]]
                if next_distance <= radius[i]:
                    # Ties and zero-distance examples are enclosed without spending energy.
                    inside += 1
                    continue

                # Expanding by dr costs dr * (number of enclosed majority examples + 1).
                cost = inside + 1
                dr = energy / cost
                if radius[i] + dr < next_distance:
                    # The remaining budget cannot reach the next example: spend all of it and stop.
                    # Zeroing the energy explicitly (instead of testing the rounded result of
                    # `energy - dr * cost`) keeps a tiny positive float remainder from marking
                    # an unreached example as enclosed.
                    radius[i] += dr
                    energy = 0.0
                    break

                dr = next_distance - radius[i]
                radius[i] += dr
                energy -= dr * cost
                if energy <= 0:
                    break
                inside += 1

            if energy > 0:
                # Every majority example is enclosed; spend the leftover budget. max() guards the
                # empty-majority case, where the divisor would otherwise be zero.
                radius[i] += energy / max(inside, 1)

            # Push every enclosed majority example radially onto the sphere surface. Examples lying
            # exactly on the minority example (distance 0) have no direction and are left in place.
            enclosed = sorted_distances_index[:inside]
            enclosed = enclosed[distances[enclosed] != 0]
            d = distances[enclosed]
            scale = (radius[i] - d) / d
            translations[enclosed] += scale[:, np.newaxis] * (majority_examples[enclosed] - minority_example)

        return radius, translations

    def _generate_synthetic_examples(self, minority_examples: np.ndarray, majority_examples: np.ndarray,
                                     r: np.ndarray, synthetic_examples_total: Optional[int]) -> np.ndarray:
        """
        Draw synthetic minority examples inside the spheres, starting from the smallest sphere.

        :param minority_examples:
            two-dimensional numpy array with minority class examples
        :param majority_examples:
            two-dimensional numpy array with majority class examples, used only for the default count
        :param r:
            radius of the sphere of each minority example
        :param synthetic_examples_total:
            number of examples to generate, None means difference between majority and minority counts
        :return:
            two-dimensional numpy array with synthetic examples (zero rows if none are required)
        """
        n_features = minority_examples.shape[1]
        if synthetic_examples_total is None:
            synthetic_examples_total = majority_examples.shape[0] - minority_examples.shape[0]
        if synthetic_examples_total <= 0:
            return np.empty((0, n_features))

        generation_order = r.argsort()
        synthetic_examples_counts = self._calculate_synthetic_count_per_minority(generation_order, r,
                                                                                 synthetic_examples_total)

        generated = []
        for i in generation_order:
            remaining = synthetic_examples_total - len(generated)
            if remaining <= 0:
                break
            x = minority_examples[i]
            for _ in range(min(synthetic_examples_counts[i], remaining)):
                # Random direction with unit L1 norm, scaled by a random fraction of the radius.
                random_translation = np.random.rand(n_features) * 2 - 1
                multiplier = random_translation / abs(random_translation).sum()
                generated.append(x + multiplier * r[i] * np.random.rand(1))

        if not generated:
            return np.empty((0, n_features))
        return np.array(generated)

    def _calculate_synthetic_count_per_minority(self, generation_order: np.ndarray, r: np.ndarray,
                                                synthetic_examples_total: int) -> np.ndarray:
        """
        Split the synthetic example budget between minority examples, inversely proportional to sphere radius.

        :param generation_order:
            indices of minority examples sorted by ascending radius
        :param r:
            radius of the sphere of each minority example
        :param synthetic_examples_total:
            number of synthetic examples to distribute
        :return:
            integer array with the number of synthetic examples for each minority example
        """
        inverse_radius = r ** -1
        shares = (inverse_radius / inverse_radius.sum()) * synthetic_examples_total
        synthetic_examples_counts = shares.astype(int)
        # Truncation loses the fractional parts; hand them back one by one, smallest spheres first.
        synthetic_leftovers = int(round((shares - synthetic_examples_counts).sum()))
        for k in range(synthetic_leftovers):
            synthetic_examples_counts[generation_order[k % len(generation_order)]] += 1
        return synthetic_examples_counts


class MultiClassCCR(BaseSampler):
    """
    CCR for multi-class problems.

    The approach consists of the following steps:
    1. The classes are sorted in the descending order by the number of associated observations.
    2. For each of the minority classes, a collection of combined majority observations is constructed, consisting of
       a randomly sampled fraction of observations from each of the already considered class.
    3. Preprocessing with the CCR algorithm is performed, using the observations from the currently considered class
       as a minority, and the combined majority observations as the majority class. Both the generated synthetic
       minority observations and the applied translations are incorporated into the original data, and the synthetic
       observations can be used to construct the collection of combined majority observations for later classes.

    Koziarski, M., Wozniak, M., Krawczyk, B.: Combined Cleaning and Resampling Algorithm for Multi-Class Imbalanced
    Data with Label Noise. (2020)
    """

    def __init__(self, energy: float):
        """
        :param energy:
            initial energy budget for each minority example to use for sphere expansion
        """
        super().__init__()
        # Every constructor argument must be stored under its own name, otherwise
        # scikit-learn's get_params / clone / repr fail with AttributeError.
        self.energy = energy
        self._sampling_type = "over-sampling"
        self.CCR = CCR(energy=energy)

    def _fit_resample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        :param X:
            two-dimensional numpy array (number of samples x number of features) with numbers
        :param y:
            one-dimensional numpy array with labels for rows in X, any number of distinct classes
        :return:
            resampled X, resampled y; rows are grouped by class, largest original class first
        """
        # Keep the wrapped binary sampler in sync in case `energy` was changed via set_params.
        self.CCR.energy = self.energy

        # Translated majority examples are written back in place below; an integer array
        # would silently truncate them.
        if not np.issubdtype(X.dtype, np.floating):
            X = X.astype(float)

        sorted_class_counts = sorted(Counter(y).items(), key=lambda item: item[1], reverse=True)
        n_max = sorted_class_counts[0][1]
        class_X = {clazz: X[y == clazz] for clazz, _ in sorted_class_counts}

        for i in range(1, len(sorted_class_counts)):
            current_class, current_class_count = sorted_class_counts[i]
            considered_classes = sorted_class_counts[:i]
            number_of_classes_with_higher_count = sum(
                count > current_class_count for _, count in considered_classes)
            if number_of_classes_with_higher_count == 0:
                # Already as numerous as the largest class, nothing to oversample.
                continue

            # Draw (without replacement) a subset of row indices from every already considered class.
            sample_quota = int(n_max / number_of_classes_with_higher_count)
            class_samples = []
            for clazz, _ in considered_classes:
                available = class_X[clazz].shape[0]
                sample = np.random.choice(available, min(sample_quota, available), replace=False)
                class_samples.append((clazz, sample))
            X_majority = np.concatenate([class_X[clazz][sample] for clazz, sample in class_samples])

            clean_X_majority, synthetic_minority = self.CCR._clean_and_generate(
                class_X[current_class], X_majority, n_max - current_class_count)
            class_X[current_class] = np.vstack([class_X[current_class], synthetic_minority])

            # Cut the cleaned majority block back into per-class pieces and store the translated
            # rows at the positions they were sampled from.
            split_points = np.cumsum([sample.shape[0] for _, sample in class_samples])[:-1]
            cleaned_per_class = np.split(clean_X_majority, split_points)
            for (clazz, sample), cleaned in zip(class_samples, cleaned_per_class):
                class_X[clazz][sample] = cleaned

        final_X = np.vstack([class_X[clazz] for clazz, _ in sorted_class_counts])
        final_y = np.hstack([np.full((class_X[clazz].shape[0],), clazz) for clazz, _ in sorted_class_counts])
        return final_X, final_y
124 changes: 124 additions & 0 deletions tests/resampling/test_ccr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from unittest.mock import patch

import numpy as np
from numpy.testing import assert_array_equal

from multi_imbalance.resampling.ccr import CCR, MultiClassCCR

# Binary toy problem: rows 0 and 4 are the minority class (label 1), the rest are majority (label 0).
X = np.array(
    [
        [0.51916715, 0.46894559],
        [0.42850038, 0.49204451],
        [0.45844347, 0.44231806],
        [0.49862482, 0.61777354],
        [0.55701822, 0.32693741],
        [0.37040839, 0.48617894],
    ]
)

y = np.array([1, 0, 0, 0, 1, 0])

# Expected minority + cleaned majority rows for X with energy=0.5; compared column-wise after sorting.
original_cleaning_results = np.array(
    [
        [0.3279181486665761, 0.5367090144999095],
        [0.3123061206752467, 0.4686048130243289],
        [0.49287499380029176, 0.6594306863002918],
        [0.3248957760286293, 0.4914514685286293],
        [0.51916715, 0.46894559],
        [0.55701822, 0.32693741],
    ]
)

# Five Gaussian blobs with 100/30/20/10/5 points. A dedicated, seeded generator keeps the fixture
# identical between test runs and leaves the global numpy random state untouched.
_fixture_rng = np.random.RandomState(0)

multiclass_X = np.vstack(
    [
        _fixture_rng.normal(0, 1, (100, 2)),
        _fixture_rng.normal(3, 5, (30, 2)),
        _fixture_rng.normal(-2, 2, (20, 2)),
        _fixture_rng.normal(-4, 1, (10, 2)),
        _fixture_rng.normal(10, 1, (5, 2)),
    ]
)

multiclass_y = np.array([1] * 100 + [2] * 30 + [3] * 20 + [4] * 10 + [5] * 5)


def test_compare_cleaning_results_to_original_article_implementation():
    """The minority and cleaned majority rows must equal the stored reference values."""
    sampler = CCR(energy=0.5)
    resampled_X, _ = sampler.fit_resample(X, y)

    # Only the first len(X) rows are compared; each column is sorted independently first.
    actual = np.sort(resampled_X[:X.shape[0]], axis=0)
    expected = np.sort(original_cleaning_results, axis=0)
    assert_array_equal(actual, expected)


def test_radius_equal_to_energy_and_translations_equal_zero_when_majority_not_in_range():
    """With every majority example out of reach the radius equals the energy and nothing is moved."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 1], [-1, -1]])

    radius, translations = CCR(energy=0.5)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([0.5])
    expected_translations = np.array([[0, 0], [0, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_radius_decreases_and_translation_nonequal_zero_when_majority_in_range():
    """An enclosed majority example shrinks the final radius and is pushed onto the sphere surface."""
    minority = np.array([[0, 0]])
    majority = np.array([[0.5, 0], [1, 0]])

    radius, translations = CCR(energy=1)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([0.75])
    expected_translations = np.array([[0.25, 0], [0, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_energy_cost_should_increase_proportionally_to_number_of_examples_in_radius():
    """Energy 10 is spent as 1 + 2 + 3 + 4 while the sphere grows to radius 4."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 0], [2, 0], [3, 0], [4, 0]])

    radius, translations = CCR(energy=10)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([4])
    expected_translations = np.array([[3, 0], [2, 0], [1, 0], [0, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_should_use_leftover_energy_when_all_examples_in_radius():
    """Energy left after enclosing every majority example keeps expanding the sphere."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 0], [2, 0], [3, 0], [4, 0]])

    radius, translations = CCR(energy=110)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([29])
    expected_translations = np.array([[28, 0], [27, 0], [26, 0], [25, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_translations_should_accumulate():
    """A majority example placed midway between two minority examples ends with zero net translation."""
    minority = np.array([[0, 0], [2, 0]])
    majority = np.array([[1, 0]])

    _, translations = CCR(energy=1)._calculate_radius_and_translations(minority, majority)

    expected_translations = np.array([[0, 0]])
    assert_array_equal(translations, expected_translations)


def test_should_properly_handle_same_distance_examples():
    """Two majority examples at the same distance are both enclosed before leftover energy is spent."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 0], [1, 0]])

    radius, _ = CCR(energy=2)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([1.5])
    assert_array_equal(radius, expected_radius)


def test_multiclass_equal_class_counts():
    """After resampling every class must have the same number of examples."""
    sampler = MultiClassCCR(energy=0.5)
    _, resampled_y = sampler.fit_resample(multiclass_X, multiclass_y)

    _, class_counts = np.unique(resampled_y, return_counts=True)
    assert class_counts.min() == class_counts.max()


def test_multiclass_ccr_call_count():
    """Binary CCR must run once for each of the four classes smaller than the largest one."""
    clf = MultiClassCCR(energy=0.5)

    # `wraps` forwards every call to the real implementation so resampling still works
    # while the mock counts the invocations.
    with patch.object(CCR, '_clean_and_generate', wraps=clf.CCR._clean_and_generate) as mock:
        clf.fit_resample(multiclass_X, multiclass_y)

    assert mock.call_count == 4