Skip to content

Commit

Permalink
Merge pull request #102 from damian-horna/ccr-implementation
Browse files Browse the repository at this point in the history
MC-CCR implementation
  • Loading branch information
dddddddddtd committed Jun 21, 2023
2 parents 534a0c4 + 918bfc5 commit 6e04ed0
Show file tree
Hide file tree
Showing 3 changed files with 642 additions and 0 deletions.
291 changes: 291 additions & 0 deletions examples/resampling/CCR.ipynb

Large diffs are not rendered by default.

227 changes: 227 additions & 0 deletions multi_imbalance/resampling/ccr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from collections import Counter
from typing import Tuple, Callable

import numpy as np
from imblearn.base import BaseSampler


class CCR(BaseSampler):
    """
    CCR is a combined cleaning and resampling energy-based algorithm.

    Each minority example has an associated energy budget that is used to expand a sphere around it. With each
    majority example within the sphere, the cost of further expansion increases. When energy is used up,
    majority examples are pushed out of the spheres and synthetic minority examples are generated inside the spheres.
    Synthetic examples are generated until the count of minority examples is approximately equal to the count of
    majority examples. Smaller spheres generate more synthetic examples than big ones to force the classification
    algorithm to focus on the most difficult examples.

    The least frequent label in ``y`` is treated as the minority class; examples of every other label are
    treated together as the majority.

    Reference:
    Koziarski, M., Wozniak, M.: CCR: A combined cleaning and resampling algorithm for imbalanced data classification.
    International Journal of Applied Mathematics and Computer Science 2017
    """

    def __init__(self, energy: float, distance_function: Callable[[np.ndarray, np.ndarray], np.ndarray] =
                 lambda x, y: np.linalg.norm(x - y, ord=1, axis=1)) -> None:
        """
        :param energy:
            initial energy budget for each minority example to use for sphere expansion
        :param distance_function:
            function to calculate distance between minority example and array of majority examples, defaults to L1 norm
        """
        super().__init__()
        self.energy = energy
        self._sampling_type = "over-sampling"
        self.distance_function = distance_function

    def _fit_resample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        :param X:
            two-dimensional numpy array (number of samples x number of features) with float numbers
        :param y:
            one-dimensional numpy array with labels for rows in X
        :return:
            resampled X, resampled y; rows are ordered as original minority examples, translated majority
            examples (keeping their original labels), synthetic minority examples
        """
        minority_class = min(list(Counter(y).items()), key=lambda x: x[1])[0]

        minority_examples = X[y == minority_class].copy()
        majority_examples = X[y != minority_class].copy()

        clean_majority, synthetic_minority = self._clean_and_generate(minority_examples, majority_examples)

        return np.vstack([minority_examples, clean_majority, synthetic_minority]), np.hstack([
            np.full((minority_examples.shape[0],), minority_class),
            y[y != minority_class],
            np.full((synthetic_minority.shape[0],), minority_class)
        ])

    def _clean_and_generate(self, minority_examples: np.ndarray, majority_examples: np.ndarray,
                            synthetic_examples_total: int = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        :param minority_examples:
            two-dimensional numpy array (number of samples x number of features) with float numbers of minority class
        :param majority_examples:
            two-dimensional numpy array (number of samples x number of features) with float numbers of majority class
        :param synthetic_examples_total:
            number of synthetic examples to be generated, if left as None it is calculated as difference of class counts
        :return:
            clean majority X, synthetic minority X
        """
        r, t = self._calculate_radius_and_translations(minority_examples, majority_examples)
        translated_majority_examples = majority_examples + t
        synthetic_examples = self._generate_synthetic_examples(minority_examples, majority_examples, r,
                                                               synthetic_examples_total)

        return translated_majority_examples, synthetic_examples

    def _calculate_radius_and_translations(self, minority_examples, majority_examples):
        """
        Expand a sphere around every minority example and compute how far majority examples must be pushed.

        :param minority_examples:
            two-dimensional numpy array with minority class examples
        :param majority_examples:
            two-dimensional numpy array with majority class examples
        :return:
            radius of the sphere of each minority example, accumulated translation vector for each majority example
        """
        radius = np.zeros(minority_examples.shape[0])
        translations = np.zeros(majority_examples.shape)
        majority_count = len(majority_examples)

        for i, minority_example in enumerate(minority_examples):
            distances = self.distance_function(minority_example, majority_examples)
            sorted_distances_index = np.argsort(distances)
            energy = self.energy
            # number of majority examples (taken in order of increasing distance) already inside the sphere
            current_example = 0

            while current_example < majority_count:
                current_example_distance = distances[sorted_distances_index[current_example]]
                if current_example_distance <= radius[i]:
                    # already covered, e.g. several examples at the same distance
                    current_example += 1
                    continue

                # expanding by dr costs dr * (number of examples inside the sphere + 1)
                cost_per_unit = current_example + 1
                dr = energy / cost_per_unit
                reached = radius[i] + dr >= current_example_distance
                if reached:
                    # stop exactly at the next example; the rest of the budget is spent at a higher cost
                    dr = current_example_distance - radius[i]

                radius[i] += dr
                if not reached:
                    # The whole remaining budget was spent without reaching the next example. Zero it
                    # explicitly: (energy / n) * n may differ from energy by a rounding error, and a tiny
                    # positive leftover must not make the unreached example count as inside the sphere.
                    energy = 0.0
                    break

                energy -= dr * cost_per_unit
                if energy <= 0:
                    break
                current_example += 1

            if energy > 0:
                # every majority example is inside the sphere; max() guards against an empty majority set
                radius[i] += energy / max(current_example, 1)

            for majority_example_index in sorted_distances_index[:current_example]:
                d = distances[majority_example_index]
                # d == 0: the push direction is undefined; d >= radius: the example is not strictly inside
                # the sphere, so there is nothing to push out (and it must never be pulled inwards)
                if d == 0 or d >= radius[i]:
                    continue

                translation = majority_examples[majority_example_index] - minority_example
                translations[majority_example_index] += (radius[i] - d) / d * translation

        return radius, translations

    def _generate_synthetic_examples(self, minority_examples, majority_examples, r, synthetic_examples_total):
        """
        Generate synthetic minority examples inside the spheres of the minority examples.

        :param minority_examples:
            two-dimensional numpy array with minority class examples
        :param majority_examples:
            two-dimensional numpy array with majority class examples
        :param r:
            one-dimensional numpy array with the sphere radius of each minority example
        :param synthetic_examples_total:
            number of examples to generate, if None the difference of majority and minority counts is used
        :return:
            two-dimensional numpy array with synthetic examples (zero rows when nothing was generated)
        """
        # spheres are visited from the smallest to the biggest radius
        generation_order = r.argsort()
        if synthetic_examples_total is None:
            synthetic_examples_total = majority_examples.shape[0] - minority_examples.shape[0]

        synthetic_examples_counts = self._calculate_synthetic_count_per_minority(generation_order, r,
                                                                                 synthetic_examples_total)

        generated = []
        for i in generation_order:
            x = minority_examples[i]
            for _ in range(synthetic_examples_counts[i]):
                # random direction scaled to unit L1 length, then a random fraction of the radius
                random_translation = np.random.rand(majority_examples.shape[1]) * 2 - 1
                multiplier = random_translation / abs(random_translation).sum()
                generated.append(x + multiplier * r[i] * np.random.rand(1))

                if len(generated) == synthetic_examples_total:
                    break
            if len(generated) == synthetic_examples_total:
                break

        if len(generated) > 0:
            return np.array(generated)
        return np.empty((0, minority_examples.shape[1]))

    def _calculate_synthetic_count_per_minority(self, generation_order, r, synthetic_examples_total):
        """
        Split the number of synthetic examples between minority examples, inversely proportionally to radius.

        :param generation_order:
            indices of minority examples sorted by increasing radius
        :param r:
            one-dimensional numpy array with the sphere radius of each minority example
        :param synthetic_examples_total:
            total number of synthetic examples to distribute
        :return:
            one-dimensional integer numpy array with the number of synthetic examples for each minority example
        """
        synthetic_examples_counts = (r ** -1 / (r ** -1).sum()) * synthetic_examples_total
        # fractional parts lost by truncation are handed out one by one, smallest spheres first
        synthetic_leftovers = round((synthetic_examples_counts - synthetic_examples_counts.astype(int)).sum())
        synthetic_examples_counts = synthetic_examples_counts.astype(int)
        for i in range(synthetic_leftovers):
            synthetic_examples_counts[generation_order[i % len(generation_order)]] += 1
        return synthetic_examples_counts


class MultiClassCCR(BaseSampler):
    """
    CCR for multi-class problems.

    The approach consists of the following steps:
    1. The classes are sorted in the descending order by the number of associated observations.
    2. For each of the minority classes, a collection of combined majority observations is constructed, consisting of
    a randomly sampled fraction of observations from each of the already considered classes.
    3. Preprocessing with the CCR algorithm is performed, using the observations from the currently considered class
    as a minority, and the combined majority observations as the majority class. Both the generated synthetic
    minority observations and the applied translations are incorporated into the original data, and the synthetic
    observations can be used to construct the collection of combined majority observations for later classes.

    Reference:
    Koziarski, M., Wozniak, M., Krawczyk, B.: Combined Cleaning and Resampling Algorithm for Multi-Class Imbalanced
    Data with Label Noise. (2020)
    """

    def __init__(self, energy: float):
        """
        :param energy:
            initial energy budget for each minority example to use for sphere expansion
        """
        super().__init__()
        self._sampling_type = "over-sampling"
        # kept under the constructor argument's name so that get_params()/clone()/repr() can read it back
        self.energy = energy
        self.CCR = CCR(energy=energy)

    def _fit_resample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        :param X:
            two-dimensional numpy array (number of samples x number of features) with float numbers
        :param y:
            one-dimensional numpy array with labels for rows in X; any number of distinct labels is accepted
        :return:
            resampled X, resampled y; rows are grouped by class, classes ordered by decreasing original count
        """
        # pick up an energy changed after construction (e.g. through set_params)
        self.CCR.energy = self.energy

        sorted_class_counts = sorted(Counter(y).items(), key=lambda x: x[1], reverse=True)
        n_max = sorted_class_counts[0][1]
        class_X = {clazz: X[y == clazz] for clazz, _ in sorted_class_counts}

        for i in range(1, len(sorted_class_counts)):
            current_class, current_class_count = sorted_class_counts[i]
            already_considered = sorted_class_counts[:i]
            number_of_classes_with_higher_count = sum(
                1 for _, count in already_considered if count > current_class_count)
            if number_of_classes_with_higher_count == 0:
                # the class is as numerous as the biggest one, nothing to oversample
                continue

            # build the combined majority from a random sample of every already considered class
            sample_size_limit = int(n_max / number_of_classes_with_higher_count)
            class_samples = []
            X_majority = []
            for clazz, _ in already_considered:
                sampled_X = class_X[clazz]
                sampled_size = sampled_X.shape[0]
                sample = np.random.choice(sampled_size, min(sample_size_limit, sampled_size), replace=False)
                class_samples.append((clazz, sample))
                X_majority.append(sampled_X[sample])
            X_majority = np.concatenate(X_majority)

            clean_X_majority, synthetic_minority = self.CCR._clean_and_generate(class_X[current_class], X_majority,
                                                                                n_max - current_class_count)
            class_X[current_class] = np.vstack([class_X[current_class], synthetic_minority])

            # cut the translated majority back into per-class chunks, in the order they were concatenated
            boundaries = []
            offset = 0
            for _, sample in class_samples[:-1]:
                offset += sample.shape[0]
                boundaries.append(offset)
            split_clean_X = np.split(clean_X_majority, boundaries)

            # write the translated rows back to the positions they were sampled from
            for (clazz, sample), clean_part in zip(class_samples, split_clean_X):
                class_X[clazz][sample] = clean_part

        final_X = np.vstack([class_X[clazz] for clazz, _ in sorted_class_counts])
        final_y = np.hstack([np.full((class_X[clazz].shape[0],), clazz) for clazz, _ in sorted_class_counts])
        return final_X, final_y
124 changes: 124 additions & 0 deletions tests/resampling/test_ccr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from unittest.mock import patch

import numpy as np
from numpy.testing import assert_array_equal

from multi_imbalance.resampling.ccr import CCR, MultiClassCCR

# Small binary dataset: two minority examples (label 1) among four majority examples (label 0).
X = np.array([
    [0.51916715, 0.46894559],
    [0.42850038, 0.49204451],
    [0.45844347, 0.44231806],
    [0.49862482, 0.61777354],
    [0.55701822, 0.32693741],
    [0.37040839, 0.48617894],
])

y = np.array([
    1,
    0,
    0,
    0,
    1,
    0
])

# Expected rows for X after cleaning; presumably produced by the implementation accompanying the article.
original_cleaning_results = np.array([
    [0.3279181486665761, 0.5367090144999095],
    [0.3123061206752467, 0.4686048130243289],
    [0.49287499380029176, 0.6594306863002918],
    [0.3248957760286293, 0.4914514685286293],
    [0.51916715, 0.46894559],
    [0.55701822, 0.32693741]
])

# Dedicated seeded generator: the multi-class fixture is identical on every run, so a failing test can be
# reproduced, and drawing it does not depend on the state of the global numpy generator.
_fixture_rng = np.random.RandomState(0)

# Five classes with 100, 30, 20, 10 and 5 two-dimensional examples.
multiclass_X = np.vstack(
    [
        _fixture_rng.normal(0, 1, (100, 2)),
        _fixture_rng.normal(3, 5, (30, 2)),
        _fixture_rng.normal(-2, 2, (20, 2)),
        _fixture_rng.normal(-4, 1, (10, 2)),
        _fixture_rng.normal(10, 1, (5, 2)),
    ]
)

multiclass_y = np.array([1] * 100 + [2] * 30 + [3] * 20 + [4] * 10 + [5] * 5)


def test_compare_cleaning_results_to_original_article_implementation():
    """The first X.shape[0] resampled rows must match the stored reference results (compared column-wise sorted)."""
    sampler = CCR(energy=0.5)
    resampled_X, _ = sampler.fit_resample(X, y)

    cleaned_rows = resampled_X[:X.shape[0]]
    actual = np.sort(cleaned_rows, axis=0)
    expected = np.sort(original_cleaning_results, axis=0)
    assert_array_equal(actual, expected)


def test_radius_equal_to_energy_and_translations_equal_zero_when_majority_not_in_range():
    """With no majority example within reach, the radius equals the energy and nothing is translated."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 1], [-1, -1]])

    radius, translations = CCR(energy=0.5)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([0.5])
    expected_translations = np.array([[0, 0], [0, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_radius_decreases_and_translation_nonequal_zero_when_majority_in_range():
    """A majority example inside the sphere makes expansion costlier and is pushed to the sphere's border."""
    minority = np.array([[0, 0]])
    majority = np.array([[0.5, 0], [1, 0]])

    radius, translations = CCR(energy=1)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([0.75])
    expected_translations = np.array([[0.25, 0], [0, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_energy_cost_should_increase_proportionally_to_number_of_examples_in_radius():
    """Each majority example already inside the sphere raises the cost of further expansion by one unit."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 0], [2, 0], [3, 0], [4, 0]])

    radius, translations = CCR(energy=10)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([4])
    expected_translations = np.array([[3, 0], [2, 0], [1, 0], [0, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_should_use_leftover_energy_when_all_examples_in_radius():
    """Energy left after covering every majority example keeps expanding the sphere."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 0], [2, 0], [3, 0], [4, 0]])

    radius, translations = CCR(energy=110)._calculate_radius_and_translations(minority, majority)

    expected_radius = np.array([29])
    expected_translations = np.array([[28, 0], [27, 0], [26, 0], [25, 0]])
    assert_array_equal(radius, expected_radius)
    assert_array_equal(translations, expected_translations)


def test_translations_should_accumulate():
    """Translations from two minority examples add up per majority example; here the total is zero."""
    minority = np.array([[0, 0], [2, 0]])
    majority = np.array([[1, 0]])

    sampler = CCR(energy=1)
    translations = sampler._calculate_radius_and_translations(minority, majority)[1]

    expected_translations = np.array([[0, 0]])
    assert_array_equal(translations, expected_translations)


def test_should_properly_handle_same_distance_examples():
    """Majority examples lying at the same distance are all taken into the sphere together."""
    minority = np.array([[0, 0]])
    majority = np.array([[1, 0], [1, 0]])

    sampler = CCR(energy=2)
    radius = sampler._calculate_radius_and_translations(minority, majority)[0]

    expected_radius = np.array([1.5])
    assert_array_equal(radius, expected_radius)


def test_multiclass_equal_class_counts():
    """After multi-class resampling every class must have the same number of examples."""
    sampler = MultiClassCCR(energy=0.5)
    _, resampled_y = sampler.fit_resample(multiclass_X, multiclass_y)

    smallest_class_count = np.unique(resampled_y, return_counts=True)[1].min()
    biggest_class_count = np.unique(resampled_y, return_counts=True)[1].max()
    assert smallest_class_count == biggest_class_count


def test_multiclass_ccr_call_count():
    """Binary CCR must run once for each of the four classes smaller than the biggest one."""
    clf = MultiClassCCR(energy=0.5)

    # the mock wraps the real method, so resampling still runs while the calls are counted
    with patch.object(CCR, '_clean_and_generate', wraps=clf.CCR._clean_and_generate) as mock:
        clf.fit_resample(multiclass_X, multiclass_y)
        assert mock.call_count == 4

0 comments on commit 6e04ed0

Please sign in to comment.