123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- # Test class and utilities for functional tests
- #
- # Copyright 2018, 2024 Red Hat, Inc.
- #
- # Original Author (Avocado-based tests):
- # Cleber Rosa <crosa@redhat.com>
- #
- # Adaption for standalone version:
- # Thomas Huth <thuth@redhat.com>
- #
- # This work is licensed under the terms of the GNU GPL, version 2 or
- # later. See the COPYING file in the top-level directory.
- import logging
- import os
- from pathlib import Path
- import pycotap
- import shutil
- from subprocess import run
- import sys
- import tempfile
- import unittest
- import uuid
- from qemu.machine import QEMUMachine
- from qemu.utils import kvm_available, tcg_available
- from .archive import archive_extract
- from .asset import Asset
- from .config import BUILD_DIR, dso_suffix
- from .uncompress import uncompress
- class QemuBaseTest(unittest.TestCase):
- arch = None
- workdir = None
- log = None
- logdir = None
- '''
- @params compressed: filename, Asset, or file-like object to uncompress
- @params format: optional compression format (gzip, lzma)
- Uncompresses @compressed into the scratch directory.
- If @format is None, heuristics will be applied to guess the format
- from the filename or Asset URL. @format must be non-None if @uncompressed
- is a file-like object.
- Returns the fully qualified path to the uncompressed file
- '''
- def uncompress(self, compressed, format=None):
- self.log.debug(f"Uncompress {compressed} format={format}")
- if type(compressed) == Asset:
- compressed.fetch()
- (name, ext) = os.path.splitext(str(compressed))
- uncompressed = self.scratch_file(os.path.basename(name))
- uncompress(compressed, uncompressed, format)
- return uncompressed
- '''
- @params archive: filename, Asset, or file-like object to extract
- @params format: optional archive format (tar, zip, deb, cpio)
- @params sub_dir: optional sub-directory to extract into
- @params member: optional member file to limit extraction to
- Extracts @archive into the scratch directory, or a directory beneath
- named by @sub_dir. All files are extracted unless @member specifies
- a limit.
- If @format is None, heuristics will be applied to guess the format
- from the filename or Asset URL. @format must be non-None if @archive
- is a file-like object.
- If @member is non-None, returns the fully qualified path to @member
- '''
- def archive_extract(self, archive, format=None, sub_dir=None, member=None):
- self.log.debug(f"Extract {archive} format={format}" +
- f"sub_dir={sub_dir} member={member}")
- if type(archive) == Asset:
- archive.fetch()
- if sub_dir is None:
- archive_extract(archive, self.scratch_file(), format, member)
- else:
- archive_extract(archive, self.scratch_file(sub_dir),
- format, member)
- if member is not None:
- return self.scratch_file(member)
- return None
- '''
- Create a temporary directory suitable for storing UNIX
- socket paths.
- Returns: a tempfile.TemporaryDirectory instance
- '''
- def socket_dir(self):
- if self.socketdir is None:
- self.socketdir = tempfile.TemporaryDirectory(
- prefix="qemu_func_test_sock_")
- return self.socketdir
- '''
- @params args list of zero or more subdirectories or file
- Construct a path for accessing a data file located
- relative to the source directory that is the root for
- functional tests.
- @args may be an empty list to reference the root dir
- itself, may be a single element to reference a file in
- the root directory, or may be multiple elements to
- reference a file nested below. The path components
- will be joined using the platform appropriate path
- separator.
- Returns: string representing a file path
- '''
- def data_file(self, *args):
- return str(Path(Path(__file__).parent.parent, *args))
- '''
- @params args list of zero or more subdirectories or file
- Construct a path for accessing a data file located
- relative to the build directory root.
- @args may be an empty list to reference the build dir
- itself, may be a single element to reference a file in
- the build directory, or may be multiple elements to
- reference a file nested below. The path components
- will be joined using the platform appropriate path
- separator.
- Returns: string representing a file path
- '''
- def build_file(self, *args):
- return str(Path(BUILD_DIR, *args))
- '''
- @params args list of zero or more subdirectories or file
- Construct a path for accessing/creating a scratch file
- located relative to a temporary directory dedicated to
- this test case. The directory and its contents will be
- purged upon completion of the test.
- @args may be an empty list to reference the scratch dir
- itself, may be a single element to reference a file in
- the scratch directory, or may be multiple elements to
- reference a file nested below. The path components
- will be joined using the platform appropriate path
- separator.
- Returns: string representing a file path
- '''
- def scratch_file(self, *args):
- return str(Path(self.workdir, *args))
- '''
- @params args list of zero or more subdirectories or file
- Construct a path for accessing/creating a log file
- located relative to a temporary directory dedicated to
- this test case. The directory and its log files will be
- preserved upon completion of the test.
- @args may be an empty list to reference the log dir
- itself, may be a single element to reference a file in
- the log directory, or may be multiple elements to
- reference a file nested below. The path components
- will be joined using the platform appropriate path
- separator.
- Returns: string representing a file path
- '''
- def log_file(self, *args):
- return str(Path(self.outputdir, *args))
- '''
- @params plugin name
- Return the full path to the plugin taking into account any host OS
- specific suffixes.
- '''
- def plugin_file(self, plugin_name):
- sfx = dso_suffix()
- return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')
- def assets_available(self):
- for name, asset in vars(self.__class__).items():
- if name.startswith("ASSET_") and type(asset) == Asset:
- if not asset.available():
- self.log.debug(f"Asset {asset.url} not available")
- return False
- return True
- def setUp(self):
- self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
- self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
- self.arch = self.qemu_bin.split('-')[-1]
- self.socketdir = None
- self.outputdir = self.build_file('tests', 'functional',
- self.arch, self.id())
- self.workdir = os.path.join(self.outputdir, 'scratch')
- os.makedirs(self.workdir, exist_ok=True)
- self.log_filename = self.log_file('base.log')
- self.log = logging.getLogger('qemu-test')
- self.log.setLevel(logging.DEBUG)
- self._log_fh = logging.FileHandler(self.log_filename, mode='w')
- self._log_fh.setLevel(logging.DEBUG)
- fileFormatter = logging.Formatter(
- '%(asctime)s - %(levelname)s: %(message)s')
- self._log_fh.setFormatter(fileFormatter)
- self.log.addHandler(self._log_fh)
- # Capture QEMUMachine logging
- self.machinelog = logging.getLogger('qemu.machine')
- self.machinelog.setLevel(logging.DEBUG)
- self.machinelog.addHandler(self._log_fh)
- if not self.assets_available():
- self.skipTest('One or more assets is not available')
- def tearDown(self):
- if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
- shutil.rmtree(self.workdir)
- if self.socketdir is not None:
- shutil.rmtree(self.socketdir.name)
- self.socketdir = None
- self.machinelog.removeHandler(self._log_fh)
- self.log.removeHandler(self._log_fh)
- def main():
- path = os.path.basename(sys.argv[0])[:-3]
- cache = os.environ.get("QEMU_TEST_PRECACHE", None)
- if cache is not None:
- Asset.precache_suites(path, cache)
- return
- tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
- test_output_log = pycotap.LogMode.LogToError)
- res = unittest.main(module = None, testRunner = tr, exit = False,
- argv=["__dummy__", path])
- for (test, message) in res.result.errors + res.result.failures:
- if hasattr(test, "log_filename"):
- print('More information on ' + test.id() + ' could be found here:'
- '\n %s' % test.log_filename, file=sys.stderr)
- if hasattr(test, 'console_log_name'):
- print(' %s' % test.console_log_name, file=sys.stderr)
- sys.exit(not res.result.wasSuccessful())
- class QemuUserTest(QemuBaseTest):
- def setUp(self):
- super().setUp()
- self._ldpath = []
- def add_ldpath(self, ldpath):
- self._ldpath.append(os.path.abspath(ldpath))
- def run_cmd(self, bin_path, args=[]):
- return run([self.qemu_bin]
- + ["-L %s" % ldpath for ldpath in self._ldpath]
- + [bin_path]
- + args,
- text=True, capture_output=True)
- class QemuSystemTest(QemuBaseTest):
- """Facilitates system emulation tests."""
- cpu = None
- machine = None
- _machinehelp = None
- def setUp(self):
- self._vms = {}
- super().setUp()
- console_log = logging.getLogger('console')
- console_log.setLevel(logging.DEBUG)
- self.console_log_name = self.log_file('console.log')
- self._console_log_fh = logging.FileHandler(self.console_log_name,
- mode='w')
- self._console_log_fh.setLevel(logging.DEBUG)
- fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
- self._console_log_fh.setFormatter(fileFormatter)
- console_log.addHandler(self._console_log_fh)
- def set_machine(self, machinename):
- # TODO: We should use QMP to get the list of available machines
- if not self._machinehelp:
- self._machinehelp = run(
- [self.qemu_bin, '-M', 'help'],
- capture_output=True, check=True, encoding='utf8').stdout
- if self._machinehelp.find(machinename) < 0:
- self.skipTest('no support for machine ' + machinename)
- self.machine = machinename
- def require_accelerator(self, accelerator):
- """
- Requires an accelerator to be available for the test to continue
- It takes into account the currently set qemu binary.
- If the check fails, the test is canceled. If the check itself
- for the given accelerator is not available, the test is also
- canceled.
- :param accelerator: name of the accelerator, such as "kvm" or "tcg"
- :type accelerator: str
- """
- checker = {'tcg': tcg_available,
- 'kvm': kvm_available}.get(accelerator)
- if checker is None:
- self.skipTest("Don't know how to check for the presence "
- "of accelerator %s" % accelerator)
- if not checker(qemu_bin=self.qemu_bin):
- self.skipTest("%s accelerator does not seem to be "
- "available" % accelerator)
- def require_netdev(self, netdevname):
- help = run([self.qemu_bin,
- '-M', 'none', '-netdev', 'help'],
- capture_output=True, check=True, encoding='utf8').stdout;
- if help.find('\n' + netdevname + '\n') < 0:
- self.skipTest('no support for " + netdevname + " networking')
- def require_device(self, devicename):
- help = run([self.qemu_bin,
- '-M', 'none', '-device', 'help'],
- capture_output=True, check=True, encoding='utf8').stdout;
- if help.find(devicename) < 0:
- self.skipTest('no support for device ' + devicename)
- def _new_vm(self, name, *args):
- vm = QEMUMachine(self.qemu_bin,
- name=name,
- base_temp_dir=self.workdir,
- log_dir=self.log_file())
- self.log.debug('QEMUMachine "%s" created', name)
- self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
- sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
- if sockpath is not None:
- vm.add_args("-chardev",
- f"socket,id=backdoor,path={sockpath},server=on,wait=off",
- "-mon", "chardev=backdoor,mode=control")
- if args:
- vm.add_args(*args)
- return vm
- @property
- def vm(self):
- return self.get_vm(name='default')
- def get_vm(self, *args, name=None):
- if not name:
- name = str(uuid.uuid4())
- if self._vms.get(name) is None:
- self._vms[name] = self._new_vm(name, *args)
- if self.cpu is not None:
- self._vms[name].add_args('-cpu', self.cpu)
- if self.machine is not None:
- self._vms[name].set_machine(self.machine)
- return self._vms[name]
- def set_vm_arg(self, arg, value):
- """
- Set an argument to list of extra arguments to be given to the QEMU
- binary. If the argument already exists then its value is replaced.
- :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
- :type arg: str
- :param value: the argument value, such as "host" in "-cpu host"
- :type value: str
- """
- if not arg or not value:
- return
- if arg not in self.vm.args:
- self.vm.args.extend([arg, value])
- else:
- idx = self.vm.args.index(arg) + 1
- if idx < len(self.vm.args):
- self.vm.args[idx] = value
- else:
- self.vm.args.append(value)
- def tearDown(self):
- for vm in self._vms.values():
- vm.shutdown()
- logging.getLogger('console').removeHandler(self._console_log_fh)
- super().tearDown()
|