engine.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. #
  2. # Migration test main engine
  3. #
  4. # Copyright (c) 2016 Red Hat, Inc.
  5. #
  6. # This library is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU Lesser General Public
  8. # License as published by the Free Software Foundation; either
  9. # version 2.1 of the License, or (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. # Lesser General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Lesser General Public
  17. # License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18. #
  19. import os
  20. import re
  21. import sys
  22. import time
  23. from guestperf.progress import Progress, ProgressStats
  24. from guestperf.report import Report
  25. from guestperf.timings import TimingRecord, Timings
  26. sys.path.append(os.path.join(os.path.dirname(__file__),
  27. '..', '..', '..', 'python'))
  28. from qemu.machine import QEMUMachine
  29. class Engine(object):
  30. def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
  31. sleep=15, verbose=False, debug=False):
  32. self._binary = binary # Path to QEMU binary
  33. self._dst_host = dst_host # Hostname of target host
  34. self._kernel = kernel # Path to kernel image
  35. self._initrd = initrd # Path to stress initrd
  36. self._transport = transport # 'unix' or 'tcp' or 'rdma'
  37. self._sleep = sleep
  38. self._verbose = verbose
  39. self._debug = debug
  40. if debug:
  41. self._verbose = debug
  42. def _vcpu_timing(self, pid, tid_list):
  43. records = []
  44. now = time.time()
  45. jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
  46. for tid in tid_list:
  47. statfile = "/proc/%d/task/%d/stat" % (pid, tid)
  48. with open(statfile, "r") as fh:
  49. stat = fh.readline()
  50. fields = stat.split(" ")
  51. stime = int(fields[13])
  52. utime = int(fields[14])
  53. records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
  54. return records
  55. def _cpu_timing(self, pid):
  56. now = time.time()
  57. jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
  58. statfile = "/proc/%d/stat" % pid
  59. with open(statfile, "r") as fh:
  60. stat = fh.readline()
  61. fields = stat.split(" ")
  62. stime = int(fields[13])
  63. utime = int(fields[14])
  64. return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
  65. def _migrate_progress(self, vm):
  66. info = vm.cmd("query-migrate")
  67. if "ram" not in info:
  68. info["ram"] = {}
  69. return Progress(
  70. info.get("status", "active"),
  71. ProgressStats(
  72. info["ram"].get("transferred", 0),
  73. info["ram"].get("remaining", 0),
  74. info["ram"].get("total", 0),
  75. info["ram"].get("duplicate", 0),
  76. info["ram"].get("skipped", 0),
  77. info["ram"].get("normal", 0),
  78. info["ram"].get("normal-bytes", 0),
  79. info["ram"].get("dirty-pages-rate", 0),
  80. info["ram"].get("mbps", 0),
  81. info["ram"].get("dirty-sync-count", 0)
  82. ),
  83. time.time(),
  84. info.get("total-time", 0),
  85. info.get("downtime", 0),
  86. info.get("expected-downtime", 0),
  87. info.get("setup-time", 0),
  88. info.get("cpu-throttle-percentage", 0),
  89. )
  90. def _migrate(self, hardware, scenario, src, dst, connect_uri):
  91. src_qemu_time = []
  92. src_vcpu_time = []
  93. src_pid = src.get_pid()
  94. vcpus = src.cmd("query-cpus-fast")
  95. src_threads = []
  96. for vcpu in vcpus:
  97. src_threads.append(vcpu["thread-id"])
  98. # XXX how to get dst timings on remote host ?
  99. if self._verbose:
  100. print("Sleeping %d seconds for initial guest workload run" % self._sleep)
  101. sleep_secs = self._sleep
  102. while sleep_secs > 1:
  103. src_qemu_time.append(self._cpu_timing(src_pid))
  104. src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
  105. time.sleep(1)
  106. sleep_secs -= 1
  107. if self._verbose:
  108. print("Starting migration")
  109. if scenario._auto_converge:
  110. resp = src.cmd("migrate-set-capabilities",
  111. capabilities = [
  112. { "capability": "auto-converge",
  113. "state": True }
  114. ])
  115. resp = src.cmd("migrate-set-parameters",
  116. cpu_throttle_increment=scenario._auto_converge_step)
  117. if scenario._post_copy:
  118. resp = src.cmd("migrate-set-capabilities",
  119. capabilities = [
  120. { "capability": "postcopy-ram",
  121. "state": True }
  122. ])
  123. resp = dst.cmd("migrate-set-capabilities",
  124. capabilities = [
  125. { "capability": "postcopy-ram",
  126. "state": True }
  127. ])
  128. resp = src.cmd("migrate-set-parameters",
  129. max_bandwidth=scenario._bandwidth * 1024 * 1024)
  130. resp = src.cmd("migrate-set-parameters",
  131. downtime_limit=scenario._downtime)
  132. if scenario._compression_mt:
  133. resp = src.cmd("migrate-set-capabilities",
  134. capabilities = [
  135. { "capability": "compress",
  136. "state": True }
  137. ])
  138. resp = src.cmd("migrate-set-parameters",
  139. compress_threads=scenario._compression_mt_threads)
  140. resp = dst.cmd("migrate-set-capabilities",
  141. capabilities = [
  142. { "capability": "compress",
  143. "state": True }
  144. ])
  145. resp = dst.cmd("migrate-set-parameters",
  146. decompress_threads=scenario._compression_mt_threads)
  147. if scenario._compression_xbzrle:
  148. resp = src.cmd("migrate-set-capabilities",
  149. capabilities = [
  150. { "capability": "xbzrle",
  151. "state": True }
  152. ])
  153. resp = dst.cmd("migrate-set-capabilities",
  154. capabilities = [
  155. { "capability": "xbzrle",
  156. "state": True }
  157. ])
  158. resp = src.cmd("migrate-set-parameters",
  159. xbzrle_cache_size=(
  160. hardware._mem *
  161. 1024 * 1024 * 1024 / 100 *
  162. scenario._compression_xbzrle_cache))
  163. if scenario._multifd:
  164. resp = src.cmd("migrate-set-capabilities",
  165. capabilities = [
  166. { "capability": "multifd",
  167. "state": True }
  168. ])
  169. resp = src.cmd("migrate-set-parameters",
  170. multifd_channels=scenario._multifd_channels)
  171. resp = dst.cmd("migrate-set-capabilities",
  172. capabilities = [
  173. { "capability": "multifd",
  174. "state": True }
  175. ])
  176. resp = dst.cmd("migrate-set-parameters",
  177. multifd_channels=scenario._multifd_channels)
  178. resp = src.cmd("migrate", uri=connect_uri)
  179. post_copy = False
  180. paused = False
  181. progress_history = []
  182. start = time.time()
  183. loop = 0
  184. while True:
  185. loop = loop + 1
  186. time.sleep(0.05)
  187. progress = self._migrate_progress(src)
  188. if (loop % 20) == 0:
  189. src_qemu_time.append(self._cpu_timing(src_pid))
  190. src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
  191. if (len(progress_history) == 0 or
  192. (progress_history[-1]._ram._iterations <
  193. progress._ram._iterations)):
  194. progress_history.append(progress)
  195. if progress._status in ("completed", "failed", "cancelled"):
  196. if progress._status == "completed" and paused:
  197. dst.cmd("cont")
  198. if progress_history[-1] != progress:
  199. progress_history.append(progress)
  200. if progress._status == "completed":
  201. if self._verbose:
  202. print("Sleeping %d seconds for final guest workload run" % self._sleep)
  203. sleep_secs = self._sleep
  204. while sleep_secs > 1:
  205. time.sleep(1)
  206. src_qemu_time.append(self._cpu_timing(src_pid))
  207. src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
  208. sleep_secs -= 1
  209. return [progress_history, src_qemu_time, src_vcpu_time]
  210. if self._verbose and (loop % 20) == 0:
  211. print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
  212. progress._ram._iterations,
  213. progress._ram._remaining_bytes / (1024 * 1024),
  214. progress._ram._total_bytes / (1024 * 1024),
  215. progress._ram._transferred_bytes / (1024 * 1024),
  216. progress._ram._transfer_rate_mbs,
  217. ))
  218. if progress._ram._iterations > scenario._max_iters:
  219. if self._verbose:
  220. print("No completion after %d iterations over RAM" % scenario._max_iters)
  221. src.cmd("migrate_cancel")
  222. continue
  223. if time.time() > (start + scenario._max_time):
  224. if self._verbose:
  225. print("No completion after %d seconds" % scenario._max_time)
  226. src.cmd("migrate_cancel")
  227. continue
  228. if (scenario._post_copy and
  229. progress._ram._iterations >= scenario._post_copy_iters and
  230. not post_copy):
  231. if self._verbose:
  232. print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
  233. resp = src.cmd("migrate-start-postcopy")
  234. post_copy = True
  235. if (scenario._pause and
  236. progress._ram._iterations >= scenario._pause_iters and
  237. not paused):
  238. if self._verbose:
  239. print("Pausing VM after %d iterations" % scenario._pause_iters)
  240. resp = src.cmd("stop")
  241. paused = True
  242. def _is_ppc64le(self):
  243. _, _, _, _, machine = os.uname()
  244. if machine == "ppc64le":
  245. return True
  246. return False
  247. def _get_guest_console_args(self):
  248. if self._is_ppc64le():
  249. return "console=hvc0"
  250. else:
  251. return "console=ttyS0"
  252. def _get_qemu_serial_args(self):
  253. if self._is_ppc64le():
  254. return ["-chardev", "stdio,id=cdev0",
  255. "-device", "spapr-vty,chardev=cdev0"]
  256. else:
  257. return ["-chardev", "stdio,id=cdev0",
  258. "-device", "isa-serial,chardev=cdev0"]
  259. def _get_common_args(self, hardware, tunnelled=False):
  260. args = [
  261. "noapic",
  262. "edd=off",
  263. "printk.time=1",
  264. "noreplace-smp",
  265. "cgroup_disable=memory",
  266. "pci=noearly",
  267. ]
  268. args.append(self._get_guest_console_args())
  269. if self._debug:
  270. args.append("debug")
  271. else:
  272. args.append("quiet")
  273. args.append("ramsize=%s" % hardware._mem)
  274. cmdline = " ".join(args)
  275. if tunnelled:
  276. cmdline = "'" + cmdline + "'"
  277. argv = [
  278. "-accel", "kvm",
  279. "-cpu", "host",
  280. "-kernel", self._kernel,
  281. "-initrd", self._initrd,
  282. "-append", cmdline,
  283. "-m", str((hardware._mem * 1024) + 512),
  284. "-smp", str(hardware._cpus),
  285. ]
  286. argv.extend(self._get_qemu_serial_args())
  287. if self._debug:
  288. argv.extend(["-machine", "graphics=off"])
  289. if hardware._prealloc_pages:
  290. argv_source += ["-mem-path", "/dev/shm",
  291. "-mem-prealloc"]
  292. if hardware._locked_pages:
  293. argv_source += ["-overcommit", "mem-lock=on"]
  294. if hardware._huge_pages:
  295. pass
  296. return argv
  297. def _get_src_args(self, hardware):
  298. return self._get_common_args(hardware)
  299. def _get_dst_args(self, hardware, uri):
  300. tunnelled = False
  301. if self._dst_host != "localhost":
  302. tunnelled = True
  303. argv = self._get_common_args(hardware, tunnelled)
  304. return argv + ["-incoming", uri]
  305. @staticmethod
  306. def _get_common_wrapper(cpu_bind, mem_bind):
  307. wrapper = []
  308. if len(cpu_bind) > 0 or len(mem_bind) > 0:
  309. wrapper.append("numactl")
  310. if cpu_bind:
  311. wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
  312. if mem_bind:
  313. wrapper.append("--membind=%s" % ",".join(mem_bind))
  314. return wrapper
  315. def _get_src_wrapper(self, hardware):
  316. return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
  317. def _get_dst_wrapper(self, hardware):
  318. wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
  319. if self._dst_host != "localhost":
  320. return ["ssh",
  321. "-R", "9001:localhost:9001",
  322. self._dst_host] + wrapper
  323. else:
  324. return wrapper
  325. def _get_timings(self, vm):
  326. log = vm.get_log()
  327. if not log:
  328. return []
  329. if self._debug:
  330. print(log)
  331. regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
  332. matcher = re.compile(regex)
  333. records = []
  334. for line in log.split("\n"):
  335. match = matcher.match(line)
  336. if match:
  337. records.append(TimingRecord(int(match.group(1)),
  338. int(match.group(2)) / 1000.0,
  339. int(match.group(3))))
  340. return records
  341. def run(self, hardware, scenario, result_dir=os.getcwd()):
  342. abs_result_dir = os.path.join(result_dir, scenario._name)
  343. if self._transport == "tcp":
  344. uri = "tcp:%s:9000" % self._dst_host
  345. elif self._transport == "rdma":
  346. uri = "rdma:%s:9000" % self._dst_host
  347. elif self._transport == "unix":
  348. if self._dst_host != "localhost":
  349. raise Exception("Running use unix migration transport for non-local host")
  350. uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
  351. try:
  352. os.remove(uri[5:])
  353. os.remove(monaddr)
  354. except:
  355. pass
  356. if self._dst_host != "localhost":
  357. dstmonaddr = ("localhost", 9001)
  358. else:
  359. dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
  360. srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
  361. src = QEMUMachine(self._binary,
  362. args=self._get_src_args(hardware),
  363. wrapper=self._get_src_wrapper(hardware),
  364. name="qemu-src-%d" % os.getpid(),
  365. monitor_address=srcmonaddr)
  366. dst = QEMUMachine(self._binary,
  367. args=self._get_dst_args(hardware, uri),
  368. wrapper=self._get_dst_wrapper(hardware),
  369. name="qemu-dst-%d" % os.getpid(),
  370. monitor_address=dstmonaddr)
  371. try:
  372. src.launch()
  373. dst.launch()
  374. ret = self._migrate(hardware, scenario, src, dst, uri)
  375. progress_history = ret[0]
  376. qemu_timings = ret[1]
  377. vcpu_timings = ret[2]
  378. if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
  379. os.remove(uri[5:])
  380. if os.path.exists(srcmonaddr):
  381. os.remove(srcmonaddr)
  382. if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
  383. os.remove(dstmonaddr)
  384. if self._verbose:
  385. print("Finished migration")
  386. src.shutdown()
  387. dst.shutdown()
  388. return Report(hardware, scenario, progress_history,
  389. Timings(self._get_timings(src) + self._get_timings(dst)),
  390. Timings(qemu_timings),
  391. Timings(vcpu_timings),
  392. self._binary, self._dst_host, self._kernel,
  393. self._initrd, self._transport, self._sleep)
  394. except Exception as e:
  395. if self._debug:
  396. print("Failed: %s" % str(e))
  397. try:
  398. src.shutdown()
  399. except:
  400. pass
  401. try:
  402. dst.shutdown()
  403. except:
  404. pass
  405. if self._debug:
  406. print(src.get_log())
  407. print(dst.get_log())
  408. raise