source: yum/trunk/test/testbase.py

Last change on this file was 516, checked in by Yuri Dario, 11 years ago

yum: update trunk to 3.4.3.

  • Property svn:eol-style set to native
File size: 17.9 KB
Line 
1import os
2import sys
3import unittest
4
5import settestpath
6import logging
7import yum.logginglevels as logginglevels
8
# Opt-in switch: true when a NEW_BEHAVIOR variable exists in the environment.
new_behavior = "NEW_BEHAVIOR" in os.environ
10
11from yum import YumBase
12from yum import transactioninfo
13from yum import packages
14from yum import packageSack
15from yum.constants import TS_INSTALL_STATES, TS_REMOVE_STATES
16from cli import YumBaseCli
17from yum.rpmsack import RPMDBPackageSack as _rpmdbsack
18import inspect
19from rpmUtils import arch
20from rpmUtils.transaction import initReadOnlyTransaction
21import rpmUtils.miscutils
22
23
24#############################################################
25### Helper classes ##########################################
26#############################################################
27
28# Dummy translation wrapper
def _(msg):
    """Identity stand-in for the gettext ``_``: hand *msg* back untranslated."""
    return msg
31
32# dummy save_ts to avoid lots of errors
def save_ts(*args, **kwargs):
    """Swallow any arguments and do nothing.

    Assigned over ``save_ts`` on the solver objects built in this file so
    that running a test does not trigger the real implementation.
    """
    return None
35
class FakeConf(object):
    """Plain attribute bag used as the ``conf`` of the solver objects in this file."""

    def __init__(self):
        # The table is rebuilt on every call so each instance gets its own
        # list objects rather than sharing mutable state.
        settings = {
            'installonlypkgs': ['kernel'],
            'exclude': [],
            'debuglevel': 8,
            'obsoletes': True,
            'exactarch': False,
            'exactarchlist': [],
            'installroot': '/',
            'tsflags': [],
            'installonly_limit': 0,
            'skip_broken': False,
            'disable_excludes': [],
            'multilib_policy': 'best',
            'persistdir': '/should-not-exist-bad-test!',
            'showdupesfromrepos': False,
            'uid': 0,
            'groupremove_leaf_only': False,
            'protected_packages': [],
            'protected_multilib': False,
            'clean_requirements_on_remove': True,
        }
        for option, value in settings.items():
            setattr(self, option, value)
58
class FakeSack:
    """Fake PackageSack to use with FakeRepo."""

    def __init__(self):
        # Deliberately empty: the fake carries no state.
        pass

    def have_fastReturnFileEntries(self):
        """Always answer True."""
        return True
66
class FakeRepo(object):
    """Minimal repository stand-in holding an id, a sack and a cost."""

    # One FakeSack instance shared by every repo built without its own sack.
    __fake_sack = FakeSack()

    def __init__(self, id=None, sack=None):
        self.id = id
        self.sack = self.__fake_sack if sack is None else sack
        self.cost = 1000

    def __cmp__(self, other):
        """ Sort base class repos. by alphanumeric on their id, also
            see __cmp__ in YumRepository(). """
        mine, theirs = self.id, other.id
        if mine > theirs:
            return 1
        if mine < theirs:
            return -1
        return 0
86
class FakeYumDBInfo(object):
    """Simulate some functionality of RPMAdditionalDataPackage.

    All values live in the ``db`` dict.  Attribute reads and writes are
    redirected to that dict, except for attribute names starting with
    ``db``, which are stored on the instance itself.
    """
    _auto_hardlink_attrs = set(['checksum_type', 'reason',
                                'installed_by', 'changed_by',
                                'from_repo', 'from_repo_revision',
                                'from_repo_timestamp', 'releasever',
                                'command_line'])

    def __init__(self, conf=None, pkgdir=None, yumdb_cache=None):
        """Seed every known attribute with an empty string.

        ``conf``, ``pkgdir`` and ``yumdb_cache`` are accepted but unused.
        """
        self.db = {}
        for attr in self._auto_hardlink_attrs:
            self.db[attr] = ''

    def __getattr__(self, attr):
        """Look *attr* up in ``db``; raise AttributeError when it is absent.

        Only invoked when normal attribute lookup fails.  Raising
        AttributeError (instead of leaking KeyError) keeps ``hasattr()`` and
        three-argument ``getattr()`` working on this object.
        """
        if attr == 'db':
            # ``db`` itself is missing (e.g. __init__ never ran); looking it
            # up through self.db would recurse forever.
            raise AttributeError(attr)
        try:
            return self.db[attr]
        except KeyError:
            raise AttributeError(attr)

    def __setattr__(self, attr, value):
        """Store *value* in ``db`` unless the name starts with ``db``."""
        if not attr.startswith("db"):
            self.db[attr] = value
        else:
            object.__setattr__(self, attr, value)

    def __iter__(self, show_hidden=False):
        """Yield the stored attribute names; ``show_hidden`` is ignored."""
        for item in self.db:
            yield item

    def get(self, attr, default=None):
        """Return the stored value for *attr*, or *default* when it is unset."""
        # A dict lookup signals a miss with KeyError, so use dict.get rather
        # than catching AttributeError around self.db[attr].
        return self.db.get(attr, default)
119
class FakePackage(packages.YumAvailablePackage):
    """Hand-built package object for tests.

    Provides helpers to add provides/requires/conflicts/obsoletes entries
    and files, plus explicit lists of required and requiring packages.
    """

    def __init__(self, name, version='1.0', release='1', epoch='0', arch='noarch', repo=None):
        if repo is None:
            repo = FakeRepo()
            # Single parenthesised expression: prints the same text under the
            # Python 2 print statement.
            print("creating empty repo for %s-%s:%s-%s.%s " % (
                name, epoch, version, release, arch))
        packages.YumAvailablePackage.__init__(self, repo)

        self.name = name
        # version/release are stored under both their long and short names.
        self.version = version
        self.ver = version
        self.release = release
        self.rel = release
        self.epoch = epoch
        self.arch = arch
        self.pkgtup = (self.name, self.arch, self.epoch, self.version, self.release)
        self.yumdb_info = FakeYumDBInfo()

        # Every package provides itself at its own epoch/version/release.
        self_provide = (name, 'EQ', (epoch, version, release))
        self.prco['provides'].append(self_provide)

        # Just a unique integer
        self.id = self.__hash__()
        self.pkgKey = self.__hash__()

        self.required_pkgs = []
        self.requiring_pkgs = []

    def addProvides(self, name, flag=None, evr=(None, None, None)):
        """Append a (name, flag, evr) entry to the provides list."""
        entry = (name, flag, evr)
        self.prco['provides'].append(entry)

    def addRequires(self, name, flag=None, evr=(None, None, None)):
        """Append a (name, flag, evr) entry to the requires list."""
        entry = (name, flag, evr)
        self.prco['requires'].append(entry)

    def addRequiresPkg(self, pkg):
        """Record *pkg* in the list returned by required_packages()."""
        self.required_pkgs.append(pkg)

    def addRequiringPkg(self, pkg):
        """Record *pkg* in the list returned by requiring_packages()."""
        self.requiring_pkgs.append(pkg)

    def addConflicts(self, name, flag=None, evr=(None, None, None)):
        """Append a (name, flag, evr) entry to the conflicts list."""
        entry = (name, flag, evr)
        self.prco['conflicts'].append(entry)

    def addObsoletes(self, name, flag=None, evr=(None, None, None)):
        """Append a (name, flag, evr) entry to the obsoletes list."""
        entry = (name, flag, evr)
        self.prco['obsoletes'].append(entry)

    def addFile(self, name, ftype='file'):
        """Append *name* to the file list kept for *ftype*."""
        self.files[ftype].append(name)

    def required_packages(self):
        """Return the packages recorded through addRequiresPkg()."""
        return self.required_pkgs

    def requiring_packages(self):
        """Return the packages recorded through addRequiringPkg()."""
        return self.requiring_pkgs
166
class _Container(object):
    """Empty namespace object; attributes are attached to instances ad hoc."""
169
170
class DepSolveProgressCallBack:
    """Text-output callbacks for the dependency solver.

    Each notification is written to the "yum.verbose.cli" logger.
    """

    def __init__(self):
        """Fetch the verbose logger and zero the loop counter (no arguments)."""
        self.verbose_logger = logging.getLogger("yum.verbose.cli")
        self.loops = 0

    def pkgAdded(self, pkgtup, mode):
        """Log what will happen to the package *pkgtup*, selected by *mode*."""
        outcomes = {
            'i': _('installed'),
            'u': _('an update'),
            'e': _('erased'),
            'r': _('reinstalled'),
            'd': _('a downgrade'),
            'o': _('obsoleting'),
            'ud': _('updated'),
            'od': _('obsoleted'),
        }
        (name, pkg_arch, epoch, version, release) = pkgtup
        outcome = outcomes[mode]
        log = self.verbose_logger.log
        log(logginglevels.INFO_2,
            _('---> Package %s.%s %s:%s-%s will be %s'),
            name, pkg_arch, epoch, version, release, outcome)

    def start(self):
        """Count one more solver pass."""
        self.loops = self.loops + 1

    def tscheck(self):
        """Log that the transaction check is running."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2, _('--> Running transaction check'))

    def restartLoop(self):
        """Count another pass and log the restart plus the loop number."""
        self.loops = self.loops + 1
        logger = self.verbose_logger
        logger.log(logginglevels.INFO_2,
                   _('--> Restarting Dependency Resolution with new changes.'))
        logger.debug('---> Loop Number: %d', self.loops)

    def end(self):
        """Log that dependency resolution has finished."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2, _('--> Finished Dependency Resolution'))

    def procReq(self, name, formatted_req):
        """Log the dependency *formatted_req* being processed for *name*."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2,
            _('--> Processing Dependency: %s for package: %s'),
            formatted_req, name)

    def unresolved(self, msg):
        """Log an unresolved dependency described by *msg*."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2, _('--> Unresolved Dependency: %s'), msg)

    def procConflict(self, name, confname):
        """Log that *name* conflicts with *confname*."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2,
            _('--> Processing Conflict: %s conflicts %s'),
            name, confname)

    def transactionPopulation(self):
        """Log that the transaction set is being populated."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2,
            _('--> Populating transaction set '
              'with selected packages. Please wait.'))

    def downloadHeader(self, name):
        """Log that the header for *name* is being downloaded."""
        log = self.verbose_logger.log
        log(logginglevels.INFO_2,
            _('---> Downloading header for %s '
              'to pack into transaction set.'),
            name)
233
234#######################################################################
235### Abstract super class for test cases ###############################
236#######################################################################
237
class _DepsolveTestsBase(unittest.TestCase):
    """Shared plumbing for the depsolver test classes in this file."""

    # Labels for the numeric result codes returned by the solver calls.
    res = {0: 'empty', 2: 'ok', 1: 'err'}

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName)
        # Packages shared between test methods hang off this container.
        self.pkgs = _Container()
        self.buildPkgs(self.pkgs)

    def setUp(self):
        pass

    def tearDown(self):
        pass

    @staticmethod
    def buildPkgs(pkgs, *args):
        """Overload this staticmethod to create pkgs that are used in several
        test cases. It gets called from __init__ with self.pkgs as first
        parameter. It is a staticmethod so you can call .buildPkgs() from
        other Tests to share buildPkg code (inheritance doesn't work here,
        because we don't want to inherit the test cases, too).
        """
        pass

    def assertResult(self, pkgs, optional_pkgs=[]):
        """Check if "system" contains the given pkgs. pkgs must be present,
        optional_pkgs may be. Any other pkgs result in an error. Pkgs are
        present if they are in the rpmdb and are not REMOVEd or they
        are INSTALLed.
        """
        report = ["Unexpected result after depsolving: \n\n"]
        required = set(pkgs)
        tolerated = set(optional_pkgs)
        present = set()

        for pkg in self.rpmdb:
            if self.tsInfo.getMembersWithState(pkg.pkgtup, TS_REMOVE_STATES):
                # The transaction removes this installed package.
                if pkg in required:
                    report.append("Package %s was removed!\n" % pkg)
                continue
            # Still installed after the transaction.
            if not (pkg in required or pkg in tolerated):
                report.append("Package %s was not removed!\n" % pkg)
            present.add(pkg)

        for txmbr in self.tsInfo.getMembersWithState(output_states=TS_INSTALL_STATES):
            po = txmbr.po
            present.add(po)
            if not (po in required or po in tolerated):
                report.append("Package %s was installed!\n" % po)
        for pkg in required - present:
            report.append("Package %s was not installed!\n" % pkg)

        if len(report) == 1:
            # Only the header line is there: nothing went wrong.
            return

        report.append("\nTest case was:\n\n")
        # stack()[1] is the frame that called assertResult; keep this call
        # directly in this method so the index stays correct.
        caller_frame = inspect.stack()[1][0]
        report.append(inspect.getsource(caller_frame.f_code))
        report.append("\n")
        self.fail("".join(report))
295
class FakeRpmDb(packageSack.PackageSack):
    '''
    We use a PackageSack for a fake rpmdb instead of the normal
    RPMDBPackageSack. getProvides works a little differently on
    unversioned requirements, so we have to overload it and add some
    extra check code.
    '''
    def __init__(self):
        packageSack.PackageSack.__init__(self)

    # Need to mock out rpmdb caching... copy&paste. Gack.
    def returnConflictPackages(self):
        """Return the packages whose ``conflicts`` list is non-empty."""
        return [pkg for pkg in self.returnPackages() if len(pkg.conflicts)]

    def fileRequiresData(self):
        """Collect file-requirement data for all packages in the sack.

        Returns a 3-tuple:
          * dict: pkgtup -> list of required file names (those starting
            with '/'),
          * set of required file names that no package provides,
          * dict: required file name -> list of pkgtups providing it.
        """
        installedFileRequires = {}
        installedUnresolvedFileRequires = set()
        resolved = set()
        for pkg in self.returnPackages():
            for name, flag, evr in pkg.requires:
                if not name.startswith('/'):
                    continue
                installedFileRequires.setdefault(pkg.pkgtup, []).append(name)
                if name in resolved:
                    # Each file name is only looked up once.
                    continue
                dep = self.getProvides(name, flag, evr)
                resolved.add(name)
                if not dep:
                    installedUnresolvedFileRequires.add(name)

        fileRequires = set()
        for fnames in installedFileRequires.values():
            fileRequires.update(fnames)

        installedFileProviders = {}
        for fname in fileRequires:
            pkgtups = [pkg.pkgtup for pkg in self.getProvides(fname)]
            installedFileProviders[fname] = pkgtups

        return (installedFileRequires, installedUnresolvedFileRequires,
                installedFileProviders)

    def transactionCacheFileRequires(self, installedFileRequires,
                                     installedUnresolvedFileRequires,
                                     installedFileProvides,
                                     problems):
        """No-op: the fake rpmdb keeps no cache."""
        return

    def transactionCacheConflictPackages(self, pkgs):
        """No-op: the fake rpmdb keeps no cache."""
        return

    def transactionResultVersion(self, rpmdbv):
        """No-op."""
        return

    def transactionReset(self):
        """No-op."""
        return

    def readOnlyTS(self):
        # Should probably be able to "fake" this, so we can provide different
        # get_running_kernel_pkgtup(). Bah.
        return initReadOnlyTransaction("/")

    def getProvides(self, name, flags=None, version=(None, None, None)):
        """return dict { packages -> list of matching provides }"""
        self._checkIndexes(failure='build')
        result = {}
        # convert flags & version for unversioned requirements
        if not version:
            version = (None, None, None)
        # A None version was already replaced by the tuple above, so only
        # string versions still need converting.
        if type(version) in (str, unicode):
            version = rpmUtils.miscutils.stringToVersion(version)
        if flags == '0':
            flags = None
        for po in self.provides.get(name, []):
            hits = po.matchingPrcos('provides', (name, flags, version))
            if hits:
                result[po] = hits
        # startswith() instead of name[0]: same test as fileRequiresData and
        # no IndexError on an empty name.
        if name.startswith('/'):
            hit = (name, None, (None, None, None))
            for po in self.searchFiles(name):
                result.setdefault(po, []).append(hit)
        return result
377
378
379#######################################################################
380### Derive Tests from these classes or unittest.TestCase ##############
381#######################################################################
382
class DepsolveTests(_DepsolveTestsBase):
    """Run the depsolver on a manually set up transaction.

    Tests add pkgs to self.rpmdb or self.tsInfo (see
    yum/transactioninfo.py for details) and then call resolveCode().
    A typical test case looks like:

    def testInstallPackageRequireInstalled(self):
        po = FakePackage('zsh', '1', '1', None, 'i386')
        po.addRequires('zip', 'EQ', (None, '1.3', '2'))
        self.tsInfo.addInstall(po)

        ipo = FakePackage('zip', '1.3', '2', None, 'i386')
        self.rpmdb.addPackage(ipo)

        result, msg = self.resolveCode()
        self.assertEquals('ok', result, msg)
        self.assertResult((po, ipo))
    """

    def setUp(self):
        """ Called at the start of each test. """
        _DepsolveTestsBase.setUp(self)
        self.tsInfo = transactioninfo.TransactionData()
        self.tsInfo.debug = 1
        self.rpmdb = FakeRpmDb()
        self.xsack = packageSack.PackageSack()
        self.repo = FakeRepo("installed")
        # XXX this side-effect is hacky:
        self.tsInfo.setDatabases(self.rpmdb, self.xsack)

    def resetTsInfo(self):
        """Replace self.tsInfo with a fresh, empty TransactionData."""
        self.tsInfo = transactioninfo.TransactionData()

    def resolveCode(self):
        """Run YumBase.resolveDeps() on the prepared data.

        Returns (label, msg) where label is the ``res`` entry for the
        numeric result code.
        """
        solver = YumBase()
        solver.save_ts = save_ts
        solver.conf = FakeConf()
        solver.arch.setup_arch('x86_64')
        solver.tsInfo = self.tsInfo
        solver._tsInfo = self.tsInfo
        solver.rpmdb = self.rpmdb
        solver.pkgSack = self.xsack

        def mark_installed(po):
            # Stamp both the package and its repo as "installed".
            po.repoid = "installed"
            po.repo.id = "installed"

        def mark_available(po):
            # Give id-less repos a default name, then mirror it on the package.
            if po.repo.id is None:
                po.repo.id = "TestRepository"
            po.repoid = po.repo.id

        for po in self.rpmdb:
            mark_installed(po)
        for po in self.xsack:
            mark_available(po)
        for txmbr in self.tsInfo:
            if txmbr.ts_state in ('u', 'i'):
                mark_available(txmbr.po)
            else:
                mark_installed(txmbr.po)

        result, msg = solver.resolveDeps()
        return (self.res[result], msg)
441
class OperationsTests(_DepsolveTestsBase):
    """Run a yum command (install, update, remove, ...) in a given set of
    installed and available pkgs. Typical test case looks like:

    def testUpdate(self):
        p = self.pkgs
        res, msg = self.runOperation(['update'], [p.installed], [p.update])
        self.assert_(res=='ok', msg)
        self.assertResult((p.update,))

    To avoid creating the same pkgs over and over again overload the
    staticmethod buildPkgs. It gets called from __init__ with self.pkgs as
    first parameter. As it is a static method you can call .buildPkgs() from
    other Tests to share buildPkg code.
    """

    def runOperation(self, args, installed=[], available=[],
                     confs={}, multi_cmds=False):
        """Sets up and runs the depsolver. args[0] must be a valid yum command
        ("install", "update", ...). It might be followed by pkg names as on
        the yum command line. The pkg objects in installed are added to
        self.rpmdb and those in available to self.xsack which is the
        repository to resolve requirements from.

        With multi_cmds true, args is a sequence of such command lists, run
        in order. When a command's result code is not 2, the raw
        (code, msg) pair is returned; otherwise the transaction is built and
        (label, msg) is returned with the label taken from ``res``.
        """
        depsolver = YumBaseCli()
        depsolver.save_ts = save_ts
        depsolver.arch.setup_arch('x86_64')

        rpmdb = FakeRpmDb()
        self.rpmdb = rpmdb
        depsolver.rpmdb = rpmdb

        xsack = packageSack.PackageSack()
        self.xsack = xsack
        depsolver._pkgSack = xsack

        repo = FakeRepo("installed")
        self.repo = repo
        depsolver.repo = repo

        depsolver.conf = FakeConf()
        for option, value in confs.items():
            setattr(depsolver.conf, option, value)
        # We are running nosetest, so we want to see some yum output
        # if a testcase is failing
        depsolver.doLoggingSetup(9, 9)
        self.depsolver = depsolver

        for po in installed:
            po.repoid = "installed"
            po.repo.id = "installed"
            depsolver.rpmdb.addPackage(po)
        for po in available:
            if po.repo.id is None:
                po.repo.id = "TestRepository"
            po.repoid = po.repo.id
            depsolver._pkgSack.addPackage(po)

        if multi_cmds:
            for nargs in args:
                depsolver.basecmd = nargs[0]
                depsolver.extcmds = nargs[1:]
                res, msg = depsolver.doCommands()
                if res != 2:
                    # A command failed; tsInfo is intentionally left unset.
                    return res, msg
        else:
            depsolver.basecmd = args[0]
            depsolver.extcmds = args[1:]
            res, msg = depsolver.doCommands()

        self.tsInfo = depsolver.tsInfo
        if res != 2:
            return res, msg
        res, msg = depsolver.buildTransaction()
        return self.res[res], msg
Note: See TracBrowser for help on using the repository browser.