# rdf.py (forked from ckan/ckanext-dcat)
from builtins import str
from past.builtins import basestring

import json
import uuid
import logging
import hashlib
import traceback

import six

import ckan.plugins as p
import ckan.model as model

import ckan.lib.plugins as lib_plugins

from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.logic.schema import unicode_safe

from ckanext.dcat.harvesters.base import DCATHarvester
from ckanext.dcat.processors import RDFParserException, RDFParser
from ckanext.dcat.interfaces import IDCATRDFHarvester
from ckan.logic import NotFound

log = logging.getLogger(__name__)


class DCATRDFHarvester(DCATHarvester):

    def info(self):
        return {
            'name': 'dcat_rdf',
            'title': 'Generic DCAT RDF Harvester',
            'description': 'Harvester for DCAT datasets from an RDF graph'
        }
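    # Note (illustrative, not part of the upstream module): the 'name'
    # returned above ('dcat_rdf') is the harvester type identifier that a
    # ckanext-harvest source must reference for this harvester to be used;
    # 'title' and 'description' are what the harvest UI displays for it.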
    _names_taken = []

    def _get_dict_value(self, _dict, key, default=None):
        '''
        Returns the value for the given key on a CKAN dict

        By default a key on the root level is checked. If not found, extras
        are checked, both with the key provided and with `dcat_` prepended to
        support legacy fields.

        If not found, returns the default value, which defaults to None
        '''
        if key in _dict:
            return _dict[key]

        for extra in _dict.get('extras', []):
            if extra['key'] == key or extra['key'] == 'dcat_' + key:
                return extra['value']

        return default
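    # Illustrative example (assumption, not part of the upstream module):
    # a value present only in extras is found via the legacy 'dcat_'-prefixed
    # key as well as the plain one:
    #
    #   d = {'title': 'A', 'extras': [{'key': 'dcat_issued', 'value': '2020'}]}
    #   self._get_dict_value(d, 'issued')          # -> '2020'
    #   self._get_dict_value(d, 'missing', 'n/a')  # -> 'n/a'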
    def _get_guid(self, dataset_dict, source_url=None):
        '''
        Try to get a unique identifier for a harvested dataset

        It will be the first one found of:

         * URI (rdf:about)
         * dcat:identifier
         * Source URL + Dataset name
         * Dataset name

        The last two are obviously not optimal, as they depend on the title,
        which might change.

        Returns None if no guid could be decided.
        '''
        guid = None

        guid = (
            self._get_dict_value(dataset_dict, 'uri') or
            self._get_dict_value(dataset_dict, 'identifier')
        )
        if guid:
            return guid

        if dataset_dict.get('name'):
            guid = dataset_dict['name']
            if source_url:
                guid = source_url.rstrip('/') + '/' + guid

        return guid
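    # Illustrative example of the precedence above (assumption, not part of
    # the upstream module):
    #
    #   self._get_guid({'extras': [{'key': 'uri', 'value': 'http://x/d/1'}],
    #                   'name': 'd1'})
    #   # -> 'http://x/d/1' (the URI wins over the name)
    #
    #   self._get_guid({'name': 'd1'}, source_url='http://x/')
    #   # -> 'http://x/d1'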
    def _mark_datasets_for_deletion(self, guids_in_source, harvest_job):
        '''
        Given a list of guids in the remote source, checks which ones in the
        DB need to be deleted

        To do so it queries all guids in the DB for this source and
        calculates the difference.

        For each of these it creates a HarvestObject with the dataset id,
        marked for deletion.

        Returns a list with the ids of the Harvest Objects to delete.
        '''
        object_ids = []

        # Get all previous current guids and dataset ids for this source
        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id) \
                             .filter(HarvestObject.current == True) \
                             .filter(HarvestObject.harvest_source_id == harvest_job.source.id)

        guid_to_package_id = {}
        for guid, package_id in query:
            guid_to_package_id[guid] = package_id

        guids_in_db = list(guid_to_package_id.keys())

        # Get objects/datasets to delete (i.e. in the DB but not in the source)
        guids_to_delete = set(guids_in_db) - set(guids_in_source)

        # Create a harvest object for each of them, flagged for deletion
        for guid in guids_to_delete:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HarvestObjectExtra(key='status',
                                                           value='delete')])

            # Mark the rest of objects for this guid as not current
            model.Session.query(HarvestObject) \
                         .filter_by(guid=guid) \
                         .update({'current': False}, False)
            obj.save()
            object_ids.append(obj.id)

        return object_ids
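    # Note (illustrative): each object created above carries a 'status' extra
    # set to 'delete'; import_stage() below reads that extra and calls
    # package_delete for the referenced package instead of importing it.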
    def validate_config(self, source_config):
        if not source_config:
            return source_config

        source_config_obj = json.loads(source_config)

        if 'rdf_format' in source_config_obj:
            rdf_format = source_config_obj['rdf_format']
            if not isinstance(rdf_format, basestring):
                raise ValueError('rdf_format must be a string')

            supported_formats = RDFParser().supported_formats()
            if rdf_format not in supported_formats:
                raise ValueError(
                    'rdf_format should be one of: ' + ", ".join(supported_formats))

        return source_config
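    # Illustrative source config (assumption: the exact format names accepted
    # depend on what RDFParser().supported_formats() reports on the running
    # instance):
    #
    #   {"rdf_format": "turtle"}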
    def gather_stage(self, harvest_job):

        log.debug('In DCATRDFHarvester gather_stage')

        rdf_format = None
        if harvest_job.source.config:
            rdf_format = json.loads(harvest_job.source.config).get("rdf_format")

        # Get file contents of first page
        next_page_url = harvest_job.source.url

        guids_in_source = []
        object_ids = []
        last_content_hash = None
        self._names_taken = []

        while next_page_url:

            for harvester in p.PluginImplementations(IDCATRDFHarvester):
                next_page_url, before_download_errors = harvester.before_download(
                    next_page_url, harvest_job)

                for error_msg in before_download_errors:
                    self._save_gather_error(error_msg, harvest_job)

                if not next_page_url:
                    return []

            content, rdf_format = self._get_content_and_type(
                next_page_url, harvest_job, 1, content_type=rdf_format)

            content_hash = hashlib.md5()
            if content:
                if six.PY2:
                    content_hash.update(content)
                else:
                    content_hash.update(content.encode('utf8'))

            if last_content_hash:
                if content_hash.digest() == last_content_hash.digest():
                    log.warning('Remote content was the same even when using a paginated URL, skipping')
                    break
            else:
                last_content_hash = content_hash

            # TODO: store content?
            for harvester in p.PluginImplementations(IDCATRDFHarvester):
                content, after_download_errors = harvester.after_download(
                    content, harvest_job)

                for error_msg in after_download_errors:
                    self._save_gather_error(error_msg, harvest_job)

            if not content:
                return []

            # TODO: profiles conf
            parser = RDFParser()

            try:
                parser.parse(content, _format=rdf_format)
            except RDFParserException as e:
                self._save_gather_error('Error parsing the RDF file: {0}'.format(e), harvest_job)
                return []

            for harvester in p.PluginImplementations(IDCATRDFHarvester):
                parser, after_parsing_errors = harvester.after_parsing(parser, harvest_job)

                for error_msg in after_parsing_errors:
                    self._save_gather_error(error_msg, harvest_job)

            if not parser:
                return []

            try:

                source_dataset = model.Package.get(harvest_job.source.id)

                for dataset in parser.datasets():
                    if not dataset.get('name'):
                        dataset['name'] = self._gen_new_name(dataset['title'])
                    if dataset['name'] in self._names_taken:
                        suffix = len([i for i in self._names_taken
                                      if i.startswith(dataset['name'] + '-')]) + 1
                        dataset['name'] = '{}-{}'.format(dataset['name'], suffix)
                    self._names_taken.append(dataset['name'])

                    # Unless already set by the parser, get the owner organization
                    # (if any) from the harvest source dataset
                    if not dataset.get('owner_org'):
                        if source_dataset.owner_org:
                            dataset['owner_org'] = source_dataset.owner_org

                    # Try to get a unique identifier for the harvested dataset
                    guid = self._get_guid(dataset, source_url=source_dataset.url)

                    if not guid:
                        self._save_gather_error(
                            'Could not get a unique identifier for dataset: {0}'.format(dataset),
                            harvest_job)
                        continue

                    dataset['extras'].append({'key': 'guid', 'value': guid})
                    guids_in_source.append(guid)

                    obj = HarvestObject(guid=guid, job=harvest_job,
                                        content=json.dumps(dataset))

                    obj.save()
                    object_ids.append(obj.id)
            except Exception as e:
                self._save_gather_error(
                    'Error when processing dataset: %r / %s' % (e, traceback.format_exc()),
                    harvest_job)
                return []

            # get the next page
            next_page_url = parser.next_page()

        # Check if some datasets need to be deleted
        object_ids_to_delete = self._mark_datasets_for_deletion(guids_in_source, harvest_job)

        object_ids.extend(object_ids_to_delete)

        return object_ids
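    # Note (illustrative): the md5 guard in the loop above covers endpoints
    # that accept a paging parameter but return the same payload for every
    # page; comparing the digests of consecutive pages breaks the loop
    # instead of harvesting the same datasets forever.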
    def fetch_stage(self, harvest_object):
        # Nothing to do here
        return True
    def import_stage(self, harvest_object):

        log.debug('In DCATRDFHarvester import_stage')

        status = self._get_object_extra(harvest_object, 'status')
        if status == 'delete':
            # Delete package
            context = {'model': model, 'session': model.Session,
                       'user': self._get_user_name(), 'ignore_auth': True}

            try:
                p.toolkit.get_action('package_delete')(context, {'id': harvest_object.package_id})
                log.info('Deleted package {0} with guid {1}'.format(harvest_object.package_id,
                                                                    harvest_object.guid))
            except NotFound:
                log.info('Package {0} already deleted.'.format(harvest_object.package_id))
            return True

        if harvest_object.content is None:
            self._save_object_error('Empty content for object {0}'.format(harvest_object.id),
                                    harvest_object, 'Import')
            return False

        try:
            dataset = json.loads(harvest_object.content)
        except ValueError:
            self._save_object_error('Could not parse content for object {0}'.format(harvest_object.id),
                                    harvest_object, 'Import')
            return False

        # Get the last harvested object (if any)
        previous_object = model.Session.query(HarvestObject) \
                               .filter(HarvestObject.guid == harvest_object.guid) \
                               .filter(HarvestObject.current == True) \
                               .first()

        # Flag previous object as not current anymore
        if previous_object:
            previous_object.current = False
            previous_object.add()

        # Flag this object as the current one
        harvest_object.current = True
        harvest_object.add()

        context = {
            'user': self._get_user_name(),
            'return_id_only': True,
            'ignore_auth': True,
        }

        dataset = self.modify_package_dict(dataset, {}, harvest_object)

        # Check if a dataset with the same guid exists
        existing_dataset = self._get_existing_dataset(harvest_object.guid)

        try:
            package_plugin = lib_plugins.lookup_package_plugin(dataset.get('type', None))
            if existing_dataset:
                package_schema = package_plugin.update_package_schema()
                for harvester in p.PluginImplementations(IDCATRDFHarvester):
                    package_schema = harvester.update_package_schema_for_update(package_schema)
                context['schema'] = package_schema

                # Don't change the dataset name even if the title has changed
                dataset['name'] = existing_dataset['name']
                dataset['id'] = existing_dataset['id']

                harvester_tmp_dict = {}

                # Check if resources already exist based on their URI
                existing_resources = existing_dataset.get('resources')
                resource_mapping = {r.get('uri'): r.get('id')
                                    for r in existing_resources if r.get('uri')}
                for resource in dataset.get('resources'):
                    res_uri = resource.get('uri')
                    if res_uri and res_uri in resource_mapping:
                        resource['id'] = resource_mapping[res_uri]

                for harvester in p.PluginImplementations(IDCATRDFHarvester):
                    harvester.before_update(harvest_object, dataset, harvester_tmp_dict)

                try:
                    if dataset:
                        # Save reference to the package on the object
                        harvest_object.package_id = dataset['id']
                        harvest_object.add()

                        p.toolkit.get_action('package_update')(context, dataset)
                    else:
                        log.info('Ignoring dataset %s' % existing_dataset['name'])
                        return 'unchanged'
                except p.toolkit.ValidationError as e:
                    self._save_object_error('Update validation Error: %s' %
                                            str(e.error_summary), harvest_object, 'Import')
                    return False

                for harvester in p.PluginImplementations(IDCATRDFHarvester):
                    err = harvester.after_update(harvest_object, dataset, harvester_tmp_dict)

                    if err:
                        self._save_object_error('RDFHarvester plugin error: %s' % err,
                                                harvest_object, 'Import')
                        return False

                log.info('Updated dataset %s' % dataset['name'])
            else:
                package_schema = package_plugin.create_package_schema()
                for harvester in p.PluginImplementations(IDCATRDFHarvester):
                    package_schema = harvester.update_package_schema_for_create(package_schema)
                context['schema'] = package_schema

                # We need to explicitly provide a package ID
                dataset['id'] = str(uuid.uuid4())
                package_schema['id'] = [unicode_safe]

                harvester_tmp_dict = {}

                name = dataset['name']
                for harvester in p.PluginImplementations(IDCATRDFHarvester):
                    harvester.before_create(harvest_object, dataset, harvester_tmp_dict)

                try:
                    if dataset:
                        # Save reference to the package on the object
                        harvest_object.package_id = dataset['id']
                        harvest_object.add()

                        # Defer constraints and flush so the dataset can be indexed with
                        # the harvest object id (on the after_show hook from the harvester
                        # plugin)
                        model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
                        model.Session.flush()

                        p.toolkit.get_action('package_create')(context, dataset)
                    else:
                        log.info('Ignoring dataset %s' % name)
                        return 'unchanged'
                except p.toolkit.ValidationError as e:
                    self._save_object_error('Create validation Error: %s' %
                                            str(e.error_summary), harvest_object, 'Import')
                    return False

                for harvester in p.PluginImplementations(IDCATRDFHarvester):
                    err = harvester.after_create(harvest_object, dataset, harvester_tmp_dict)

                    if err:
                        self._save_object_error('RDFHarvester plugin error: %s' % err,
                                                harvest_object, 'Import')
                        return False

                log.info('Created dataset %s' % dataset['name'])

        except Exception as e:
            self._save_object_error('Error importing dataset %s: %r / %s' %
                                    (dataset.get('name', ''), e, traceback.format_exc()),
                                    harvest_object, 'Import')
            return False

        finally:
            model.Session.commit()

        return True
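    # Note (illustrative summary, not part of the upstream module):
    # import_stage() returns True on a successful create/update or delete,
    # False when an error was recorded, and the string 'unchanged' when an
    # IDCATRDFHarvester plugin emptied the dataset dict in before_update /
    # before_create to signal that the dataset should be skipped.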