/
oldUtils.py
351 lines (305 loc) · 10.9 KB
/
oldUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
#!/usr/bin/python -tt
import rpm
import types
import os
import gzip
import sys
from gzip import write32u, FNAME
from urlgrabber.grabber import URLGrabError
from zlib import error as zlibError
def log(num, msg):
print >>sys.stderr, msg
errorlog = log
def _(msg):
return msg
# pylint: disable-msg=E0602
def checkheader(headerfile, name, arch):
"""check a header by opening it and comparing the results to the name and arch
we believe it to be for. if it fails raise URLGrabError(-1)"""
h = Header_Work(headerfile)
fail = 0
if h.hdr is None:
fail = 1
else:
if name != h.name() or arch != h.arch():
fail = 1
if fail:
raise URLGrabError(-1, _('Header cannot be opened or does not match %s, %s.') % (name, arch))
return
def checkRpmMD5(package, urlgraberror=0):
"""take a package, check it out by trying to open it, return 1 if it's good
return 0 if it's not"""
ts.sigChecking('md5')
fdno = os.open(package, os.O_RDONLY)
try:
ts.hdrFromFdno(fdno)
except rpm.error, e:
good = 0
else:
good = 1
os.close(fdno)
ts.sigChecking('default')
if urlgraberror:
if not good:
raise URLGrabError(-1, _('RPM %s fails md5 check') % (package))
else:
return
else:
return good
def checkSig(package):
""" take a package, check it's sigs, return 0 if they are all fine, return
1 if the gpg key can't be found, 2 if the header is in someway damaged,
3 if the key is not trusted, 4 if the pkg is not gpg or pgp signed"""
ts.sigChecking('default')
fdno = os.open(package, os.O_RDONLY)
try:
hdr = ts.hdrFromFdno(fdno)
except rpm.error, e:
if str(e) == "public key not availaiable":
return 1
if str(e) == "public key not available":
return 1
if str(e) == "public key not trusted":
return 3
if str(e) == "error reading package header":
return 2
else:
error, siginfo = getSigInfo(hdr)
if error == 101:
os.close(fdno)
del hdr
return 4
else:
del hdr
os.close(fdno)
return 0
def getSigInfo(hdr):
"""checks if a computerhand back signature information and an error code"""
string = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|'
siginfo = hdr.sprintf(string)
if siginfo != '(none)':
error = 0
sigtype, sigdate, sigid = siginfo.split(',')
else:
error = 101
sigtype = 'MD5'
sigdate = 'None'
sigid = 'None'
infotuple = (sigtype, sigdate, sigid)
return error, infotuple
def getProvides(header):
provnames = []
provides = header[rpm.RPMTAG_PROVIDENAME]
if provides is None:
pass
elif type(provides) is types.ListType:
provnames.extend(provides)
else:
provnames.append(provides)
return provnames
def compareEVR((e1, v1, r1), (e2, v2, r2)):
# return 1: a is newer than b
# 0: a and b are the same version
# -1: b is newer than a
def rpmOutToStr(arg):
if type(arg) != types.StringType and arg != None:
arg = str(arg)
return arg
e1 = rpmOutToStr(e1)
v1 = rpmOutToStr(v1)
r1 = rpmOutToStr(r1)
e2 = rpmOutToStr(e2)
v2 = rpmOutToStr(v2)
r2 = rpmOutToStr(r2)
rc = rpm.labelCompare((e1, v1, r1), (e2, v2, r2))
log(6, '%s, %s, %s vs %s, %s, %s = %s' % (e1, v1, r1, e2, v2, r2, rc))
return rc
def formatRequire (name, version, flags):
if flags:
if flags & (rpm.RPMSENSE_LESS | rpm.RPMSENSE_GREATER | rpm.RPMSENSE_EQUAL):
name = name + ' '
if flags & rpm.RPMSENSE_LESS:
name = name + '<'
if flags & rpm.RPMSENSE_GREATER:
name = name + '>'
if flags & rpm.RPMSENSE_EQUAL:
name = name + '='
name = name + ' %s' % version
return name
def openrpmdb():
try:
db = rpm.TransactionSet(conf.installroot)
except rpm.error, e:
errorlog(0, _("Could not open RPM database for reading. Perhaps it is already in use?"))
return db
# this is done to make the hdr writing _more_ sane for rsync users especially
__all__ = ["GzipFile","open"]
class GzipFile(gzip.GzipFile):
def _write_gzip_header(self):
self.fileobj.write('\037\213') # magic header
self.fileobj.write('\010') # compression method
fname = self.filename[:-3]
flags = 0
if fname:
flags = FNAME
self.fileobj.write(chr(flags))
write32u(self.fileobj, long(0))
self.fileobj.write('\002')
self.fileobj.write('\377')
if fname:
self.fileobj.write(fname + '\000')
def _gzipOpen(filename, mode="rb", compresslevel=9):
return GzipFile(filename, mode, compresslevel)
class RPM_Base_Work:
def __init__(self):
self.hdr = None
def _getTag(self, tag):
if self.hdr is None:
errorlog(0, _('Got an empty Header, something has gone wrong'))
#FIXME should raise a yum error here
sys.exit(1)
return self.hdr[tag]
def isSource(self):
if self._getTag('sourcepackage') == 1:
return 1
else:
return 0
def name(self):
return self._getTag('name')
def arch(self):
return self._getTag('arch')
def epoch(self):
return self._getTag('epoch')
def version(self):
return self._getTag('version')
def release(self):
return self._getTag('release')
def evr(self):
e = self._getTag('epoch')
v = self._getTag('version')
r = self._getTag('release')
return (e, v, r)
def nevra(self):
n = self._getTag('name')
e = self._getTag('epoch')
v = self._getTag('version')
r = self._getTag('release')
a = self._getTag('arch')
return (n, e, v, r, a)
def writeHeader(self, headerdir, compress):
# write the header out to a file with the format: name-epoch-ver-rel.arch.hdr
# return the name of the file it just made - no real reason :)
(name, epoch, ver, rel, arch) = self.nevra()
if epoch is None:
epoch = '0'
if self.isSource():
headerfn = "%s/%s-%s-%s-%s.src.hdr" % (headerdir, name, epoch, ver, rel)
else:
headerfn = "%s/%s-%s-%s-%s.%s.hdr" % (headerdir, name, epoch, ver, rel, arch)
if compress:
headerout = _gzipOpen(headerfn, "w")
else:
headerout = open(headerfn, "w")
headerout.write(self.hdr.unload(1))
headerout.close()
return(headerfn)
class Header_Work(RPM_Base_Work):
"""for operating on hdrs in and out of the rpmdb
if the first arg is a string then it's a filename
otherwise it's an rpm hdr"""
def __init__(self, header):
if type(header) is types.StringType:
try:
fd = gzip.open(header, 'r')
try:
h = rpm.headerLoad(fd.read())
except rpm.error, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
except IOError,e:
fd = open(header, 'r')
try:
h = rpm.headerLoad(fd.read())
except rpm.error, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
except ValueError, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
except zlibError, e:
errorlog(0,_('Damaged Header %s') % header)
h = None
fd.close()
else:
h = header
self.hdr = h
class RPM_Work(RPM_Base_Work):
def __init__(self, rpmfn):
ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES))
fd = os.open(rpmfn, os.O_RDONLY)
try:
self.hdr = ts.hdrFromFdno(fd)
except rpm.error, e:
errorlog(0, _('Error opening rpm %s - error %s') % (rpmfn, e))
self.hdr = None
os.close(fd)
class Rpm_Ts_Work:
"""This should operate on groups of headers/matches/etc in the rpmdb - ideally it will
operate with a list of the Base objects above, so I can refer to any one object there
not sure the best way to do this yet, more thinking involved"""
def __init__(self, dbPath='/'):
try:
if conf.installroot:
if conf.installroot != '/':
dbPath = conf.installroot
except NameError, e:
pass
self.ts = rpm.TransactionSet(dbPath)
self.methods = ['addInstall', 'addErase', 'run', 'check', 'order', 'hdrFromFdno',
'closeDB', 'dbMatch', 'setFlags', 'setVSFlags', 'setProbFilter']
def __getattr__(self, attribute):
if attribute in self.methods:
return getattr(self.ts, attribute)
else:
raise AttributeError, attribute
def match(self, tag = None, search = None, mire = None):
"""hands back a list of Header_Work objects"""
hwlist = []
# hand back the whole list of hdrs
if mire is None and tag is None and search is None:
hdrlist = self.ts.dbMatch()
else:
#just do a non-mire'd search
if mire == None:
hdrlist = self.ts.dbMatch(tag, search)
else:
# mire search
if mire == 'glob':
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_GLOB, search)
elif mire == 'regex':
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_REGEX, search)
elif mire == 'strcmp':
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_STRCMP, search)
else:
hdrlist = self.ts.dbMatch()
hdrlist.pattern(tag, rpm.RPMMIRE_DEFAULT, search)
for hdr in hdrlist:
hdrobj = Header_Work(hdr)
hwlist.append(hdrobj)
return hwlist
def sigChecking(self, sig):
"""pass type of check you want to occur, default is to have them off"""
if sig == 'md5':
#turn off everything but md5 - and we need to the check the payload
self.ts.setVSFlags(~(rpm.RPMVSF_NOMD5|rpm.RPMVSF_NEEDPAYLOAD))
elif sig == 'none':
# turn off everything - period
self.ts.setVSFlags(~(rpm._RPMVSF_NOSIGNATURES))
elif sig == 'default':
# set it back to the default
self.ts.setVSFlags(rpm.RPMVSF_DEFAULT)
else:
raise AttributeError, sig