-
Notifications
You must be signed in to change notification settings - Fork 129
/
utils.py
162 lines (145 loc) · 5.72 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""Various utility functions for experiments.
Use G = nx.DiGraph(W) to convert adj matrix to directed graph.
Use W = nx.to_numpy_array(G) to convert directed graph to adj matrix.
"""
import numpy as np
import scipy.linalg as slin
import networkx as nx
def simulate_random_dag(d: int,
                        degree: int,
                        graph_type: str,
                        w_range: tuple = (0.5, 2.0)) -> nx.DiGraph:
    """Simulate a random weighted DAG with a given expected node degree.

    Args:
        d: number of nodes
        degree: expected node degree, in + out
        graph_type: one of {'erdos-renyi', 'barabasi-albert', 'full'}
        w_range: absolute weight range (low, high); sign is chosen at random

    Returns:
        G: weighted DAG as a networkx DiGraph

    Raises:
        ValueError: if graph_type is not one of the supported names.

    NOTE(review): 'erdos-renyi' divides by (d - 1), so it assumes d >= 2.
    """
    if graph_type == 'erdos-renyi':
        # A strictly lower-triangular adjacency matrix is acyclic by
        # construction; each possible edge is kept with probability `edge_prob`.
        edge_prob = float(degree) / (d - 1)
        B = np.tril((np.random.rand(d, d) < edge_prob).astype(float), k=-1)
    elif graph_type == 'barabasi-albert':
        # Preferential attachment: `bag` holds one copy of a node id per
        # incident edge, so popular nodes are drawn more often.
        m = int(round(degree / 2))
        B = np.zeros([d, d])
        bag = [0]
        for new_node in range(1, d):
            targets = np.random.choice(bag, size=m)
            for target in targets:
                B[new_node, target] = 1
            bag.append(new_node)
            bag.extend(targets)
    elif graph_type == 'full':  # ignore degree, only for experimental use
        B = np.tril(np.ones([d, d]), k=-1)
    else:
        raise ValueError('unknown graph type')
    # Conjugating by a random permutation hides the topological order that
    # the lower-triangular construction would otherwise reveal.
    P = np.random.permutation(np.eye(d, d))  # permutes first axis only
    B_perm = P.T.dot(B).dot(P)
    # Draw magnitudes in w_range, flip signs with probability 1/2, and keep
    # weights only where B_perm has an edge.
    U = np.random.uniform(low=w_range[0], high=w_range[1], size=[d, d])
    U[np.random.rand(d, d) < 0.5] *= -1
    W = (B_perm != 0).astype(float) * U
    return nx.DiGraph(W)
def simulate_sem(G: nx.DiGraph,
                 n: int,
                 sem_type: str,
                 noise_scale: float = 1.0) -> np.ndarray:
    """Simulate samples from a linear SEM with the specified noise type.

    Args:
        G: weighted DAG
        n: number of samples
        sem_type: one of {'linear-gauss', 'linear-exp', 'linear-gumbel'}
        noise_scale: scale parameter of the noise distribution

    Returns:
        X: [n, d] sample matrix

    Raises:
        ValueError: if sem_type is not one of the supported names.
    """
    # Map each SEM name to its numpy noise sampler; all three share the
    # (scale, size) keyword signature used below.
    samplers = {
        'linear-gauss': np.random.normal,
        'linear-exp': np.random.exponential,
        'linear-gumbel': np.random.gumbel,
    }
    if sem_type not in samplers:
        raise ValueError('unknown sem type')
    draw_noise = samplers[sem_type]
    W = nx.to_numpy_array(G)
    d = W.shape[0]
    X = np.zeros([n, d])
    order = list(nx.topological_sort(G))
    assert len(order) == d
    # Visiting vertices in topological order guarantees that every parent's
    # column of X is already filled before it is used.
    for node in order:
        parents = list(G.predecessors(node))
        signal = X[:, parents].dot(W[parents, node])  # [n,]
        X[:, node] = signal + draw_noise(scale=noise_scale, size=n)
    return X
def simulate_population_sample(W: np.ndarray,
                               Omega: np.ndarray) -> np.ndarray:
    """Simulate a data matrix X that matches population least squares.

    Args:
        W: [d, d] adjacency matrix
        Omega: [d, d] noise covariance matrix

    Returns:
        X: [d, d] sample matrix
    """
    d = W.shape[0]
    # (I - W)^+ maps noise to observed variables in the linear SEM;
    # sqrtm(Omega) injects the noise covariance, sqrt(d) sets the scale.
    noise_to_obs = np.linalg.pinv(np.eye(d) - W)
    return np.sqrt(d) * slin.sqrtm(Omega).dot(noise_to_obs)
def count_accuracy(G_true: nx.DiGraph,
                   G: nx.DiGraph,
                   G_und: nx.DiGraph = None) -> tuple:
    """Compute FDR, TPR, and FPR for B, or optionally for CPDAG B + B_und.

    Args:
        G_true: ground truth graph
        G: predicted graph
        G_und: predicted undirected edges in CPDAG, asymmetric

    Returns:
        fdr: (reverse + false positive) / prediction positive
        tpr: (true positive) / condition positive
        fpr: (reverse + false positive) / condition negative
        shd: undirected extra + undirected missing + reverse
        nnz: prediction positive
    """
    B_true = nx.to_numpy_array(G_true) != 0
    B = nx.to_numpy_array(G) != 0
    # BUG FIX: booleanize B_und like B and B_true. Left as a float matrix,
    # `B_lower += np.tril(B_und + B_und.T)` below tried to add float64 into
    # a bool array in place, which numpy rejects (same_kind casting rule),
    # so every call with G_und supplied crashed.
    B_und = None if G_und is None else nx.to_numpy_array(G_und) != 0
    d = B.shape[0]
    # linear index of nonzeros
    if B_und is not None:
        pred_und = np.flatnonzero(B_und)
    pred = np.flatnonzero(B)
    cond = np.flatnonzero(B_true)
    cond_reversed = np.flatnonzero(B_true.T)
    cond_skeleton = np.concatenate([cond, cond_reversed])
    # true pos
    true_pos = np.intersect1d(pred, cond, assume_unique=True)
    if B_und is not None:
        # treat undirected edge favorably: it counts as a hit if either
        # orientation exists in the ground truth
        true_pos_und = np.intersect1d(pred_und, cond_skeleton, assume_unique=True)
        true_pos = np.concatenate([true_pos, true_pos_und])
    # false pos: predicted edges absent from the truth in both orientations
    false_pos = np.setdiff1d(pred, cond_skeleton, assume_unique=True)
    if B_und is not None:
        false_pos_und = np.setdiff1d(pred_und, cond_skeleton, assume_unique=True)
        false_pos = np.concatenate([false_pos, false_pos_und])
    # reverse: predicted edges whose flipped orientation is in the truth
    extra = np.setdiff1d(pred, cond, assume_unique=True)
    reverse = np.intersect1d(extra, cond_reversed, assume_unique=True)
    # compute ratios; max(..., 1) guards the empty-prediction/empty-truth cases
    pred_size = len(pred)
    if B_und is not None:
        pred_size += len(pred_und)
    cond_neg_size = 0.5 * d * (d - 1) - len(cond)
    fdr = float(len(reverse) + len(false_pos)) / max(pred_size, 1)
    tpr = float(len(true_pos)) / max(len(cond), 1)
    fpr = float(len(reverse) + len(false_pos)) / max(cond_neg_size, 1)
    # structural hamming distance, computed on the undirected skeleton
    B_lower = np.tril(B + B.T)
    if B_und is not None:
        B_lower += np.tril(B_und + B_und.T)  # bool + bool acts as logical or
    pred_lower = np.flatnonzero(B_lower)
    cond_lower = np.flatnonzero(np.tril(B_true + B_true.T))
    extra_lower = np.setdiff1d(pred_lower, cond_lower, assume_unique=True)
    missing_lower = np.setdiff1d(cond_lower, pred_lower, assume_unique=True)
    shd = len(extra_lower) + len(missing_lower) + len(reverse)
    return fdr, tpr, fpr, shd, pred_size