-
Notifications
You must be signed in to change notification settings - Fork 4
/
xbmc.py
303 lines (240 loc) · 10.3 KB
/
xbmc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, Dag Wieers (@dagwieers) <[email protected]>
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""This file implements the Kodi xbmc module, either using stubs or alternative functionality"""
# pylint: disable=invalid-name,no-self-use,too-many-branches,unused-argument
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import json
import time
import weakref
from xbmcextra import ADDON_ID, global_settings, import_language
from kodiutils import to_unicode
# Text-type compatibility shim used by the isinstance() checks in this module
try:
    basestring  # Python 2 still ships this builtin
except NameError:
    # Python 3 has no basestring; str is the only text type left
    basestring = str  # pylint: disable=redefined-builtin
# Display names of the log levels; the index of a name is its numeric level
LOGLEVELS = [
    'Debug',
    'Info',
    'Notice',
    'Warning',
    'Error',
    'Severe',
    'Fatal',
    'None',
]

# Numeric log levels, in the same order as the names in LOGLEVELS
(
    LOGDEBUG,
    LOGINFO,
    LOGNOTICE,
    LOGWARNING,
    LOGERROR,
    LOGSEVERE,
    LOGFATAL,
    LOGNONE,
) = range(8)
# Canned answers handed out by getInfoLabel(); labels not listed here yield None
INFO_LABELS = {
    # Kodi plugin URLs have the form plugin://<addon id>/
    'Container.FolderPath': 'plugin://' + ADDON_ID + '/',
    'System.BuildVersion': '18.2',
}
# Date format patterns handed out by getRegion(), keyed by region setting name
REGIONS = dict(
    datelong='%A, %e %B %Y',
    dateshort='%Y-%m-%d',
)
# Settings object returned by xbmcextra.global_settings(); this module reads it through .get()
settings = global_settings()
# Translation entries loaded for the 'locale.language' setting; getLocalizedString() iterates
# over them and reads each entry's msgctxt, msgstr and msgid attributes
LANGUAGE = import_language(language=settings.get('locale.language'))
class Keyboard(object):  # pylint: disable=useless-object-inheritance
    """Stand-in for the xbmc Keyboard class that always yields a confirmed 'test' entry"""

    def __init__(self, line='', heading=''):
        """Accept the constructor arguments of the xbmc Keyboard class and ignore them"""

    def doModal(self, autoclose=0):
        """Stub for doModal(): no dialog is shown and nothing is returned"""

    def isConfirmed(self):
        """Stub for isConfirmed(): the input always counts as confirmed"""
        return True

    def getText(self):
        """Stub for getText(): the entered text is always 'test'"""
        return 'test'
class Monitor(object):  # pylint: disable=useless-object-inheritance
    """A stub implementation of the xbmc Monitor class

    Every instance, including instances of subclasses, is registered through a
    weak reference in one shared set so it can be found again with
    getinstances() without being kept alive by the registry.
    """

    # Weak references to all Monitor (and subclass) instances created so far
    _instances = set()

    def __init__(self, line='', heading=''):
        """A stub constructor for the xbmc Monitor class

        The ``line`` and ``heading`` arguments are accepted but not used.
        """
        self.iteration = 0
        self._instances.add(weakref.ref(self))

    def abortRequested(self):
        """A stub implementation for the xbmc Monitor class abortRequested() method

        Counts the calls on this instance, prints the counter, and reports an
        abort request on every fifth call.
        """
        self.iteration += 1
        print('Iteration: %s' % self.iteration)
        return self.iteration % 5 == 0

    def onNotification(self, sender, method, data):
        """A stub implementation for the xbmc Monitor class onNotification() method

        Does nothing; subclasses override it to react to notifications.
        """

    def waitForAbort(self, timeout=None):
        """A stub implementation for the xbmc Monitor class waitForAbort() method

        :param timeout: number of seconds to sleep, or None for no timeout
        :returns: False when the full timeout elapsed; True when no timeout was
                  given or when the sleep was interrupted or failed
        """
        if timeout is None:
            # Without a timeout there is nothing to wait for in the stub, so
            # report an abort right away instead of relying on time.sleep(None) failing
            return True
        try:
            time.sleep(timeout)
        except KeyboardInterrupt:
            return True
        except Exception:  # pylint: disable=broad-except
            # Best effort: any failure to sleep (e.g. a negative timeout) counts as an abort
            return True
        return False

    @classmethod
    def getinstances(cls):
        """Yield the live instances of this class (subclass instances included)

        Dead weak references are dropped from the shared registry on the way.
        """
        # Work on a snapshot so instances created or collected while the caller
        # is iterating cannot invalidate the iteration over the set
        refs = list(cls._instances)
        dead = set(ref for ref in refs if ref() is None)
        cls._instances.difference_update(dead)
        for ref in refs:
            obj = ref()
            # The registry is shared by all Monitor classes, so filter on cls;
            # this also skips references that died since the snapshot (None)
            if isinstance(obj, cls):
                yield obj
class Player(object):  # pylint: disable=useless-object-inheritance
    """Stand-in for the xbmc Player class with fixed answers and a cyclic playing state"""

    def __init__(self):
        """Start with a zeroed call counter for isPlaying()"""
        self._count = 0

    def play(self, item='', listitem=None, windowed=False, startpos=-1):
        """Stub for play(): arguments are ignored, nothing is played"""

    def stop(self):
        """Stub for stop(): nothing to stop"""

    def getPlayingFile(self):
        """Stub for getPlayingFile(): always the path '/foo/bar'"""
        return '/foo/bar'

    def isPlaying(self):
        """Stub for isPlaying(): True on four calls out of five, False on every fifth"""
        self._count += 1
        still_playing = self._count % 5 != 0
        return still_playing

    def seekTime(self, seekTime):
        """Stub for seekTime(): the position is ignored"""

    def showSubtitles(self, bVisible):
        """Stub for showSubtitles(): the flag is ignored"""

    def getTotalTime(self):
        """Stub for getTotalTime(): always 0"""
        return 0

    def getTime(self):
        """Stub for getTime(): always 0"""
        return 0

    def getVideoInfoTag(self):
        """Stub for getVideoInfoTag(): a fresh VideoInfoTag stub on every call"""
        return VideoInfoTag()
class PlayList(object):  # pylint: disable=useless-object-inheritance
    """A stub implementation of the xbmc PlayList class

    Items passed to add() are remembered so that size() returns a usable
    integer instead of None.
    """

    def __init__(self, playList):
        """A stub constructor for the xbmc PlayList class

        :param playList: playlist identifier; accepted but not used by the stub
        """
        # (url, listitem) pairs in playlist order
        self._items = []

    def getposition(self):
        """A stub implementation for the xbmc PlayList class getposition() method

        :returns: always 0
        """
        return 0

    def add(self, url, listitem=None, index=-1):
        """A stub implementation for the xbmc PlayList class add() method

        :param url: location of the item to add
        :param listitem: optional list item belonging to the url
        :param index: position to insert at; a negative value (the default) appends
        """
        entry = (url, listitem)
        if index < 0:
            self._items.append(entry)
        else:
            # list.insert() already appends when index lies past the end
            self._items.insert(index, entry)

    def size(self):
        """A stub implementation for the xbmc PlayList class size() method

        :returns: the number of items added so far
        """
        return len(self._items)
class VideoInfoTag(object):  # pylint: disable=useless-object-inheritance
    """Stand-in for the xbmc VideoInfoTag class that describes an empty video"""

    def __init__(self):
        """Nothing to set up for the stub"""

    def getSeason(self):
        """Stub for getSeason(): always 0"""
        return 0

    def getEpisode(self):
        """Stub for getEpisode(): always 0"""
        return 0

    def getTVShowTitle(self):
        """Stub for getTVShowTitle(): always the empty string"""
        return ''

    def getPlayCount(self):
        """Stub for getPlayCount(): always 0"""
        return 0

    def getRating(self):
        """Stub for getRating(): always 0"""
        return 0
def executeJSONRPC(jsonrpccommand):
    """A reimplementation of the xbmc executeJSONRPC() function

    :param jsonrpccommand: a JSON-encoded request object, or a JSON array of
                           request objects (a batch)
    :returns: the JSON-encoded response object; for a batch, a JSON array with
              one response object per request, in request order. Methods the
              stub does not know are logged and answered with an error object.
    """
    assert isinstance(jsonrpccommand, basestring)
    command = json.loads(jsonrpccommand)

    # Handle a list of commands sequentially
    if isinstance(command, list):
        # Decode every sub-response again, otherwise the batch reply would be
        # an array of JSON strings instead of an array of response objects
        return json.dumps([json.loads(executeJSONRPC(json.dumps(action))) for action in command])

    # Tolerate requests that lack 'method' or 'params' (or carry null for them)
    method = command.get('method') or ''
    params = command.get('params') or {}
    ret = dict(id=command.get('id'), jsonrpc='2.0', result='OK')

    if method.startswith('Input') or method in ('Player.Open', 'Textures.RemoveTexture'):
        # Nothing to emulate, the default 'OK' result is enough
        pass
    elif method == 'Settings.GetSettingValue':
        key = params.get('setting')
        ret.update(result=dict(value=settings.get(key)))
    elif method == 'Addons.GetAddonDetails':
        if params.get('addonid') == 'script.module.inputstreamhelper':
            ret.update(result=dict(addon=dict(enabled='true', version='0.3.5')))
        else:
            ret.update(result=dict(addon=dict(enabled='true', version='1.2.3')))
    elif method == 'Textures.GetTextures':
        ret.update(result=dict(textures=[dict(cachedurl="", imagehash="", lasthashcheck="", textureid=4837, url="")]))
    elif method == 'JSONRPC.NotifyAll':
        # Deliver the notification once to every live monitor that can receive it.
        # The instances are collected first because a handler may create monitors itself.
        for obj in list(Monitor.getinstances()):
            handler = getattr(obj, 'onNotification', None)
            if not callable(handler):
                continue
            handler(
                sender=params.get('sender'),
                method=params.get('message'),
                data=json.dumps(params.get('data')),
            )
    else:
        log("executeJSONRPC does not implement method '{method}'".format(method=method), LOGERROR)
        return json.dumps(dict(error=dict(code=-1, message='Not implemented'), id=command.get('id'), jsonrpc='2.0'))
    return json.dumps(ret)
def getCondVisibility(string):
    """Evaluate a Kodi visibility condition against the canned answers of the stub

    Only 'system.platform.android' is reported as False; every other condition,
    the 'System.HasAddon...' checks included, is reported as True.
    """
    assert isinstance(string, basestring)
    return string != 'system.platform.android'
def getInfoLabel(key):
    """Look up an info label in the INFO_LABELS table; unknown labels give None"""
    assert isinstance(key, basestring)
    label = INFO_LABELS.get(key)
    return label
def getLocalizedString(msgctxt):
    """Translate a numeric string id using the loaded LANGUAGE entries

    The first entry whose msgctxt equals '#<id>' wins; its msgstr is returned,
    or its msgid when the msgstr is empty. Without a match '<Untranslated>' is
    returned, and for ids from 30000 upwards the miss is logged as an error.
    """
    assert isinstance(msgctxt, int)
    wanted = '#%s' % msgctxt
    for entry in LANGUAGE:
        if entry.msgctxt == wanted:
            return entry.msgstr or entry.msgid
    missing_is_error = int(msgctxt) >= 30000
    if missing_is_error:
        log('Unable to translate #{msgctxt}'.format(msgctxt=msgctxt), LOGERROR)
    return '<Untranslated>'
def getRegion(key):
    """Look up a region format in the REGIONS table; unknown keys give None"""
    assert isinstance(key, basestring)
    region_format = REGIONS.get(key)
    return region_format
def log(msg, level=0):
    """Print a log message in colour, prefixed with the name of its level

    Levels 6 and 7 do not print anything: they raise an Exception carrying the
    message. Levels 4 and 5 get a red prefix, levels 2 and 3 a yellow one, and
    level 0 prints the message text in a dimmed colour.
    """
    assert isinstance(msg, basestring)
    assert isinstance(level, int)
    label = LOGLEVELS[level]
    if level in (6, 7):
        raise Exception(msg)

    # Defaults: green prefix, plain green message text
    prefix_color = '\033[32;1m'
    text_color = '\033[32;0m'
    if level in (4, 5):
        prefix_color = '\033[31;1m'
    elif level in (2, 3):
        prefix_color = '\033[33;1m'
    elif level == 0:
        text_color = '\033[30;1m'
    print('{color1}{name}: {color2}{msg}\033[39;0m'.format(name=label, color1=prefix_color, color2=text_color, msg=to_unicode(msg)))
def sleep(timemillis):
    """Block for the given number of milliseconds, like the xbmc sleep() function"""
    assert isinstance(timemillis, int)
    seconds = timemillis / 1000
    time.sleep(seconds)
def translatePath(path):
    """A stub implementation of the xbmc translatePath() function

    Maps Kodi special:// paths onto the tests/ directory below the current
    working directory; any other path is returned unchanged.

    :param path: the path to translate
    :returns: the translated path
    """
    assert isinstance(path, basestring)
    tests_dir = os.path.join(os.getcwd(), 'tests/')
    userdata_dir = os.path.join(os.getcwd(), 'tests/userdata/')
    # Checked in this order; the three profile-like roots share one directory
    special_roots = (
        ('special://home', tests_dir),
        ('special://masterprofile', userdata_dir),
        ('special://profile', userdata_dir),
        ('special://userdata', userdata_dir),
    )
    for prefix, target in special_roots:
        if path.startswith(prefix):
            # Swap the leading prefix only; a later occurrence of the same text
            # inside the path must stay untouched
            return target + path[len(prefix):]
    return path