-
Notifications
You must be signed in to change notification settings - Fork 3
/
import_users_from_json_file.py
47 lines (37 loc) · 1.89 KB
/
import_users_from_json_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import ckanapi
import json
import argparse
import re
import string
import random
# Command-line interface: the CKAN base URL is required, the API key is optional.
# The description previously read "Export ..." although this script imports
# users into CKAN.
parser = argparse.ArgumentParser(description='Import users and their organizations')
parser.add_argument("BASE_URL", help="Base URL of ckan")
parser.add_argument("-a", "--apikey", help="Apikey")
args = parser.parse_args()

# Client for the remote CKAN instance that every later API call goes through.
ckan = ckanapi.RemoteCKAN(args.BASE_URL, user_agent="user importer", apikey=args.apikey)

# Organizations present on the target instance; later compared against each
# imported user's organization names.
# NOTE(review): 'verify': False turns off TLS certificate verification for this
# request only; the other call_action calls in this file do not pass it.
# Confirm whether that is intentional.
organizations = ckan.call_action('organization_list', requests_kwargs={'verify': False})

# Matches a trailing "-" followed by zero or more digits at the end of a name.
regex = re.compile(r'(-\d*$)')
def generate_password(length=12):
    """Return a random alphanumeric password.

    Characters are drawn from the ASCII letters and digits using
    ``random.SystemRandom``, which reads from the operating system's
    randomness source instead of the default seedable generator.

    Args:
        length: Number of characters to generate. Defaults to 12, the
            length that was previously hard-coded.

    Returns:
        A string of ``length`` characters.
    """
    alphabet = string.ascii_letters + string.digits
    # One generator instance is enough; there is no need to build one per character.
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(length))
def _test_username(name):
    """Return the "-test" variant of *name* used for the imported account.

    A trailing "-" followed by zero or more digits is replaced by "-test";
    a name without such a suffix gets "-test" appended instead.
    """
    # subn reports how many replacements happened, so no comparison of the
    # result string against the input (by identity or otherwise) is needed.
    renamed, replaced = regex.subn('-test', name)
    return renamed if replaced else name + "-test"


# Read the whole export first so the file is closed before any API calls start.
with open("users.json", "r", encoding="utf-8") as data:
    users = json.load(data)

# Built once so each user's organization check is a set lookup.
known_organizations = set(organizations)

for user in users:
    user_organizations = user['organizations']
    # Only users belonging to at least one organization that exists on the
    # target instance are imported; everyone else is skipped.
    if known_organizations.isdisjoint(org['name'] for org in user_organizations):
        continue

    username = _test_username(user['name'])
    print("Organization found, user should be created")
    try:
        user_obj = ckan.call_action('user_create', {
            "name": username,
            "email": user['email'],
            "password": generate_password(),
        })
    except ckanapi.ValidationError as e:
        # Any validation failure is treated as "the user already exists" and
        # the existing account is fetched instead.
        # NOTE(review): other validation problems would also land here, in
        # which case the user_show call below may fail - confirm.
        print("User already exists %s %s" % (username, e))
        user_obj = ckan.call_action('user_show', {"id": username})

    print("add user to organization")
    for org in user_organizations:
        try:
            ckan.call_action('organization_member_create', {
                'id': org['name'],
                'username': user_obj['id'],
                'role': org['capacity'],
            })
        except ckanapi.ValidationError:
            # Best effort: report the organization and carry on with the rest.
            print("Organization not found %s" % org['name'])

    print("Send reset email")
    ckan.call_action('send_reset_link', {'user_id': user_obj['id']})