testcase.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.
  13. import logging
  14. import os
  15. import pycotap
  16. import sys
  17. import unittest
  18. import uuid
  19. from qemu.machine import QEMUMachine
  20. from qemu.utils import kvm_available, tcg_available
  21. from .asset import Asset
  22. from .cmd import run_cmd
  23. from .config import BUILD_DIR
  24. class QemuBaseTest(unittest.TestCase):
  25. qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
  26. arch = None
  27. workdir = None
  28. log = None
  29. logdir = None
  30. def setUp(self, bin_prefix):
  31. self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
  32. self.arch = self.qemu_bin.split('-')[-1]
  33. self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
  34. self.id())
  35. os.makedirs(self.workdir, exist_ok=True)
  36. self.logdir = self.workdir
  37. self.log = logging.getLogger('qemu-test')
  38. self.log.setLevel(logging.DEBUG)
  39. self._log_fh = logging.FileHandler(os.path.join(self.logdir,
  40. 'base.log'), mode='w')
  41. self._log_fh.setLevel(logging.DEBUG)
  42. fileFormatter = logging.Formatter(
  43. '%(asctime)s - %(levelname)s: %(message)s')
  44. self._log_fh.setFormatter(fileFormatter)
  45. self.log.addHandler(self._log_fh)
  46. def tearDown(self):
  47. self.log.removeHandler(self._log_fh)
  48. def main():
  49. path = os.path.basename(sys.argv[0])[:-3]
  50. cache = os.environ.get("QEMU_TEST_PRECACHE", None)
  51. if cache is not None:
  52. Asset.precache_suites(path, cache)
  53. return
  54. tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
  55. test_output_log = pycotap.LogMode.LogToError)
  56. unittest.main(module = None, testRunner = tr, argv=["__dummy__", path])
  57. class QemuSystemTest(QemuBaseTest):
  58. """Facilitates system emulation tests."""
  59. cpu = None
  60. machine = None
  61. _machinehelp = None
  62. def setUp(self):
  63. self._vms = {}
  64. super().setUp('qemu-system-')
  65. console_log = logging.getLogger('console')
  66. console_log.setLevel(logging.DEBUG)
  67. self._console_log_fh = logging.FileHandler(os.path.join(self.workdir,
  68. 'console.log'), mode='w')
  69. self._console_log_fh.setLevel(logging.DEBUG)
  70. fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
  71. self._console_log_fh.setFormatter(fileFormatter)
  72. console_log.addHandler(self._console_log_fh)
  73. def set_machine(self, machinename):
  74. # TODO: We should use QMP to get the list of available machines
  75. if not self._machinehelp:
  76. self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
  77. if self._machinehelp.find(machinename) < 0:
  78. self.skipTest('no support for machine ' + machinename)
  79. self.machine = machinename
  80. def require_accelerator(self, accelerator):
  81. """
  82. Requires an accelerator to be available for the test to continue
  83. It takes into account the currently set qemu binary.
  84. If the check fails, the test is canceled. If the check itself
  85. for the given accelerator is not available, the test is also
  86. canceled.
  87. :param accelerator: name of the accelerator, such as "kvm" or "tcg"
  88. :type accelerator: str
  89. """
  90. checker = {'tcg': tcg_available,
  91. 'kvm': kvm_available}.get(accelerator)
  92. if checker is None:
  93. self.skipTest("Don't know how to check for the presence "
  94. "of accelerator %s" % accelerator)
  95. if not checker(qemu_bin=self.qemu_bin):
  96. self.skipTest("%s accelerator does not seem to be "
  97. "available" % accelerator)
  98. def require_netdev(self, netdevname):
  99. netdevhelp = run_cmd([self.qemu_bin,
  100. '-M', 'none', '-netdev', 'help'])[0];
  101. if netdevhelp.find('\n' + netdevname + '\n') < 0:
  102. self.skipTest('no support for " + netdevname + " networking')
  103. def require_device(self, devicename):
  104. devhelp = run_cmd([self.qemu_bin,
  105. '-M', 'none', '-device', 'help'])[0];
  106. if devhelp.find(devicename) < 0:
  107. self.skipTest('no support for device ' + devicename)
  108. def _new_vm(self, name, *args):
  109. vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
  110. self.log.debug('QEMUMachine "%s" created', name)
  111. self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
  112. self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
  113. if args:
  114. vm.add_args(*args)
  115. return vm
  116. @property
  117. def vm(self):
  118. return self.get_vm(name='default')
  119. def get_vm(self, *args, name=None):
  120. if not name:
  121. name = str(uuid.uuid4())
  122. if self._vms.get(name) is None:
  123. self._vms[name] = self._new_vm(name, *args)
  124. if self.cpu is not None:
  125. self._vms[name].add_args('-cpu', self.cpu)
  126. if self.machine is not None:
  127. self._vms[name].set_machine(self.machine)
  128. return self._vms[name]
  129. def set_vm_arg(self, arg, value):
  130. """
  131. Set an argument to list of extra arguments to be given to the QEMU
  132. binary. If the argument already exists then its value is replaced.
  133. :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
  134. :type arg: str
  135. :param value: the argument value, such as "host" in "-cpu host"
  136. :type value: str
  137. """
  138. if not arg or not value:
  139. return
  140. if arg not in self.vm.args:
  141. self.vm.args.extend([arg, value])
  142. else:
  143. idx = self.vm.args.index(arg) + 1
  144. if idx < len(self.vm.args):
  145. self.vm.args[idx] = value
  146. else:
  147. self.vm.args.append(value)
  148. def tearDown(self):
  149. for vm in self._vms.values():
  150. vm.shutdown()
  151. logging.getLogger('console').removeHandler(self._console_log_fh)
  152. super().tearDown()