-
Notifications
You must be signed in to change notification settings - Fork 0
/
psi.py
82 lines (61 loc) · 2.9 KB
/
psi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import numpy as np
def calculate_psi(expected, actual, buckettype='bins', buckets=10, axis=0):
'''Calculate the PSI (population stability index) across all variables
Args:
expected: numpy matrix of original values
actual: numpy matrix of new values, same size as expected
buckettype: type of strategy for creating buckets, bins splits into even splits, quantiles splits into quantile buckets
buckets: number of quantiles to use in bucketing variables
axis: axis by which variables are defined, 0 for vertical, 1 for horizontal
Returns:
psi_values: ndarray of psi values for each variable
Author:
Matthew Burke
github.com/mwburke
worksofchart.com
'''
def psi(expected_array, actual_array, buckets):
'''Calculate the PSI for a single variable
Args:
expected_array: numpy array of original values
actual_array: numpy array of new values, same size as expected
buckets: number of percentile ranges to bucket the values into
Returns:
psi_value: calculated PSI value
'''
def scale_range (input, min, max):
input += -(np.min(input))
input /= np.max(input) / (max - min)
input += min
return input
breakpoints = np.arange(0, buckets + 1) / (buckets) * 100
if buckettype == 'bins':
breakpoints = scale_range(breakpoints, np.min(expected_array), np.max(expected_array))
elif buckettype == 'quantiles':
breakpoints = np.stack([np.percentile(expected_array, b) for b in breakpoints])
expected_percents = np.histogram(expected_array, breakpoints)[0] / len(expected_array)
actual_percents = np.histogram(actual_array, breakpoints)[0] / len(actual_array)
def sub_psi(e_perc, a_perc):
'''Calculate the actual PSI value from comparing the values.
Update the actual value to a very small number if equal to zero
'''
if a_perc == 0:
a_perc = 0.0001
if e_perc == 0:
e_perc = 0.0001
value = (e_perc - a_perc) * np.log(e_perc / a_perc)
return(value)
psi_value = np.sum(sub_psi(expected_percents[i], actual_percents[i]) for i in range(0, len(expected_percents)))
return(psi_value)
if len(expected.shape) == 1:
psi_values = np.empty(len(expected.shape))
else:
psi_values = np.empty(expected.shape[axis])
for i in range(0, len(psi_values)):
if len(psi_values) == 1:
psi_values = psi(expected, actual, buckets)
elif axis == 0:
psi_values[i] = psi(expected[:,i], actual[:,i], buckets)
elif axis == 1:
psi_values[i] = psi(expected[i,:], actual[i,:], buckets)
return(psi_values)