-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
155 lines (111 loc) · 4.2 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import types
from itertools import groupby, chain
import numpy as np
from numpy import median
from numpy.random import randint, randn
def unions(sets):
    """Return the union of every collection in `sets` as a single `set`.

    Parameters
    ----------
    sets : iterable of iterables
        Any iterable (list, tuple, generator, ...) whose items are themselves
        iterables of hashable elements.

    Returns
    -------
    set
        All elements that occur in at least one of the given collections;
        an empty set when `sets` yields nothing.
    """
    # chain.from_iterable walks generators and containers alike, so neither a
    # type check nor an intermediate list is required.
    return set(chain.from_iterable(sets))
def group_by(xs, keyfunc):
    """Group the items of `xs` by `keyfunc`.

    The items are sorted by `keyfunc` as soon as this function is called; the
    returned generator then yields ``(key, members)`` tuples in ascending key
    order, where `members` is a `list` of the items sharing that key.
    """
    ordered = sorted(xs, key=keyfunc)
    runs = groupby(ordered, key=keyfunc)
    return ((key, list(members)) for key, members in runs)
def safe_iter(iterable):
    """Iterate over a snapshot of `iterable`, skipping items removed meanwhile.

    A copy of the contents is taken when iteration starts. Before each item of
    that copy is yielded, it is checked for membership in the original
    `iterable`; items that are no longer contained are silently skipped.
    """
    snapshot = tuple(iterable)
    for item in snapshot:
        if item not in iterable:
            # Dropped from the original since the snapshot was taken.
            continue
        yield item
class between_sampler:
    """Random integer sampler which returns an `int` between given `min_inclusive` and `max_inclusive`."""

    def __init__(self, min_inclusive, max_inclusive):
        # Kept as an assertion so existing callers still see AssertionError;
        # note that the check is skipped when Python runs with -O.
        assert min_inclusive <= max_inclusive, 'min_inclusive must not exceed max_inclusive'
        self.m = min_inclusive
        self.M = max_inclusive

    def sample(self, size=None):
        """Draw random integers from the closed range ``[m, M]``.

        Parameters
        ----------
        size : int or tuple of ints, optional
            Output shape passed on to `randint`. With the default ``None`` a
            single value is returned.
        """
        # randint's upper bound is exclusive, hence the +1. size=None is
        # randint's own default, so no separate branch is needed for it.
        return randint(self.m, self.M + 1, size=size)
class normal_sampler:
    """Sampler that scales and shifts standard-normal draws by `sd` and `mu`."""

    def __init__(self, mu=0.0, sd=1.0):
        self.mu, self.sd = mu, sd

    def sample(self):
        """Return one draw: a `randn()` value multiplied by `sd` plus `mu`."""
        standard_draw = randn()
        return self.sd * standard_draw + self.mu
def average_agg(default=0.0):
    """Build an aggregator that averages its input.

    The returned function maps a sized collection of numbers to their
    arithmetic mean, or to `default` when the collection is empty.
    """
    def aggregate(items):
        if len(items) > 0:
            return sum(items) / len(items)
        return default

    return aggregate
def max_agg(default=0.0):
    """Build an aggregator that takes the maximum of its input.

    The returned function maps a sized collection to its largest element, or
    to `default` when the collection is empty.
    """
    def aggregate(items):
        if len(items) > 0:
            return max(items)
        return default

    return aggregate
def linear_gaussian(parameters: dict, aggregator, error):
    """Build a linear model with additive noise.

    Parameters
    ----------
    parameters : dict
        Coefficient for each cause, looked up as ``parameters[cause_rvar]``.
    aggregator : callable
        Reduces the list of values gathered for one cause to a single value.
    error : object
        Noise source; one draw is obtained per evaluation via ``error.sample()``.

    Returns
    -------
    function
        ``func(values, cause_item_attrs)``, which returns the sum over all
        causes of coefficient times aggregated value, plus one noise draw.
    """
    def func(values, cause_item_attrs):
        total = 0
        # Causes are visited in sorted key order so the accumulation order is fixed.
        for cause in sorted(parameters.keys()):
            gathered = [values[attr] for attr in cause_item_attrs[cause]]
            coefficient = parameters[cause]
            total += coefficient * aggregator(gathered)
        noise = error.sample()
        return total + noise

    return func
def xors(parameters):
    """Build a function that XORs together all values of its causes.

    The returned ``func(values, cause_item_attrs)`` folds ``^`` over
    ``values[item_attr]`` for every item attribute of every key in
    `parameters`, visiting keys in sorted order and starting from 0. When
    `parameters` is empty the result is ``randint(2)`` instead.
    """
    def func(values, cause_item_attrs):
        if parameters:
            parity = 0
        else:
            # No causes at all: fall back to a random draw from randint(2).
            parity = randint(2)
        for cause in sorted(parameters.keys()):
            for attr in cause_item_attrs[cause]:
                parity ^= values[attr]
        return parity

    return func
def median_except_diag(D, exclude_inf=True, default=1):
    """Median of the off-diagonal elements of a matrix.

    Thin wrapper that delegates to `stat_except_diag` with `median` as the
    statistic.

    Parameters
    ----------
    D : matrix-like
        Matrix to be measured.
    exclude_inf : bool
        Whether to exclude infinity.
    default : int
        Value returned when there is no value to compute the median from.
    """
    return stat_except_diag(D, exclude_inf=exclude_inf, default=default, func=median)
def mean_except_diag(D, exclude_inf=True, default=1):
    """Mean of the off-diagonal elements of a matrix.

    Thin wrapper that delegates to `stat_except_diag` with `np.mean` as the
    statistic.

    Parameters
    ----------
    D : array_like
        Array to be measured.
    exclude_inf : bool
        Whether to exclude infinity.
    default : int
        Value returned when there is no value to compute the mean from.
    """
    return stat_except_diag(D, exclude_inf=exclude_inf, default=default, func=np.mean)
def stat_except_diag(D, exclude_inf=True, default=1, func=median):
    """Apply a statistic to the off-diagonal elements of a square matrix.

    Parameters
    ----------
    D : array_like
        Square, two-dimensional input. Nested sequences are accepted and are
        converted with `np.asarray`.
    exclude_inf : bool
        If true, elements equal to positive infinity are dropped before the
        statistic is computed (negative infinity and NaN are kept).
    default : scalar
        Returned when no element is left to compute the statistic from.
    func : callable
        Statistic applied to the 1-D array of remaining off-diagonal elements.

    Returns
    -------
    The value of ``func`` on the off-diagonal elements, or `default`.

    Raises
    ------
    TypeError
        If `D` is not two-dimensional or not square.
    ValueError
        If `D` has at most one row and therefore no off-diagonal element.
    """
    # Coerce first so that lists of lists work as the docstring promises.
    D = np.asarray(D)
    if D.ndim != 2:
        raise TypeError('not a matrix')
    if D.shape[0] != D.shape[1]:
        raise TypeError('not a square matrix')
    if len(D) <= 1:
        raise ValueError('No non-diagonal element')

    # Strictly-lower-triangular mask; applied to D and to its transpose it
    # selects the elements below and above the diagonal respectively.
    below_diagonal = np.tri(len(D), k=-1, dtype=bool)
    lower = D[below_diagonal]
    upper = D.transpose()[below_diagonal]
    non_diagonal = np.concatenate((lower, upper))

    if exclude_inf:
        non_diagonal = non_diagonal[non_diagonal != float('inf')]

    if len(non_diagonal):
        return func(non_diagonal)
    return default