-
Notifications
You must be signed in to change notification settings - Fork 42
/
base.py
358 lines (292 loc) · 12.3 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
from __future__ import annotations
import logging
import numpy as num
from matplotlib import pyplot as plt
from pyrocko.gf import StaticResult
from pyrocko.guts import List, Object
from pyrocko.guts_array import Array
from pyrocko.moment_tensor import symmat6, moment_to_magnitude
from matplotlib import pyplot as plt
from .sources import DiscretizedBEMSource, slip_comp_to_idx, check_intersection
try:
from cutde import halfspace as HS
from cutde.geometry import strain_to_stress
except ImportError:
raise ImportError("'Cutde' needs to be installed!")
logger = logging.getLogger("bem")
km = 1.0e3
class BEMResponse(Object):
sources = List.T(default=[])
targets = List.T(default=[])
discretized_sources = List.T()
displacements = Array.T(
shape=(None,), dtype=num.float32, serialize_as="base64", optional=True
)
target_ordering = Array.T(shape=(None,), dtype=num.int64, optional=True)
source_ordering = Array.T(shape=(None,), dtype=num.int64, optional=True)
inverted_slip_vectors = Array.T(shape=(None, 3), dtype=num.float32, optional=True)
@property
def n_sources(self):
return len(self.sources)
@property
def n_targets(self):
return len(self.targets)
@property
def is_valid(self):
if self.discretized_sources is None:
return False
else:
return True
def static_results(self) -> list[StaticResult]:
"""
Get target specific surface displacements in NED coordinates.
"""
results = []
for target_idx in range(self.n_targets):
start_idx = self.target_ordering[target_idx]
end_idx = self.target_ordering[target_idx + 1]
result = {
"displacement.n": self.displacements[start_idx:end_idx, 1],
"displacement.e": self.displacements[start_idx:end_idx, 0],
"displacement.d": -self.displacements[start_idx:end_idx, 2],
}
results.append(StaticResult(result=result))
return results
def source_slips(self) -> list[num.ndarray]:
"""
Get inverted slip vectors for sources
Returns
-------
array_like: [n_triangles, 3]
where columns are: strike, dip and tensile slip-components"""
slips = []
for src_idx in range(self.n_sources):
if self.source_ordering is not None:
start_idx = self.source_ordering[src_idx]
end_idx = self.source_ordering[src_idx + 1]
slips.append(self.inverted_slip_vectors[start_idx:end_idx, :])
else:
slips.append(None)
return slips
def get_source_magnitudes(self, shear_modulus):
inverted_slips = self.source_slips()
total_slips = [num.linalg.norm(slips, axis=1) for slips in inverted_slips]
magnitudes = []
for source, slips in zip(self.discretized_sources, total_slips):
moments = source.get_areas_triangles() * slips * shear_modulus
magnitudes.append(moment_to_magnitude(moments.sum()))
return magnitudes
def get_geometry(self, event):
"""Returns list of geometries of sources for plotting in the pyrocko.sparrow"""
def duplicate_property(array):
ndims = len(array.shape)
if ndims == 1:
return num.hstack((array, array))
elif ndims == 2:
return num.vstack((array, array))
else:
raise TypeError("Only 1-2d data supported!")
from pyrocko.model import Geometry
ncorners = 3
geoms = []
sources_slips = self.source_slips()
times = num.zeros(1)
for dsource, source_slips in zip(self.discretized_sources, sources_slips):
n_nodes = ncorners * dsource.n_vertices
latlon = num.ones((n_nodes, 2)) * num.array([event.lat, event.lon])
vertices = num.hstack([latlon, dsource.triangles_xyz.reshape((n_nodes, 3))])
geom = Geometry(times=times, event=event)
faces1 = num.arange(n_nodes, dtype="int64").reshape(
dsource.n_vertices, ncorners
)
faces2 = num.fliplr(faces1)
faces = num.vstack((faces1, faces2))
geom.setup(vertices, faces)
for slip_comp in ["strike", "dip", "normal"]:
slips = source_slips[slip_comp_to_idx[slip_comp]]
geom.add_property(((slip_comp, "float64", [])), duplicate_property(slips))
geoms.append(geom)
return geoms
class BEMEngine(object):
def __init__(self, config) -> None:
self.config = config
self._obs_points = None
self._ncoords_targets = None
def cache_target_coords3(self, targets, dtype="float32"):
ncoords_targets = num.cumsum([0] + [target.ncoords for target in targets])
if self._ncoords_targets is None:
self._ncoords_targets = ncoords_targets
coords_diff = 0
else:
coords_diff = self._ncoords_targets.sum() - ncoords_targets.sum()
if self._obs_points is None or coords_diff:
coords5 = num.vstack([target.coords5 for target in targets])
obs_pts = num.zeros((coords5.shape[0], 3))
obs_pts[:, 0] = coords5[:, 3]
obs_pts[:, 1] = coords5[:, 2]
self._obs_points = obs_pts.astype(dtype)
self._ncoords_targets = ncoords_targets
return self._obs_points
def clear_target_cache(self):
self._obs_points = None
self._ncoords_targets = None
def process(self, sources: list, targets: list, debug=False) -> num.ndarray:
mesh_size = self.config.mesh_size * km
if self.config.check_mesh_intersection:
intersect = check_intersection(sources, mesh_size=mesh_size)
else:
intersect = False
obs_points = self.cache_target_coords3(targets, dtype="float32")
if intersect:
return BEMResponse(
sources=sources,
targets=targets,
discretized_sources=None,
displacements=num.full(
(obs_points.shape[0], 3), -99.0, dtype="float64"
),
target_ordering=self._ncoords_targets,
source_ordering=None,
inverted_slip_vectors=None,
)
discretized_sources = [
source.discretize_basesource(mesh_size=mesh_size, plot=False)
for source in sources
]
coefficient_matrix = self.get_interaction_matrix(
discretized_sources, debug=debug
)
tractions = self.config.boundary_conditions.get_traction_field(
discretized_sources
)
if debug:
figs, ax = plt.subplots(1, 1)
im = ax.matshow(num.log(num.abs(coefficient_matrix)))
print(num.abs(coefficient_matrix).max())
ax.set_title("Interaction matrix")
plt.colorbar(im)
plt.savefig("intermat.pdf")
print("CEF shape", coefficient_matrix.shape)
# solve with least squares
inv_slips = num.linalg.multi_dot(
[
num.linalg.inv(coefficient_matrix.T.dot(coefficient_matrix)),
coefficient_matrix.T,
tractions,
]
)
all_triangles = num.vstack(
[source.triangles_xyz for source in discretized_sources]
)
disp_mat = HS.disp_matrix(
obs_pts=obs_points, tris=all_triangles, nu=self.config.nu
)
n_all_triangles = all_triangles.shape[0]
slips = num.zeros((n_all_triangles, 3))
start_idx = 0
sources_ntriangles = num.cumsum(
[start_idx] + [source.n_triangles for source in discretized_sources]
)
for bcond in self.config.boundary_conditions.iter_conditions():
for source_idx in bcond.source_idxs:
source_mesh = discretized_sources[source_idx]
end_idx = start_idx + source_mesh.n_triangles
slips[
sources_ntriangles[source_idx] : sources_ntriangles[source_idx + 1],
slip_comp_to_idx[bcond.slip_component],
] = inv_slips[start_idx:end_idx]
start_idx += source_mesh.n_triangles
displacements = disp_mat.reshape((-1, n_all_triangles * 3)).dot(slips.ravel())
return BEMResponse(
sources=sources,
targets=targets,
discretized_sources=discretized_sources,
displacements=displacements.reshape((-1, 3)),
target_ordering=self._ncoords_targets,
source_ordering=sources_ntriangles,
inverted_slip_vectors=slips,
)
def get_interaction_matrix(
self, discretized_sources: list, debug: bool
) -> num.ndarray:
G_slip_components = [[], [], []]
for bcond in self.config.boundary_conditions.iter_conditions():
for source_idx in bcond.source_idxs:
source_mesh = discretized_sources[source_idx]
Gs_strike = []
Gs_dip = []
Gs_normal = []
for receiver_idx in bcond.receiver_idxs:
receiver_mesh = discretized_sources[receiver_idx]
g_strike, g_dip, g_normal = get_coefficient_matrices_tdcs(
receiver_mesh,
source_mesh.triangles_xyz,
bcond.slip_component,
nu=self.config.nu,
mu=self.config.mu,
)
if debug:
figs, axs = plt.subplots(1, 3)
for k, (comp, g_comp) in enumerate(
zip(
("strike", "dip", "normal"), (g_strike, g_dip, g_normal)
)
):
axs[k].matshow(num.abs(g_comp))
axs[k].set_title(comp)
idx = num.unravel_index(num.argmax(num.abs(g_comp), axis=None), g_comp.shape)
print("td idx", idx)
print("comp", comp, num.abs(g_comp).max())
print("comp", comp, source_idx, receiver_idx)
figs.savefig("%i_%i.pdf" % (source_idx, receiver_idx))
plt.show()
Gs_strike.append(g_strike)
Gs_dip.append(g_dip)
Gs_normal.append(g_normal)
G_slip_components[0].append(num.vstack(Gs_strike))
G_slip_components[1].append(num.vstack(Gs_dip))
G_slip_components[2].append(num.vstack(Gs_normal))
return num.block(G_slip_components)
def get_store(self, store_id):
"""Dummy method to allow compatibility"""
return None
def get_coefficient_matrices_tdcs(
discretized_bem_source: DiscretizedBEMSource,
triangles_xyz: num.ndarray,
slip_component: str,
nu: float,
mu: float,
) -> list[num.ndarray]:
"""
Calculates interaction matrix between source triangles and receiver triangles.
Parameters
----------
slip_component:
Returns
-------
"""
strain_mat = HS.strain_matrix(
discretized_bem_source.centroids, triangles_xyz, nu=nu
)
# select relevant source slip vector component indexs (0-strike, 1-dip, 2-tensile)
slip_idx = slip_comp_to_idx[slip_component]
comp_strain_mat = strain_mat[:, :, :, slip_idx]
comp_strain_mat_T = num.transpose(comp_strain_mat, (0, 2, 1))
comp_stress_mat_T = strain_to_stress(
comp_strain_mat_T.reshape((-1, 6)), mu, nu
).reshape(comp_strain_mat_T.shape)
stress_mat_m9s = symmat6(*comp_stress_mat_T.T).T
# get traction vector from Stress tensor
tvs = num.sum(
stress_mat_m9s * discretized_bem_source.unit_normal_vectors[:, None, None, :],
axis=-1,
)
# get stress components from traction vector
g_strike = num.sum(
tvs * discretized_bem_source.unit_strike_vectors[:, None, :], axis=-1
)
g_dip = num.sum(tvs * discretized_bem_source.unit_dip_vectors[:, None, :], axis=-1)
g_normal = num.sum(
tvs * discretized_bem_source.unit_normal_vectors[:, None, :], axis=-1
)
return g_strike, g_dip, -g_normal # Minus is needed due to ENU convention