-
-
Notifications
You must be signed in to change notification settings - Fork 84
/
generating.py
344 lines (301 loc) · 11.9 KB
/
generating.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
"""
Utilities for random data generating.
"""
# Created by Wenjie Du <[email protected]>
# License: GLP-v3
import math
from typing import Optional, Tuple
import numpy as np
from sklearn.utils import check_random_state
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pypots.data.utils import mcar, masked_fill
from pypots.data.load_specific_datasets import load_specific_dataset
def gene_complete_random_walk(
    n_samples: int = 1000,
    n_steps: int = 24,
    n_features: int = 10,
    mu: float = 0.0,
    std: float = 1.0,
    random_state: Optional[int] = None,
) -> np.ndarray:
    """Generate complete (fully-observed) random walk time-series data.

    Parameters
    ----------
    n_samples : int, default=1000
        The number of training time-series samples to generate.

    n_steps : int, default=24
        The number of time steps (length) of generated time-series samples.

    n_features : int, default=10
        The number of features (dimensions) of generated time-series samples.

    mu : float, default=0.0
        Mean of the normal distribution from which random walk steps are sampled.

    std : float, default=1.0
        Standard deviation of the normal distribution from which random walk steps are sampled.

    random_state : int or numpy.RandomState, default=None
        Random seed for data generation.

    Returns
    -------
    ts_samples : array, shape of [n_samples, n_steps, n_features]
        Generated random walk time series.
    """
    seed = check_random_state(random_state)
    # Draw all step increments at once, then accumulate them along the time
    # axis. np.cumsum performs the same left-to-right sequential summation as
    # the original per-step loop, so results are identical but computed in C.
    random_values = seed.randn(n_samples, n_steps, n_features) * std + mu
    ts_samples = np.cumsum(random_values, axis=1)
    return ts_samples
def gene_random_walk_for_classification(
    n_classes: int = 2,
    n_samples_each_class: int = 500,
    n_steps: int = 24,
    n_features: int = 10,
    shuffle: bool = True,
    random_state: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Generate complete random walk time-series data for the classification task.

    Parameters
    ----------
    n_classes : int, default=2
        Number of classes (types) of the generated data.

    n_samples_each_class : int, default=500
        Number of samples for each class to generate.

    n_steps : int, default=24
        Number of time steps in each sample.

    n_features : int, default=10
        Number of features.

    shuffle : bool, default=True
        Whether to shuffle generated samples.
        If not, you can separate samples of each class according to `n_samples_each_class`.
        For example,
        X_class0=X[:n_samples_each_class],
        X_class1=X[n_samples_each_class:n_samples_each_class*2]

    random_state : int or numpy.RandomState, default=None
        Random seed for data generation.

    Returns
    -------
    X : array, shape of [n_classes*n_samples_each_class, n_steps, n_features]
        Generated time-series data.

    y : array, shape of [n_classes*n_samples_each_class]
        Labels indicating classes of time-series samples.
    """
    ts_collector = []
    label_collector = []
    mu = 0
    std = 1
    for c_ in range(n_classes):
        ts_samples = gene_complete_random_walk(
            n_samples_each_class, n_steps, n_features, mu, std, random_state
        )
        # Every sample in this batch belongs to class c_.
        label_samples = np.full(n_samples_each_class, c_)
        ts_collector.extend(ts_samples)
        label_collector.extend(label_samples)
        mu += 1  # shift the step mean so classes are separable
    X = np.asarray(ts_collector)
    y = np.asarray(label_collector)
    # if shuffling, then shuffle the order of samples
    if shuffle:
        # BUGFIX: shuffle with an RNG derived from the caller-provided
        # random_state instead of the global np.random state, so that the
        # shuffled output is reproducible when a seed is given.
        rng = check_random_state(random_state)
        indices = rng.permutation(len(X))
        X = X[indices]
        y = y[indices]
    return X, y
def gene_complete_random_walk_for_anomaly_detection(
    n_samples: int = 1000,
    n_steps: int = 24,
    n_features: int = 10,
    mu: float = 0.0,
    std: float = 1.0,
    anomaly_proportion: float = 0.1,
    anomaly_fraction: float = 0.02,
    anomaly_scale_factor: float = 2.0,
    random_state: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Generate time-series data with injected anomalies for the anomaly-detection task.

    NOTE: despite the function name, the generated samples are i.i.d. Gaussian
    values (no accumulation over time steps is performed here).

    Parameters
    ----------
    n_samples : int, default=1000
        The number of training time-series samples to generate.

    n_steps : int, default=24
        The number of time steps (length) of generated time-series samples.

    n_features : int, default=10
        The number of features (dimensions) of generated time-series samples.

    mu : float, default=0.0
        Mean of the normal distribution from which values are sampled.

    std : float, default=1.0
        Standard deviation of the normal distribution from which values are sampled.

    anomaly_proportion : float, in (0,1)
        Proportion of anomaly samples in all samples.

    anomaly_fraction : float, in (0,1)
        Fraction of anomaly points in each anomaly sample.

    anomaly_scale_factor : int or float
        Scale factor for value scaling to create anomaly points in time series samples.

    random_state : int or numpy.RandomState, default=None
        Random seed for data generation.

    Returns
    -------
    X : array, shape of [n_samples, n_steps, n_features]
        Generated time-series data.

    y : array, shape of [n_samples]
        Labels indicating if time-series samples are anomalies.
    """
    assert (
        0 < anomaly_proportion < 1
    ), f"anomaly_proportion should be >0 and <1, but got {anomaly_proportion}"
    assert (
        0 < anomaly_fraction < 1
    ), f"anomaly_fraction should be >0 and <1, but got {anomaly_fraction}"
    seed = check_random_state(random_state)
    X = seed.randn(n_samples, n_steps, n_features) * std + mu
    n_anomaly = math.floor(n_samples * anomaly_proportion)
    # BUGFIX: draw anomaly locations, point positions, injected values, and
    # the final shuffle from the seeded RNG instead of the global np.random,
    # so the whole function is reproducible when random_state is given.
    anomaly_indices = seed.choice(n_samples, size=n_anomaly, replace=False)
    for a_i in anomaly_indices:
        anomaly_sample = X[a_i].flatten()  # flatten() copies, X untouched until write-back
        min_val = anomaly_sample.min()
        max_val = anomaly_sample.max()
        # NOTE(review): max_difference is negative (min - max), which reverses
        # the low/high bounds passed to uniform() below; numpy still samples
        # between them, but confirm this matches the intended anomaly range.
        max_difference = min_val - max_val
        n_points = n_steps * n_features
        n_anomaly_points = int(n_points * anomaly_fraction)
        point_indices = seed.choice(a=n_points, size=n_anomaly_points, replace=False)
        for p_i in point_indices:
            anomaly_sample[p_i] = mu + seed.uniform(
                low=min_val - anomaly_scale_factor * max_difference,
                high=max_val + anomaly_scale_factor * max_difference,
            )
        X[a_i] = anomaly_sample.reshape(n_steps, n_features)
    # create labels: 1 marks an anomaly sample
    y = np.zeros(n_samples)
    y[anomaly_indices] = 1
    # shuffle samples and labels together with the seeded RNG
    indices = seed.permutation(n_samples)
    X = X[indices]
    y = y[indices]
    return X, y
def gene_incomplete_random_walk_dataset(
    n_steps: int = 24, n_features: int = 10, n_classes: int = 2, n_samples_each_class: int = 1000
) -> dict:
    """Generate a random-walk classification dataset with artificially-missing values.

    Builds class-labelled random-walk samples, splits them into train/val/test
    sets, standardizes features on the training set, and masks out values with
    `mcar` so the dataset can exercise imputation models. Returns a dict with
    the splits, labels, and (for val/test) the intact values and indicating
    masks used as imputation ground truth.
    """
    # generate samples
    X, y = gene_random_walk_for_classification(
        n_classes=n_classes,
        n_samples_each_class=n_samples_each_class,
        n_steps=n_steps,
        n_features=n_features,
    )
    # split into train/val/test sets (80/20, then 80/20 of the train part)
    train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.2)
    train_X, val_X, train_y, val_y = train_test_split(train_X, train_y, test_size=0.2)
    # create random missing values (30% MCAR) and write NaN into masked cells
    _, train_X, missing_mask, _ = mcar(train_X, 0.3)
    train_X = masked_fill(train_X, 1 - missing_mask, torch.nan)
    # NOTE(review): the validation set is masked here AND again after
    # normalization below (two rounds of 30% MCAR), while train/test get only
    # one round each — confirm the double masking of val_X is intended.
    _, val_X, missing_mask, _ = mcar(val_X, 0.3)
    val_X = masked_fill(val_X, 1 - missing_mask, torch.nan)
    # test set is left to mask after normalization
    train_X = train_X.reshape(-1, n_features)
    val_X = val_X.reshape(-1, n_features)
    test_X = test_X.reshape(-1, n_features)
    # normalization: fit on train only, apply to val/test to avoid leakage
    scaler = StandardScaler()
    train_X = scaler.fit_transform(train_X)
    val_X = scaler.transform(val_X)
    test_X = scaler.transform(test_X)
    # reshape into time series samples
    train_X = train_X.reshape(-1, n_steps, n_features)
    val_X = val_X.reshape(-1, n_steps, n_features)
    test_X = test_X.reshape(-1, n_steps, n_features)
    # mask values in the validation set as ground truth
    val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar(val_X, 0.3)
    val_X = masked_fill(val_X, 1 - val_X_missing_mask, torch.nan)
    # mask values in the test set as ground truth
    test_X_intact, test_X, test_X_missing_mask, test_X_indicating_mask = mcar(
        test_X, 0.3
    )
    test_X = masked_fill(test_X, 1 - test_X_missing_mask, torch.nan)
    data = {
        "n_classes": n_classes,
        "n_steps": n_steps,
        "n_features": n_features,
        "train_X": train_X,
        "train_y": train_y,
        "val_X": val_X,
        "val_y": val_y,
        "val_X_intact": val_X_intact,
        "val_X_indicating_mask": val_X_indicating_mask,
        "test_X": test_X,
        "test_y": test_y,
        "test_X_intact": test_X_intact,
        "test_X_indicating_mask": test_X_indicating_mask,
    }
    return data
def gene_physionet2012(artificially_missing: bool = True):
    """Generate a fully-prepared PhysioNet-2012 dataset for model testing.

    Parameters
    ----------
    artificially_missing : bool, default = True,
        Whether to artificially mask out 10% observed values and hold them out
        for imputation performance evaluation.

    Returns
    -------
    data : dict
        Train/val/test splits and labels; when ``artificially_missing`` is
        True, also the intact values and indicating masks of the val/test
        sets, for use as imputation ground truth.
    """
    # load the dataset and split by RecordID so each record lands in one set
    df = load_specific_dataset("physionet_2012")
    X = df["X"]
    y = df["y"]
    all_recordID = X["RecordID"].unique()
    train_set_ids, test_set_ids = train_test_split(all_recordID, test_size=0.2)
    train_set_ids, val_set_ids = train_test_split(train_set_ids, test_size=0.2)
    train_set = X[X["RecordID"].isin(train_set_ids)]
    val_set = X[X["RecordID"].isin(val_set_ids)]
    test_set = X[X["RecordID"].isin(test_set_ids)]
    train_set = train_set.drop("RecordID", axis=1)
    val_set = val_set.drop("RecordID", axis=1)
    test_set = test_set.drop("RecordID", axis=1)
    train_X, val_X, test_X = (
        train_set.to_numpy(),
        val_set.to_numpy(),
        test_set.to_numpy(),
    )
    # normalization: fit on train only, apply to val/test to avoid leakage
    scaler = StandardScaler()
    train_X = scaler.fit_transform(train_X)
    val_X = scaler.transform(val_X)
    test_X = scaler.transform(test_X)
    # reshape into time series samples (48 time steps per record)
    train_X = train_X.reshape(len(train_set_ids), 48, -1)
    val_X = val_X.reshape(len(val_set_ids), 48, -1)
    test_X = test_X.reshape(len(test_set_ids), 48, -1)
    # labels are selected by index; presumably y is indexed by RecordID —
    # verify against load_specific_dataset's output format
    train_y = y[y.index.isin(train_set_ids)]
    val_y = y[y.index.isin(val_set_ids)]
    test_y = y[y.index.isin(test_set_ids)]
    train_y, val_y, test_y = train_y.to_numpy(), val_y.to_numpy(), test_y.to_numpy()
    if artificially_missing:
        # BUGFIX: mask BEFORE assembling the result dict. The original built
        # the dict first and then rebound the local val_X/test_X, so the
        # returned "val_X"/"test_X" entries were still the UNMASKED arrays.
        # mask values in the validation set as ground truth
        val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar(
            val_X, 0.1
        )
        val_X = masked_fill(val_X, 1 - val_X_missing_mask, torch.nan)
        # mask values in the test set as ground truth
        test_X_intact, test_X, test_X_missing_mask, test_X_indicating_mask = mcar(
            test_X, 0.1
        )
        test_X = masked_fill(test_X, 1 - test_X_missing_mask, torch.nan)
    data = {
        "n_classes": 2,
        "n_steps": 48,
        "n_features": train_X.shape[-1],
        "train_X": train_X,
        "train_y": train_y.flatten(),
        "val_X": val_X,
        "val_y": val_y.flatten(),
        "test_X": test_X,
        "test_y": test_y.flatten(),
    }
    if artificially_missing:
        data["val_X_intact"] = val_X_intact
        data["val_X_indicating_mask"] = val_X_indicating_mask
        data["test_X_intact"] = test_X_intact
        data["test_X_indicating_mask"] = test_X_indicating_mask
    return data