-
Notifications
You must be signed in to change notification settings - Fork 15
/
__init__.py
188 lines (144 loc) · 6.45 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from aqt import mw
from aqt.utils import showInfo
from aqt.qt import *
from anki.hooks import addHook
from anki.importing.noteimp import NoteImporter, ForeignNote
import http.client, urllib.request, urllib.parse, urllib.error
from urllib.parse import urlparse
import hashlib
import ssl
import json
import pprint
import subprocess
Config = mw.addonManager.getConfig(__name__)
class AirtableImporter(NoteImporter):
importMode = 0
allowHTML = True
def __init__(self, col, table, view, api_key, app_key):
NoteImporter.__init__(self, col, "")
self.records = None
self.table = table
self.view = view
self.api_key = api_key
self.app_key = app_key
def outputLog(self):
for statement in self.log:
sys.stderr.write(statement + "\n")
# TODO: This plugin should own the model completely. The document ID should
# also be in there, so if the name changes or is removed, another card is
# not created. Instead, old documents should be removed.
def updateModel(self, model):
models = mw.col.models
fieldNames = models.fieldNames(model)
for record in self.getRecords():
for key in list(record["fields"].keys()):
if key not in fieldNames:
field = models.newField(key)
models.addField(model, field)
fieldNames = models.fieldNames(model)
def fields(self):
return len(self.model['flds'])
def foreignNotes(self):
notes = []
for record in self.getRecords():
note = ForeignNote()
fields = record["fields"]
modelFields = self.model["flds"]
for field in modelFields:
key = field['name']
# sys.stderr.write(key + "\n")
if key in fields:
if isinstance(fields[key], list):
if len(fields[key]) == 0:
note.fields.append("")
elif isinstance(fields[key][0], dict):
note.fields.append(self.downloadToCollection(fields[key]))
else:
strings = [str(e) for e in fields[key]]
note.fields.append(str(", ".join(strings)))
else:
if not isinstance(fields[key], str) and not isinstance(fields[key], str):
fields[key] = str(fields[key])
asciiToHtml = fields[key].replace("\n", "<br/>")
note.fields.append(asciiToHtml)
else:
note.fields.append("")
# This has some encoding problems.
# sys.stderr.write(key + " => " + note.fields[len(note.fields) - 1] + "\n")
notes.append(note)
return notes
def getRecords(self):
if self.records:
return self.records
else:
self.records = self.getRecordsWithOffset(None)
return self.records
def getRecordsWithOffset(self, offset = None):
offsetString = ""
if offset is not None:
offsetString = "&offset={}".format(offset)
headers = { "Authorization": "Bearer " + self.api_key }
conn = http.client.HTTPSConnection("api.airtable.com", context = ssl._create_unverified_context())
conn.request("GET", "/v0/{}/{}?view={}{}".format(self.app_key, self.table, self.view, offsetString), "", headers)
response = conn.getresponse()
raw_response = response.read()
json_response = json.loads(raw_response)
if "error" in json_response:
sys.stderr.write(raw_response)
records = json_response["records"]
if "offset" in json_response:
records += self.getRecordsWithOffset(json_response["offset"])
return records
def downloadToCollection(self, media):
field = ""
for medium in media:
url = medium["url"]
if "thumbnails" in medium: # spare some size if it's an image!
url = medium["thumbnails"]["large"]["url"]
_, extension = os.path.splitext(url)
digest = hashlib.md5(url.encode('utf-8')).hexdigest()
filename = "{}{}".format(digest, extension)
location = str(Config["media_path"]).format(filename)
command = "curl {} -o '{}'".format(url, location)
if not os.path.isfile(location):
return_code = subprocess.call(command, shell=True)
if not extension or extension == "":
find_extension = "file --extension \"{}\" | grep -oE \":.+$\" | grep -oE \"[^\/ :]+\" | head -n1".format(location)
mime_ext = subprocess.check_output(find_extension, shell=True)
extension = ".{}".format(mime_ext.decode("utf-8").strip())
link = "ln -s \"{}\" \"{}{}\"".format(location, location, extension)
subprocess.call(link, shell=True)
filename = "{}{}".format(digest, extension)
if extension == ".jpg" or extension == ".png" or extension == ".jpeg":
field += "<img src=\"{}\" /> ".format(filename)
elif extension == ".ogg" or extension == ".mp3":
field += "[sound:{}]".format(filename)
else:
field += filename
return field
def airtableImport(col, deck, modelName, table, view, app_key):
did = mw.col.decks.id(deck)
mw.col.decks.select(did)
model = mw.col.models.byName(modelName)
if not model:
model = mw.col.models.new(modelName)
mw.col.models.add(model)
template = mw.col.models.newTemplate("Default")
mw.col.models.addTemplate(model, template)
model['did'] = did
deck = mw.col.decks.get(did)
deck['mid'] = model['id']
mw.col.decks.save(deck)
airtable = AirtableImporter(mw.col, table, view, Config["key"], app_key)
airtable.updateModel(model)
airtable.initMapping()
airtable.run()
# TODO: Two-way sync would be dope, so fixing typos or changing content would be
# propegated to Airtable. E.g. when visiting a vocab card, it'd be great to add
# another sentence each time.
#
# TODO: Automatic cloze cards for English words.
def hook():
for table in Config["tables"]:
airtableImport(mw.col, table["anki_deck"], table["anki_model"], table["airtable_table"], table["airtable_view"], table["airtable_key"])
addHook("profileLoaded", hook)