engine.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. #
  2. # Migration test main engine
  3. #
  4. # Copyright (c) 2016 Red Hat, Inc.
  5. #
  6. # This library is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU Lesser General Public
  8. # License as published by the Free Software Foundation; either
  9. # version 2.1 of the License, or (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. # Lesser General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Lesser General Public
  17. # License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18. #
  19. import os
  20. import re
  21. import sys
  22. import time
  23. from guestperf.progress import Progress, ProgressStats
  24. from guestperf.report import Report
  25. from guestperf.timings import TimingRecord, Timings
  26. sys.path.append(os.path.join(os.path.dirname(__file__),
  27. '..', '..', '..', 'python'))
  28. from qemu.machine import QEMUMachine
  29. class Engine(object):
  30. def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
  31. sleep=15, verbose=False, debug=False):
  32. self._binary = binary # Path to QEMU binary
  33. self._dst_host = dst_host # Hostname of target host
  34. self._kernel = kernel # Path to kernel image
  35. self._initrd = initrd # Path to stress initrd
  36. self._transport = transport # 'unix' or 'tcp' or 'rdma'
  37. self._sleep = sleep
  38. self._verbose = verbose
  39. self._debug = debug
  40. if debug:
  41. self._verbose = debug
  42. def _vcpu_timing(self, pid, tid_list):
  43. records = []
  44. now = time.time()
  45. jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
  46. for tid in tid_list:
  47. statfile = "/proc/%d/task/%d/stat" % (pid, tid)
  48. with open(statfile, "r") as fh:
  49. stat = fh.readline()
  50. fields = stat.split(" ")
  51. stime = int(fields[13])
  52. utime = int(fields[14])
  53. records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
  54. return records
  55. def _cpu_timing(self, pid):
  56. now = time.time()
  57. jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
  58. statfile = "/proc/%d/stat" % pid
  59. with open(statfile, "r") as fh:
  60. stat = fh.readline()
  61. fields = stat.split(" ")
  62. stime = int(fields[13])
  63. utime = int(fields[14])
  64. return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
  65. def _migrate_progress(self, vm):
  66. info = vm.cmd("query-migrate")
  67. if "ram" not in info:
  68. info["ram"] = {}
  69. return Progress(
  70. info.get("status", "active"),
  71. ProgressStats(
  72. info["ram"].get("transferred", 0),
  73. info["ram"].get("remaining", 0),
  74. info["ram"].get("total", 0),
  75. info["ram"].get("duplicate", 0),
  76. info["ram"].get("skipped", 0),
  77. info["ram"].get("normal", 0),
  78. info["ram"].get("normal-bytes", 0),
  79. info["ram"].get("dirty-pages-rate", 0),
  80. info["ram"].get("mbps", 0),
  81. info["ram"].get("dirty-sync-count", 0)
  82. ),
  83. time.time(),
  84. info.get("total-time", 0),
  85. info.get("downtime", 0),
  86. info.get("expected-downtime", 0),
  87. info.get("setup-time", 0),
  88. info.get("cpu-throttle-percentage", 0),
  89. info.get("dirty-limit-throttle-time-per-round", 0),
  90. info.get("dirty-limit-ring-full-time", 0),
  91. )
  92. def _migrate(self, hardware, scenario, src, dst, connect_uri):
  93. src_qemu_time = []
  94. src_vcpu_time = []
  95. src_pid = src.get_pid()
  96. vcpus = src.cmd("query-cpus-fast")
  97. src_threads = []
  98. for vcpu in vcpus:
  99. src_threads.append(vcpu["thread-id"])
  100. # XXX how to get dst timings on remote host ?
  101. if self._verbose:
  102. print("Sleeping %d seconds for initial guest workload run" % self._sleep)
  103. sleep_secs = self._sleep
  104. while sleep_secs > 1:
  105. src_qemu_time.append(self._cpu_timing(src_pid))
  106. src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
  107. time.sleep(1)
  108. sleep_secs -= 1
  109. if self._verbose:
  110. print("Starting migration")
  111. if scenario._auto_converge:
  112. resp = src.cmd("migrate-set-capabilities",
  113. capabilities = [
  114. { "capability": "auto-converge",
  115. "state": True }
  116. ])
  117. resp = src.cmd("migrate-set-parameters",
  118. cpu_throttle_increment=scenario._auto_converge_step)
  119. if scenario._post_copy:
  120. resp = src.cmd("migrate-set-capabilities",
  121. capabilities = [
  122. { "capability": "postcopy-ram",
  123. "state": True }
  124. ])
  125. resp = dst.cmd("migrate-set-capabilities",
  126. capabilities = [
  127. { "capability": "postcopy-ram",
  128. "state": True }
  129. ])
  130. resp = src.cmd("migrate-set-parameters",
  131. max_bandwidth=scenario._bandwidth * 1024 * 1024)
  132. resp = src.cmd("migrate-set-parameters",
  133. downtime_limit=scenario._downtime)
  134. if scenario._compression_mt:
  135. resp = src.cmd("migrate-set-capabilities",
  136. capabilities = [
  137. { "capability": "compress",
  138. "state": True }
  139. ])
  140. resp = src.cmd("migrate-set-parameters",
  141. compress_threads=scenario._compression_mt_threads)
  142. resp = dst.cmd("migrate-set-capabilities",
  143. capabilities = [
  144. { "capability": "compress",
  145. "state": True }
  146. ])
  147. resp = dst.cmd("migrate-set-parameters",
  148. decompress_threads=scenario._compression_mt_threads)
  149. if scenario._compression_xbzrle:
  150. resp = src.cmd("migrate-set-capabilities",
  151. capabilities = [
  152. { "capability": "xbzrle",
  153. "state": True }
  154. ])
  155. resp = dst.cmd("migrate-set-capabilities",
  156. capabilities = [
  157. { "capability": "xbzrle",
  158. "state": True }
  159. ])
  160. resp = src.cmd("migrate-set-parameters",
  161. xbzrle_cache_size=(
  162. hardware._mem *
  163. 1024 * 1024 * 1024 / 100 *
  164. scenario._compression_xbzrle_cache))
  165. if scenario._multifd:
  166. resp = src.cmd("migrate-set-capabilities",
  167. capabilities = [
  168. { "capability": "multifd",
  169. "state": True }
  170. ])
  171. resp = src.cmd("migrate-set-parameters",
  172. multifd_channels=scenario._multifd_channels)
  173. resp = dst.cmd("migrate-set-capabilities",
  174. capabilities = [
  175. { "capability": "multifd",
  176. "state": True }
  177. ])
  178. resp = dst.cmd("migrate-set-parameters",
  179. multifd_channels=scenario._multifd_channels)
  180. if scenario._dirty_limit:
  181. if not hardware._dirty_ring_size:
  182. raise Exception("dirty ring size must be configured when "
  183. "testing dirty limit migration")
  184. resp = src.cmd("migrate-set-capabilities",
  185. capabilities = [
  186. { "capability": "dirty-limit",
  187. "state": True }
  188. ])
  189. resp = src.cmd("migrate-set-parameters",
  190. x_vcpu_dirty_limit_period=scenario._x_vcpu_dirty_limit_period)
  191. resp = src.cmd("migrate-set-parameters",
  192. vcpu_dirty_limit=scenario._vcpu_dirty_limit)
  193. resp = src.cmd("migrate", uri=connect_uri)
  194. post_copy = False
  195. paused = False
  196. progress_history = []
  197. start = time.time()
  198. loop = 0
  199. while True:
  200. loop = loop + 1
  201. time.sleep(0.05)
  202. progress = self._migrate_progress(src)
  203. if (loop % 20) == 0:
  204. src_qemu_time.append(self._cpu_timing(src_pid))
  205. src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
  206. if (len(progress_history) == 0 or
  207. (progress_history[-1]._ram._iterations <
  208. progress._ram._iterations)):
  209. progress_history.append(progress)
  210. if progress._status in ("completed", "failed", "cancelled"):
  211. if progress._status == "completed" and paused:
  212. dst.cmd("cont")
  213. if progress_history[-1] != progress:
  214. progress_history.append(progress)
  215. if progress._status == "completed":
  216. if self._verbose:
  217. print("Sleeping %d seconds for final guest workload run" % self._sleep)
  218. sleep_secs = self._sleep
  219. while sleep_secs > 1:
  220. time.sleep(1)
  221. src_qemu_time.append(self._cpu_timing(src_pid))
  222. src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
  223. sleep_secs -= 1
  224. return [progress_history, src_qemu_time, src_vcpu_time]
  225. if self._verbose and (loop % 20) == 0:
  226. print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
  227. progress._ram._iterations,
  228. progress._ram._remaining_bytes / (1024 * 1024),
  229. progress._ram._total_bytes / (1024 * 1024),
  230. progress._ram._transferred_bytes / (1024 * 1024),
  231. progress._ram._transfer_rate_mbs,
  232. ))
  233. if progress._ram._iterations > scenario._max_iters:
  234. if self._verbose:
  235. print("No completion after %d iterations over RAM" % scenario._max_iters)
  236. src.cmd("migrate_cancel")
  237. continue
  238. if time.time() > (start + scenario._max_time):
  239. if self._verbose:
  240. print("No completion after %d seconds" % scenario._max_time)
  241. src.cmd("migrate_cancel")
  242. continue
  243. if (scenario._post_copy and
  244. progress._ram._iterations >= scenario._post_copy_iters and
  245. not post_copy):
  246. if self._verbose:
  247. print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
  248. resp = src.cmd("migrate-start-postcopy")
  249. post_copy = True
  250. if (scenario._pause and
  251. progress._ram._iterations >= scenario._pause_iters and
  252. not paused):
  253. if self._verbose:
  254. print("Pausing VM after %d iterations" % scenario._pause_iters)
  255. resp = src.cmd("stop")
  256. paused = True
  257. def _is_ppc64le(self):
  258. _, _, _, _, machine = os.uname()
  259. if machine == "ppc64le":
  260. return True
  261. return False
  262. def _get_guest_console_args(self):
  263. if self._is_ppc64le():
  264. return "console=hvc0"
  265. else:
  266. return "console=ttyS0"
  267. def _get_qemu_serial_args(self):
  268. if self._is_ppc64le():
  269. return ["-chardev", "stdio,id=cdev0",
  270. "-device", "spapr-vty,chardev=cdev0"]
  271. else:
  272. return ["-chardev", "stdio,id=cdev0",
  273. "-device", "isa-serial,chardev=cdev0"]
  274. def _get_common_args(self, hardware, tunnelled=False):
  275. args = [
  276. "noapic",
  277. "edd=off",
  278. "printk.time=1",
  279. "noreplace-smp",
  280. "cgroup_disable=memory",
  281. "pci=noearly",
  282. ]
  283. args.append(self._get_guest_console_args())
  284. if self._debug:
  285. args.append("debug")
  286. else:
  287. args.append("quiet")
  288. args.append("ramsize=%s" % hardware._mem)
  289. cmdline = " ".join(args)
  290. if tunnelled:
  291. cmdline = "'" + cmdline + "'"
  292. argv = [
  293. "-cpu", "host",
  294. "-kernel", self._kernel,
  295. "-initrd", self._initrd,
  296. "-append", cmdline,
  297. "-m", str((hardware._mem * 1024) + 512),
  298. "-smp", str(hardware._cpus),
  299. ]
  300. if hardware._dirty_ring_size:
  301. argv.extend(["-accel", "kvm,dirty-ring-size=%s" %
  302. hardware._dirty_ring_size])
  303. else:
  304. argv.extend(["-accel", "kvm"])
  305. argv.extend(self._get_qemu_serial_args())
  306. if self._debug:
  307. argv.extend(["-machine", "graphics=off"])
  308. if hardware._prealloc_pages:
  309. argv_source += ["-mem-path", "/dev/shm",
  310. "-mem-prealloc"]
  311. if hardware._locked_pages:
  312. argv_source += ["-overcommit", "mem-lock=on"]
  313. if hardware._huge_pages:
  314. pass
  315. return argv
  316. def _get_src_args(self, hardware):
  317. return self._get_common_args(hardware)
  318. def _get_dst_args(self, hardware, uri):
  319. tunnelled = False
  320. if self._dst_host != "localhost":
  321. tunnelled = True
  322. argv = self._get_common_args(hardware, tunnelled)
  323. return argv + ["-incoming", uri]
  324. @staticmethod
  325. def _get_common_wrapper(cpu_bind, mem_bind):
  326. wrapper = []
  327. if len(cpu_bind) > 0 or len(mem_bind) > 0:
  328. wrapper.append("numactl")
  329. if cpu_bind:
  330. wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
  331. if mem_bind:
  332. wrapper.append("--membind=%s" % ",".join(mem_bind))
  333. return wrapper
  334. def _get_src_wrapper(self, hardware):
  335. return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
  336. def _get_dst_wrapper(self, hardware):
  337. wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
  338. if self._dst_host != "localhost":
  339. return ["ssh",
  340. "-R", "9001:localhost:9001",
  341. self._dst_host] + wrapper
  342. else:
  343. return wrapper
  344. def _get_timings(self, vm):
  345. log = vm.get_log()
  346. if not log:
  347. return []
  348. if self._debug:
  349. print(log)
  350. regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
  351. matcher = re.compile(regex)
  352. records = []
  353. for line in log.split("\n"):
  354. match = matcher.match(line)
  355. if match:
  356. records.append(TimingRecord(int(match.group(1)),
  357. int(match.group(2)) / 1000.0,
  358. int(match.group(3))))
  359. return records
  360. def run(self, hardware, scenario, result_dir=os.getcwd()):
  361. abs_result_dir = os.path.join(result_dir, scenario._name)
  362. if self._transport == "tcp":
  363. uri = "tcp:%s:9000" % self._dst_host
  364. elif self._transport == "rdma":
  365. uri = "rdma:%s:9000" % self._dst_host
  366. elif self._transport == "unix":
  367. if self._dst_host != "localhost":
  368. raise Exception("Running use unix migration transport for non-local host")
  369. uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
  370. try:
  371. os.remove(uri[5:])
  372. os.remove(monaddr)
  373. except:
  374. pass
  375. if self._dst_host != "localhost":
  376. dstmonaddr = ("localhost", 9001)
  377. else:
  378. dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
  379. srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
  380. src = QEMUMachine(self._binary,
  381. args=self._get_src_args(hardware),
  382. wrapper=self._get_src_wrapper(hardware),
  383. name="qemu-src-%d" % os.getpid(),
  384. monitor_address=srcmonaddr)
  385. dst = QEMUMachine(self._binary,
  386. args=self._get_dst_args(hardware, uri),
  387. wrapper=self._get_dst_wrapper(hardware),
  388. name="qemu-dst-%d" % os.getpid(),
  389. monitor_address=dstmonaddr)
  390. try:
  391. src.launch()
  392. dst.launch()
  393. ret = self._migrate(hardware, scenario, src, dst, uri)
  394. progress_history = ret[0]
  395. qemu_timings = ret[1]
  396. vcpu_timings = ret[2]
  397. if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
  398. os.remove(uri[5:])
  399. if os.path.exists(srcmonaddr):
  400. os.remove(srcmonaddr)
  401. if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
  402. os.remove(dstmonaddr)
  403. if self._verbose:
  404. print("Finished migration")
  405. src.shutdown()
  406. dst.shutdown()
  407. return Report(hardware, scenario, progress_history,
  408. Timings(self._get_timings(src) + self._get_timings(dst)),
  409. Timings(qemu_timings),
  410. Timings(vcpu_timings),
  411. self._binary, self._dst_host, self._kernel,
  412. self._initrd, self._transport, self._sleep)
  413. except Exception as e:
  414. if self._debug:
  415. print("Failed: %s" % str(e))
  416. try:
  417. src.shutdown()
  418. except:
  419. pass
  420. try:
  421. dst.shutdown()
  422. except:
  423. pass
  424. if self._debug:
  425. print(src.get_log())
  426. print(dst.get_log())
  427. raise