-
Notifications
You must be signed in to change notification settings - Fork 37
/
outputmetrics.py
304 lines (261 loc) · 13.5 KB
/
outputmetrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import collections
import logging
import os
import cdms2
import pcmdi_metrics
from pcmdi_metrics.io.base import Base
from pcmdi_metrics.driver.observation import Observation
from pcmdi_metrics.driver.dataset import DataSet
from pcmdi_metrics import LOG_LEVEL
# Python 2/3 compatibility shim: `basestring` does not exist on Python 3,
# so alias it to `str` there.  Probing for the name can only raise
# NameError, so catch exactly that instead of the original blanket
# `except Exception`.
try:
    basestring  # noqa
except NameError:
    basestring = str
class OutputMetrics(object):
    """Compute, accumulate, and write out the metrics for one variable.

    Results are collected in ``metrics_dictionary`` and the statistic
    definitions in ``metrics_def_dictionary``; both end up in the output
    .json/.txt written by :meth:`write_on_exit`.
    """

    def __init__(self, parameter, var_name_long, obs_dict, sftlf):
        """Store the driver inputs and prepare the output file and dicts.

        parameter:     the driver's parameter object (paths, options, ...).
        var_name_long: variable name, optionally suffixed with a level in
                       Pa, e.g. 'ta_85000'.
        obs_dict:      observation data set information keyed by variable.
        sftlf:         land/sea mask information keyed by model name.
        """
        logging.getLogger("pcmdi_metrics").setLevel(LOG_LEVEL)
        self.parameter = parameter
        self.var_name_long = var_name_long
        self.obs_dict = obs_dict
        # 'ta_85000' -> 'ta': the token before '_' is the variable id.
        self.var = var_name_long.split('_')[0]
        self.sftlf = sftlf
        self.metrics_def_dictionary = {}
        self.metrics_dictionary = {}
        self.out_file = Base(self.parameter.metrics_output_path,
                             self.parameter.output_json_template)
        self.regrid_method = ''
        self.regrid_tool = ''
        self.table_realm = ''
        self.realm = ''
        self.setup_regrid_and_realm_vars()
        self.setup_out_file()
        self.setup_metrics_dictionary()

    def setup_metrics_dictionary(self):
        '''Initalize the results dict (metrics_dictionary) and the metrics
        documentation dict (metrics_def_dictionary) which is put in the
        results dict.'''
        self.metrics_def_dictionary = collections.OrderedDict()
        self.metrics_dictionary = collections.OrderedDict()
        self.metrics_dictionary["DISCLAIMER"] = self.open_disclaimer()
        if self.parameter.user_notes is not None:
            self.metrics_dictionary["USER_NOTES"] = self.parameter.user_notes
        self.metrics_dictionary["RESULTS"] = collections.OrderedDict()
        self.metrics_dictionary["Variable"] = {}
        self.metrics_dictionary["Variable"]["id"] = self.var
        self.metrics_dictionary["json_version"] = '3.0'
        self.metrics_dictionary["References"] = {}
        self.metrics_dictionary["RegionalMasking"] = {}
        level = DataSet.calculate_level_from_var(self.var_name_long)
        if level is None:
            self.out_file.level = ''
        else:
            self.metrics_dictionary["Variable"]["level"] = level
            # Level is in Pa; the filename suffix uses hPa, e.g. "-850".
            self.out_file.level = "-%i" % (int(level / 100.0))

    def open_disclaimer(self):
        '''Return the contents of disclaimer.txt.'''
        f = DataSet.load_path_as_file_obj('disclaimer.txt')
        # try/finally so the handle is closed even if read() raises
        # (the original leaked it on error).
        try:
            return f.read()
        finally:
            f.close()

    def setup_regrid_and_realm_vars(self):
        '''Set the regrid_method, regrid_tool, table_realm,
        and realm based off the obs dict and var.'''
        if DataSet.use_omon(self.obs_dict, self.var):
            # Ocean variable: use the ocean regrid options.
            self.regrid_method = self.parameter.regrid_method_ocn
            self.regrid_tool = self.parameter.regrid_tool_ocn
            self.table_realm = 'Omon'
            self.realm = "ocn"
        else:
            # Default: atmosphere.
            self.regrid_method = self.parameter.regrid_method
            self.regrid_tool = self.parameter.regrid_tool
            self.table_realm = 'Amon'
            self.realm = "atm"

    def setup_out_file(self):
        '''Setup for the out_file, which outputs both the .json and .txt.'''
        self.out_file.set_target_grid(
            self.parameter.target_grid, self.regrid_tool, self.regrid_method)
        self.out_file.variable = self.var
        self.out_file.realm = self.realm
        self.out_file.table = self.table_realm
        self.out_file.case_id = self.parameter.case_id
        if hasattr(self, "obs_or_model"):
            self.out_file.model_version = self.obs_or_model
        # Fill remaining template keys from the parameter object first,
        # then from this object (self wins when both define a key).
        for key in self.out_file.keys():
            if hasattr(self.parameter, key):
                setattr(self.out_file, key, getattr(self.parameter, key))
            if hasattr(self, key):
                setattr(self.out_file, key, getattr(self, key))
        DataSet.apply_custom_keys(self.out_file, self.parameter.custom_keys, self.var)

    def add_region(self, region):
        '''Add a region to the metrics_dictionary.'''
        self.metrics_dictionary['RegionalMasking'][self.get_region_name_from_region(region)] = region

    def calculate_and_output_metrics(self, ref, test):
        '''Given ref and test (both either of type Observation or Model),
        compute the metrics and write the results out.'''
        # An obs entry may be an alias (a string naming another entry);
        # follow it one level to get the real reference description.
        if isinstance(self.obs_dict[self.var][ref.obs_or_model], basestring):
            self.obs_var_ref = self.obs_dict[self.var][
                self.obs_dict[self.var][ref.obs_or_model]]
        else:
            self.obs_var_ref = self.obs_dict[self.var][ref.obs_or_model]
        self.metrics_dictionary['References'][ref.obs_or_model] = self.obs_var_ref
        # Initialize before the try so a failed load leaves ref_data bound:
        # the original hit UnboundLocalError on the None check below
        # whenever ref() raised.
        ref_data = None
        try:
            ref_data = ref()
        except Exception as e:
            msg = 'Error while processing observation %s for variables %s:\n\t%s'
            logging.getLogger("pcmdi_metrics").error(
                msg % (ref.obs_or_model, self.var, str(e)))
        if ref_data is None:  # Something went bad!
            raise RuntimeError('Could not load reference {}'.format(ref.obs_or_model))
        try:
            test_data = test()
        except RuntimeError:
            # THIS EXCEPTION IS RAISED TO BREAK OUT OF THE FOR LOOP IN PCMDI_DRIVER
            # THIS SHOULD BE A CUSTOM EXCEPTION (PrematureBreakError)
            raise RuntimeError('Need to skip model: %s' % test.obs_or_model)
        # Todo: Make this a fcn
        self.set_grid_in_metrics_dictionary(test_data)
        if ref_data.shape != test_data.shape:
            raise RuntimeError('Two data sets have different shapes. %s vs %s'
                               % (ref_data.shape, test_data.shape))
        self.set_simulation_desc(test, test_data)
        if ref.obs_or_model not in self.metrics_dictionary['RESULTS'][test.obs_or_model]:
            self.metrics_dictionary["RESULTS"][test.obs_or_model][ref.obs_or_model] = \
                {'source': self.obs_dict[self.var][ref.obs_or_model]}
        parameter_realization = self.metrics_dictionary["RESULTS"][test.obs_or_model][ref.obs_or_model].\
            get(self.parameter.realization, {})
        if not self.parameter.dry_run:
            pr_rgn = pcmdi_metrics.pcmdi.compute_metrics(self.var_name_long, test_data, ref_data)
            # Calling compute_metrics with None for the model and obs returns
            # the definitions.
            self.metrics_def_dictionary.update(
                pcmdi_metrics.pcmdi.compute_metrics(self.var_name_long, None, None))
            if hasattr(self.parameter, 'compute_custom_metrics'):
                pr_rgn.update(
                    self.parameter.compute_custom_metrics(self.var_name_long,
                                                          test_data, ref_data))
                try:
                    self.metrics_def_dictionary.update(
                        self.parameter.compute_custom_metrics(
                            self.var_name_long, None, None))
                except Exception:
                    # The custom metrics function may not support the
                    # definitions call; fall back to its docstring.
                    self.metrics_def_dictionary.update(
                        {'custom': self.parameter.compute_custom_metrics.__doc__})
            parameter_realization[self.get_region_name_from_region(ref.region)] = \
                collections.OrderedDict(
                    (k, pr_rgn[k]) for k in sorted(pr_rgn.keys()))
            self.metrics_dictionary['RESULTS'][test.obs_or_model][ref.obs_or_model][
                self.parameter.realization] = parameter_realization
        if self.check_save_test_clim(ref):
            self.output_interpolated_model_climatologies(test, test_data)
        self.write_on_exit()

    def set_grid_in_metrics_dictionary(self, test_data):
        '''Set the grid in metrics_dictionary.'''
        grid = {}
        grid['RegridMethod'] = self.regrid_method
        grid['RegridTool'] = self.regrid_tool
        grid['GridName'] = self.parameter.target_grid
        # Drop the leading (time) axis; keep the spatial resolution only.
        grid['GridResolution'] = test_data.shape[1:]
        self.metrics_dictionary['GridInfo'] = grid

    def set_simulation_desc(self, test, test_data):
        '''Fillout information for the output .json and .txt files.'''
        self.metrics_dictionary["RESULTS"][test.obs_or_model] = \
            self.metrics_dictionary["RESULTS"].get(test.obs_or_model, {})
        if "SimulationDescription" not in \
                self.metrics_dictionary["RESULTS"][test.obs_or_model]:
            descr = {"MIPTable": self.obs_var_ref["CMIP_CMOR_TABLE"],
                     "Model": test.obs_or_model,
                     }
            sim_descr_mapping = {
                "ModelActivity": "project_id",
                "ModellingGroup": "institute_id",
                "Experiment": "experiment",
                "ModelFreeSpace": "ModelFreeSpace",
                "Realization": "realization",
                "creation_date": "creation_date",
            }
            sim_descr_mapping.update(
                getattr(self.parameter, "simulation_description_mapping", {}))
            # Open the model file lazily and at most once, and always close
            # it: the original re-opened it for every attribute looked up
            # and could leak the handle on error.
            f = None
            try:
                for att in list(sim_descr_mapping.keys()):
                    nm = sim_descr_mapping[att]
                    if not isinstance(nm, (list, tuple)):
                        nm = ["%s", nm]
                    fmt = nm[0]
                    vals = []
                    for a in nm[1:]:
                        # First trying from parameter file
                        if hasattr(self.parameter, a):
                            vals.append(getattr(self.parameter, a))
                        # Now fall back on file...
                        else:
                            if f is None:
                                f = cdms2.open(test.file_path())
                            if hasattr(f, a):
                                # Prefer a numeric value when it parses.
                                try:
                                    vals.append(float(getattr(f, a)))
                                except Exception:
                                    vals.append(getattr(f, a))
                            # Ok couldn't find it anywhere
                            # setting to N/A
                            else:
                                vals.append("N/A")
                    descr[att] = fmt % tuple(vals)
            finally:
                if f is not None:
                    f.close()
            self.metrics_dictionary["RESULTS"][test.obs_or_model]["units"] = \
                getattr(test_data, "units", "N/A")
            self.metrics_dictionary["RESULTS"][test.obs_or_model]["SimulationDescription"] = descr
            self.metrics_dictionary["RESULTS"][test.obs_or_model]["InputClimatologyFileName"] = \
                os.path.basename(test.file_path())
            self.metrics_dictionary["RESULTS"][test.obs_or_model]["InputClimatologyMD5"] = test.hash()
            # Not just global
            # TODO Ask Charles if the below check is needed
            # if len(self.regions_dict[self.var]) > 1:
            self.metrics_dictionary["RESULTS"][test.obs_or_model][
                "InputRegionFileName"] = \
                self.sftlf[test.obs_or_model]["filename"]
            self.metrics_dictionary["RESULTS"][test.obs_or_model][
                "InputRegionMD5"] = \
                self.sftlf[test.obs_or_model]["md5"]

    def output_interpolated_model_climatologies(self, test, test_data):
        '''Save the netCDF file.'''
        region_name = self.get_region_name_from_region(test.region)
        pth = os.path.join(self.parameter.test_clims_interpolated_output,
                           region_name)
        clim_file = Base(pth, self.parameter.filename_output_template)
        logging.getLogger("pcmdi_metrics").info('Saving interpolated climatologies to: %s' % clim_file())
        clim_file.level = self.out_file.level
        clim_file.model_version = test.obs_or_model
        clim_file.table = self.table_realm
        clim_file.period = self.parameter.period
        clim_file.case_id = self.parameter.case_id
        clim_file.set_target_grid(
            self.parameter.target_grid,
            self.regrid_tool,
            self.regrid_method)
        clim_file.variable = self.var
        clim_file.region = region_name
        clim_file.realization = self.parameter.realization
        DataSet.apply_custom_keys(clim_file, self.parameter.custom_keys, self.var)
        clim_file.write(test_data, type="nc", id=self.var)

    def get_region_name_from_region(self, region):
        '''Extract the region name from the region dict.'''
        # region is both in ref and test.
        # Check for None *before* subscripting: the original read
        # region['id'] first, so a None region raised TypeError and the
        # 'global' fallback was unreachable.
        if region is None:
            return 'global'
        return region['id']

    def check_save_test_clim(self, ref):
        '''Bunch of checks to see if the netCDF files are needed to be saved.'''
        # Since we are only saving once per reference data set (it's always
        # the same after), we need to check if ref is the first value from the
        # parameter, hence we have ref.obs_or_model == reference_data_set[0]
        reference_data_set = self.parameter.reference_data_set
        reference_data_set = Observation.setup_obs_list_from_parameter(
            reference_data_set, self.obs_dict, self.var)
        return not self.parameter.dry_run and hasattr(self.parameter, 'save_test_clims') \
            and self.parameter.save_test_clims is True and ref.obs_or_model == reference_data_set[0]  # noqa

    def write_on_exit(self):
        '''Output the metrics_dictionary as a json and text file.'''
        self.setup_out_file()
        self.metrics_dictionary['METRICS'] = self.metrics_def_dictionary
        if len(self.metrics_def_dictionary) == 0:
            raise RuntimeError("No results generated, cannot write to file")
        if not self.parameter.dry_run:
            logging.getLogger("pcmdi_metrics").info('Saving results to: %s' % self.out_file())
            self.out_file.write(self.metrics_dictionary,
                                json_structure=["model", "reference", "rip", "region", "statistic", "season"],
                                indent=4,
                                separators=(',', ': '),
                                mode="r+")