colo.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. /*
  2. * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
  3. * (a.k.a. Fault Tolerance or Continuous Replication)
  4. *
  5. * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
  6. * Copyright (c) 2016 FUJITSU LIMITED
  7. * Copyright (c) 2016 Intel Corporation
  8. *
  9. * This work is licensed under the terms of the GNU GPL, version 2 or
  10. * later. See the COPYING file in the top-level directory.
  11. */
  12. #include "qemu/osdep.h"
  13. #include "sysemu/sysemu.h"
  14. #include "qapi/error.h"
  15. #include "qapi/qapi-commands-migration.h"
  16. #include "qemu-file-channel.h"
  17. #include "migration.h"
  18. #include "qemu-file.h"
  19. #include "savevm.h"
  20. #include "migration/colo.h"
  21. #include "block.h"
  22. #include "io/channel-buffer.h"
  23. #include "trace.h"
  24. #include "qemu/error-report.h"
  25. #include "qemu/main-loop.h"
  26. #include "qemu/rcu.h"
  27. #include "migration/failover.h"
  28. #include "migration/ram.h"
  29. #ifdef CONFIG_REPLICATION
  30. #include "replication.h"
  31. #endif
  32. #include "net/colo-compare.h"
  33. #include "net/colo.h"
  34. #include "block/block.h"
  35. #include "qapi/qapi-events-migration.h"
  36. #include "qapi/qmp/qerror.h"
  37. #include "sysemu/cpus.h"
  38. #include "sysemu/runstate.h"
  39. #include "net/filter.h"
  40. static bool vmstate_loading;
  41. static Notifier packets_compare_notifier;
  42. /* User need to know colo mode after COLO failover */
  43. static COLOMode last_colo_mode;
  44. #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
  45. bool migration_in_colo_state(void)
  46. {
  47. MigrationState *s = migrate_get_current();
  48. return (s->state == MIGRATION_STATUS_COLO);
  49. }
  50. bool migration_incoming_in_colo_state(void)
  51. {
  52. MigrationIncomingState *mis = migration_incoming_get_current();
  53. return mis && (mis->state == MIGRATION_STATUS_COLO);
  54. }
  55. static bool colo_runstate_is_stopped(void)
  56. {
  57. return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
  58. }
  /*
   * Carry out failover on the Secondary side.
   *
   * While device state is being loaded (vmstate_loading) the failover state
   * is only moved from ACTIVE to RELAUNCH and the function returns.
   * Otherwise the incoming migration state is switched from COLO to
   * COMPLETED, replication is stopped, the NIC filters are told about the
   * failover, both migration streams are shut down, the failover state is
   * moved from ACTIVE to COMPLETED, and finally the COLO incoming thread and
   * the incoming coroutine are woken up.
   *
   * Aborts when built without CONFIG_REPLICATION.
   */
  static void secondary_vm_do_failover(void)
  {
  /* COLO needs block replication to be enabled */
  #ifdef CONFIG_REPLICATION
      MigrationIncomingState *mis = migration_incoming_get_current();
      Error *err = NULL;
      int prev_state;

      /*
       * Failover must not run while the VM is loading its VMstate, or the
       * Secondary VM would be broken.  Only flag that a relaunch is needed.
       */
      if (vmstate_loading) {
          prev_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
                                          FAILOVER_STATUS_RELAUNCH);
          if (prev_state != FAILOVER_STATUS_ACTIVE) {
              error_report("Unknown error while do failover for secondary VM,"
                           "old_state: %s", FailoverStatus_str(prev_state));
          }
          return;
      }

      migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
                        MIGRATION_STATUS_COMPLETED);

      replication_stop_all(true, &err);
      if (err) {
          error_report_err(err);
          /* Reset so the pointer can be reused for the next call. */
          err = NULL;
      }

      /* Tell the filters of every NIC that a failover is happening */
      colo_notify_filters_event(COLO_EVENT_FAILOVER, &err);
      if (err) {
          error_report_err(err);
      }

      if (!autostart) {
          error_report("\"-S\" qemu option will be ignored in secondary side");
          /* Restore the run state used when a normal migration finishes */
          autostart = true;
      }

      /*
       * Make sure the COLO incoming thread is not blocked in recv or send.
       * If mis->from_src_file and mis->to_src_file share one fd, the second
       * shutdown() returns -1; that value is ignored because it is harmless.
       */
      if (mis->from_src_file) {
          qemu_file_shutdown(mis->from_src_file);
      }
      if (mis->to_src_file) {
          qemu_file_shutdown(mis->to_src_file);
      }

      prev_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
                                      FAILOVER_STATUS_COMPLETED);
      if (prev_state != FAILOVER_STATUS_ACTIVE) {
          error_report("Incorrect state (%s) while doing failover for "
                       "secondary VM", FailoverStatus_str(prev_state));
          return;
      }

      /* Let the COLO incoming thread know the failover work is finished */
      qemu_sem_post(&mis->colo_incoming_sem);

      /* On the Secondary VM, jump back into the incoming coroutine */
      if (mis->migration_incoming_co) {
          qemu_coroutine_enter(mis->migration_incoming_co);
      }
  #else
      abort();
  #endif
  }
  /*
   * Carry out failover on the Primary side.
   *
   * Switches the migration state from COLO to COMPLETED, kicks the COLO
   * thread, shuts down both migration streams, moves the failover state
   * from ACTIVE to COMPLETED, stops replication and finally posts
   * colo_exit_sem so the COLO thread can finish its cleanup.
   *
   * Aborts when built without CONFIG_REPLICATION.
   */
  static void primary_vm_do_failover(void)
  {
  #ifdef CONFIG_REPLICATION
      MigrationState *s = migrate_get_current();
      Error *err = NULL;
      int prev_state;

      migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
                        MIGRATION_STATUS_COMPLETED);

      /* Kick the COLO thread, which might be waiting for a checkpoint. */
      colo_checkpoint_notify(migrate_get_current());

      /*
       * Wake up the COLO thread, which may be blocked in recv() or send().
       * s->rp_state.from_dst_file and s->to_dst_file may share one fd;
       * shutting it down twice is harmless.
       */
      if (s->to_dst_file) {
          qemu_file_shutdown(s->to_dst_file);
      }
      if (s->rp_state.from_dst_file) {
          qemu_file_shutdown(s->rp_state.from_dst_file);
      }

      prev_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
                                      FAILOVER_STATUS_COMPLETED);
      if (prev_state != FAILOVER_STATUS_ACTIVE) {
          error_report("Incorrect state (%s) while doing failover for Primary VM",
                       FailoverStatus_str(prev_state));
          return;
      }

      replication_stop_all(true, &err);
      if (err) {
          error_report_err(err);
          err = NULL;
      }

      /* Let the COLO thread know the failover work is finished */
      qemu_sem_post(&s->colo_exit_sem);
  #else
      abort();
  #endif
  }
  166. COLOMode get_colo_mode(void)
  167. {
  168. if (migration_in_colo_state()) {
  169. return COLO_MODE_PRIMARY;
  170. } else if (migration_incoming_in_colo_state()) {
  171. return COLO_MODE_SECONDARY;
  172. } else {
  173. return COLO_MODE_NONE;
  174. }
  175. }
  176. void colo_do_failover(void)
  177. {
  178. /* Make sure VM stopped while failover happened. */
  179. if (!colo_runstate_is_stopped()) {
  180. vm_stop_force_state(RUN_STATE_COLO);
  181. }
  182. switch (get_colo_mode()) {
  183. case COLO_MODE_PRIMARY:
  184. primary_vm_do_failover();
  185. break;
  186. case COLO_MODE_SECONDARY:
  187. secondary_vm_do_failover();
  188. break;
  189. default:
  190. error_report("colo_do_failover failed because the colo mode"
  191. " could not be obtained");
  192. }
  193. }
  194. #ifdef CONFIG_REPLICATION
  195. void qmp_xen_set_replication(bool enable, bool primary,
  196. bool has_failover, bool failover,
  197. Error **errp)
  198. {
  199. ReplicationMode mode = primary ?
  200. REPLICATION_MODE_PRIMARY :
  201. REPLICATION_MODE_SECONDARY;
  202. if (has_failover && enable) {
  203. error_setg(errp, "Parameter 'failover' is only for"
  204. " stopping replication");
  205. return;
  206. }
  207. if (enable) {
  208. replication_start_all(mode, errp);
  209. } else {
  210. if (!has_failover) {
  211. failover = NULL;
  212. }
  213. replication_stop_all(failover, failover ? NULL : errp);
  214. }
  215. }
  216. ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
  217. {
  218. Error *err = NULL;
  219. ReplicationStatus *s = g_new0(ReplicationStatus, 1);
  220. replication_get_error_all(&err);
  221. if (err) {
  222. s->error = true;
  223. s->has_desc = true;
  224. s->desc = g_strdup(error_get_pretty(err));
  225. } else {
  226. s->error = false;
  227. }
  228. error_free(err);
  229. return s;
  230. }
  231. void qmp_xen_colo_do_checkpoint(Error **errp)
  232. {
  233. Error *err = NULL;
  234. replication_do_checkpoint_all(&err);
  235. if (err) {
  236. error_propagate(errp, err);
  237. return;
  238. }
  239. /* Notify all filters of all NIC to do checkpoint */
  240. colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
  241. }
  242. #endif
  243. COLOStatus *qmp_query_colo_status(Error **errp)
  244. {
  245. COLOStatus *s = g_new0(COLOStatus, 1);
  246. s->mode = get_colo_mode();
  247. s->last_mode = last_colo_mode;
  248. switch (failover_get_state()) {
  249. case FAILOVER_STATUS_NONE:
  250. s->reason = COLO_EXIT_REASON_NONE;
  251. break;
  252. case FAILOVER_STATUS_COMPLETED:
  253. s->reason = COLO_EXIT_REASON_REQUEST;
  254. break;
  255. default:
  256. if (migration_in_colo_state()) {
  257. s->reason = COLO_EXIT_REASON_PROCESSING;
  258. } else {
  259. s->reason = COLO_EXIT_REASON_ERROR;
  260. }
  261. }
  262. return s;
  263. }
  264. static void colo_send_message(QEMUFile *f, COLOMessage msg,
  265. Error **errp)
  266. {
  267. int ret;
  268. if (msg >= COLO_MESSAGE__MAX) {
  269. error_setg(errp, "%s: Invalid message", __func__);
  270. return;
  271. }
  272. qemu_put_be32(f, msg);
  273. qemu_fflush(f);
  274. ret = qemu_file_get_error(f);
  275. if (ret < 0) {
  276. error_setg_errno(errp, -ret, "Can't send COLO message");
  277. }
  278. trace_colo_send_message(COLOMessage_str(msg));
  279. }
  280. static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
  281. uint64_t value, Error **errp)
  282. {
  283. Error *local_err = NULL;
  284. int ret;
  285. colo_send_message(f, msg, &local_err);
  286. if (local_err) {
  287. error_propagate(errp, local_err);
  288. return;
  289. }
  290. qemu_put_be64(f, value);
  291. qemu_fflush(f);
  292. ret = qemu_file_get_error(f);
  293. if (ret < 0) {
  294. error_setg_errno(errp, -ret, "Failed to send value for message:%s",
  295. COLOMessage_str(msg));
  296. }
  297. }
  298. static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
  299. {
  300. COLOMessage msg;
  301. int ret;
  302. msg = qemu_get_be32(f);
  303. ret = qemu_file_get_error(f);
  304. if (ret < 0) {
  305. error_setg_errno(errp, -ret, "Can't receive COLO message");
  306. return msg;
  307. }
  308. if (msg >= COLO_MESSAGE__MAX) {
  309. error_setg(errp, "%s: Invalid message", __func__);
  310. return msg;
  311. }
  312. trace_colo_receive_message(COLOMessage_str(msg));
  313. return msg;
  314. }
  315. static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
  316. Error **errp)
  317. {
  318. COLOMessage msg;
  319. Error *local_err = NULL;
  320. msg = colo_receive_message(f, &local_err);
  321. if (local_err) {
  322. error_propagate(errp, local_err);
  323. return;
  324. }
  325. if (msg != expect_msg) {
  326. error_setg(errp, "Unexpected COLO message %d, expected %d",
  327. msg, expect_msg);
  328. }
  329. }
  330. static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
  331. Error **errp)
  332. {
  333. Error *local_err = NULL;
  334. uint64_t value;
  335. int ret;
  336. colo_receive_check_message(f, expect_msg, &local_err);
  337. if (local_err) {
  338. error_propagate(errp, local_err);
  339. return 0;
  340. }
  341. value = qemu_get_be64(f);
  342. ret = qemu_file_get_error(f);
  343. if (ret < 0) {
  344. error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
  345. COLOMessage_str(expect_msg));
  346. }
  347. return value;
  348. }
  349. static int colo_do_checkpoint_transaction(MigrationState *s,
  350. QIOChannelBuffer *bioc,
  351. QEMUFile *fb)
  352. {
  353. Error *local_err = NULL;
  354. int ret = -1;
  355. colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
  356. &local_err);
  357. if (local_err) {
  358. goto out;
  359. }
  360. colo_receive_check_message(s->rp_state.from_dst_file,
  361. COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
  362. if (local_err) {
  363. goto out;
  364. }
  365. /* Reset channel-buffer directly */
  366. qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
  367. bioc->usage = 0;
  368. qemu_mutex_lock_iothread();
  369. if (failover_get_state() != FAILOVER_STATUS_NONE) {
  370. qemu_mutex_unlock_iothread();
  371. goto out;
  372. }
  373. vm_stop_force_state(RUN_STATE_COLO);
  374. qemu_mutex_unlock_iothread();
  375. trace_colo_vm_state_change("run", "stop");
  376. /*
  377. * Failover request bh could be called after vm_stop_force_state(),
  378. * So we need check failover_request_is_active() again.
  379. */
  380. if (failover_get_state() != FAILOVER_STATUS_NONE) {
  381. goto out;
  382. }
  383. /* Disable block migration */
  384. migrate_set_block_enabled(false, &local_err);
  385. if (local_err) {
  386. goto out;
  387. }
  388. qemu_mutex_lock_iothread();
  389. #ifdef CONFIG_REPLICATION
  390. replication_do_checkpoint_all(&local_err);
  391. if (local_err) {
  392. qemu_mutex_unlock_iothread();
  393. goto out;
  394. }
  395. #else
  396. abort();
  397. #endif
  398. colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
  399. if (local_err) {
  400. qemu_mutex_unlock_iothread();
  401. goto out;
  402. }
  403. /* Note: device state is saved into buffer */
  404. ret = qemu_save_device_state(fb);
  405. qemu_mutex_unlock_iothread();
  406. if (ret < 0) {
  407. goto out;
  408. }
  409. /*
  410. * Only save VM's live state, which not including device state.
  411. * TODO: We may need a timeout mechanism to prevent COLO process
  412. * to be blocked here.
  413. */
  414. qemu_savevm_live_state(s->to_dst_file);
  415. qemu_fflush(fb);
  416. /*
  417. * We need the size of the VMstate data in Secondary side,
  418. * With which we can decide how much data should be read.
  419. */
  420. colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
  421. bioc->usage, &local_err);
  422. if (local_err) {
  423. goto out;
  424. }
  425. qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
  426. qemu_fflush(s->to_dst_file);
  427. ret = qemu_file_get_error(s->to_dst_file);
  428. if (ret < 0) {
  429. goto out;
  430. }
  431. colo_receive_check_message(s->rp_state.from_dst_file,
  432. COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
  433. if (local_err) {
  434. goto out;
  435. }
  436. qemu_event_reset(&s->colo_checkpoint_event);
  437. colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
  438. if (local_err) {
  439. goto out;
  440. }
  441. colo_receive_check_message(s->rp_state.from_dst_file,
  442. COLO_MESSAGE_VMSTATE_LOADED, &local_err);
  443. if (local_err) {
  444. goto out;
  445. }
  446. ret = 0;
  447. qemu_mutex_lock_iothread();
  448. vm_start();
  449. qemu_mutex_unlock_iothread();
  450. trace_colo_vm_state_change("stop", "run");
  451. out:
  452. if (local_err) {
  453. error_report_err(local_err);
  454. }
  455. return ret;
  456. }
  457. static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
  458. {
  459. colo_checkpoint_notify(data);
  460. }
  461. static void colo_process_checkpoint(MigrationState *s)
  462. {
  463. QIOChannelBuffer *bioc;
  464. QEMUFile *fb = NULL;
  465. int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  466. Error *local_err = NULL;
  467. int ret;
  468. last_colo_mode = get_colo_mode();
  469. if (last_colo_mode != COLO_MODE_PRIMARY) {
  470. error_report("COLO mode must be COLO_MODE_PRIMARY");
  471. return;
  472. }
  473. failover_init_state();
  474. s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
  475. if (!s->rp_state.from_dst_file) {
  476. error_report("Open QEMUFile from_dst_file failed");
  477. goto out;
  478. }
  479. packets_compare_notifier.notify = colo_compare_notify_checkpoint;
  480. colo_compare_register_notifier(&packets_compare_notifier);
  481. /*
  482. * Wait for Secondary finish loading VM states and enter COLO
  483. * restore.
  484. */
  485. colo_receive_check_message(s->rp_state.from_dst_file,
  486. COLO_MESSAGE_CHECKPOINT_READY, &local_err);
  487. if (local_err) {
  488. goto out;
  489. }
  490. bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
  491. fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
  492. object_unref(OBJECT(bioc));
  493. qemu_mutex_lock_iothread();
  494. #ifdef CONFIG_REPLICATION
  495. replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
  496. if (local_err) {
  497. qemu_mutex_unlock_iothread();
  498. goto out;
  499. }
  500. #else
  501. abort();
  502. #endif
  503. vm_start();
  504. qemu_mutex_unlock_iothread();
  505. trace_colo_vm_state_change("stop", "run");
  506. timer_mod(s->colo_delay_timer,
  507. current_time + s->parameters.x_checkpoint_delay);
  508. while (s->state == MIGRATION_STATUS_COLO) {
  509. if (failover_get_state() != FAILOVER_STATUS_NONE) {
  510. error_report("failover request");
  511. goto out;
  512. }
  513. qemu_event_wait(&s->colo_checkpoint_event);
  514. if (s->state != MIGRATION_STATUS_COLO) {
  515. goto out;
  516. }
  517. ret = colo_do_checkpoint_transaction(s, bioc, fb);
  518. if (ret < 0) {
  519. goto out;
  520. }
  521. }
  522. out:
  523. /* Throw the unreported error message after exited from loop */
  524. if (local_err) {
  525. error_report_err(local_err);
  526. }
  527. if (fb) {
  528. qemu_fclose(fb);
  529. }
  530. /*
  531. * There are only two reasons we can get here, some error happened
  532. * or the user triggered failover.
  533. */
  534. switch (failover_get_state()) {
  535. case FAILOVER_STATUS_COMPLETED:
  536. qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
  537. COLO_EXIT_REASON_REQUEST);
  538. break;
  539. default:
  540. qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
  541. COLO_EXIT_REASON_ERROR);
  542. }
  543. /* Hope this not to be too long to wait here */
  544. qemu_sem_wait(&s->colo_exit_sem);
  545. qemu_sem_destroy(&s->colo_exit_sem);
  546. /*
  547. * It is safe to unregister notifier after failover finished.
  548. * Besides, colo_delay_timer and colo_checkpoint_sem can't be
  549. * released before unregister notifier, or there will be use-after-free
  550. * error.
  551. */
  552. colo_compare_unregister_notifier(&packets_compare_notifier);
  553. timer_del(s->colo_delay_timer);
  554. timer_free(s->colo_delay_timer);
  555. qemu_event_destroy(&s->colo_checkpoint_event);
  556. /*
  557. * Must be called after failover BH is completed,
  558. * Or the failover BH may shutdown the wrong fd that
  559. * re-used by other threads after we release here.
  560. */
  561. if (s->rp_state.from_dst_file) {
  562. qemu_fclose(s->rp_state.from_dst_file);
  563. }
  564. }
  565. void colo_checkpoint_notify(void *opaque)
  566. {
  567. MigrationState *s = opaque;
  568. int64_t next_notify_time;
  569. qemu_event_set(&s->colo_checkpoint_event);
  570. s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
  571. next_notify_time = s->colo_checkpoint_time +
  572. s->parameters.x_checkpoint_delay;
  573. timer_mod(s->colo_delay_timer, next_notify_time);
  574. }
  575. void migrate_start_colo_process(MigrationState *s)
  576. {
  577. qemu_mutex_unlock_iothread();
  578. qemu_event_init(&s->colo_checkpoint_event, false);
  579. s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST,
  580. colo_checkpoint_notify, s);
  581. qemu_sem_init(&s->colo_exit_sem, 0);
  582. migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
  583. MIGRATION_STATUS_COLO);
  584. colo_process_checkpoint(s);
  585. qemu_mutex_lock_iothread();
  586. }
  587. static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
  588. QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
  589. {
  590. uint64_t total_size;
  591. uint64_t value;
  592. Error *local_err = NULL;
  593. int ret;
  594. qemu_mutex_lock_iothread();
  595. vm_stop_force_state(RUN_STATE_COLO);
  596. trace_colo_vm_state_change("run", "stop");
  597. qemu_mutex_unlock_iothread();
  598. /* FIXME: This is unnecessary for periodic checkpoint mode */
  599. colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
  600. &local_err);
  601. if (local_err) {
  602. error_propagate(errp, local_err);
  603. return;
  604. }
  605. colo_receive_check_message(mis->from_src_file,
  606. COLO_MESSAGE_VMSTATE_SEND, &local_err);
  607. if (local_err) {
  608. error_propagate(errp, local_err);
  609. return;
  610. }
  611. qemu_mutex_lock_iothread();
  612. cpu_synchronize_all_states();
  613. ret = qemu_loadvm_state_main(mis->from_src_file, mis);
  614. qemu_mutex_unlock_iothread();
  615. if (ret < 0) {
  616. error_setg(errp, "Load VM's live state (ram) error");
  617. return;
  618. }
  619. value = colo_receive_message_value(mis->from_src_file,
  620. COLO_MESSAGE_VMSTATE_SIZE, &local_err);
  621. if (local_err) {
  622. error_propagate(errp, local_err);
  623. return;
  624. }
  625. /*
  626. * Read VM device state data into channel buffer,
  627. * It's better to re-use the memory allocated.
  628. * Here we need to handle the channel buffer directly.
  629. */
  630. if (value > bioc->capacity) {
  631. bioc->capacity = value;
  632. bioc->data = g_realloc(bioc->data, bioc->capacity);
  633. }
  634. total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
  635. if (total_size != value) {
  636. error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
  637. " %" PRIu64, total_size, value);
  638. return;
  639. }
  640. bioc->usage = total_size;
  641. qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
  642. colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
  643. &local_err);
  644. if (local_err) {
  645. error_propagate(errp, local_err);
  646. return;
  647. }
  648. qemu_mutex_lock_iothread();
  649. vmstate_loading = true;
  650. colo_flush_ram_cache();
  651. ret = qemu_load_device_state(fb);
  652. if (ret < 0) {
  653. error_setg(errp, "COLO: load device state failed");
  654. vmstate_loading = false;
  655. qemu_mutex_unlock_iothread();
  656. return;
  657. }
  658. #ifdef CONFIG_REPLICATION
  659. replication_get_error_all(&local_err);
  660. if (local_err) {
  661. error_propagate(errp, local_err);
  662. vmstate_loading = false;
  663. qemu_mutex_unlock_iothread();
  664. return;
  665. }
  666. /* discard colo disk buffer */
  667. replication_do_checkpoint_all(&local_err);
  668. if (local_err) {
  669. error_propagate(errp, local_err);
  670. vmstate_loading = false;
  671. qemu_mutex_unlock_iothread();
  672. return;
  673. }
  674. #else
  675. abort();
  676. #endif
  677. /* Notify all filters of all NIC to do checkpoint */
  678. colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
  679. if (local_err) {
  680. error_propagate(errp, local_err);
  681. vmstate_loading = false;
  682. qemu_mutex_unlock_iothread();
  683. return;
  684. }
  685. vmstate_loading = false;
  686. vm_start();
  687. trace_colo_vm_state_change("stop", "run");
  688. qemu_mutex_unlock_iothread();
  689. if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
  690. return;
  691. }
  692. colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
  693. &local_err);
  694. error_propagate(errp, local_err);
  695. }
  696. static void colo_wait_handle_message(MigrationIncomingState *mis,
  697. QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
  698. {
  699. COLOMessage msg;
  700. Error *local_err = NULL;
  701. msg = colo_receive_message(mis->from_src_file, &local_err);
  702. if (local_err) {
  703. error_propagate(errp, local_err);
  704. return;
  705. }
  706. switch (msg) {
  707. case COLO_MESSAGE_CHECKPOINT_REQUEST:
  708. colo_incoming_process_checkpoint(mis, fb, bioc, errp);
  709. break;
  710. default:
  711. error_setg(errp, "Got unknown COLO message: %d", msg);
  712. break;
  713. }
  714. }
  715. void *colo_process_incoming_thread(void *opaque)
  716. {
  717. MigrationIncomingState *mis = opaque;
  718. QEMUFile *fb = NULL;
  719. QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
  720. Error *local_err = NULL;
  721. rcu_register_thread();
  722. qemu_sem_init(&mis->colo_incoming_sem, 0);
  723. migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
  724. MIGRATION_STATUS_COLO);
  725. last_colo_mode = get_colo_mode();
  726. if (last_colo_mode != COLO_MODE_SECONDARY) {
  727. error_report("COLO mode must be COLO_MODE_SECONDARY");
  728. return NULL;
  729. }
  730. failover_init_state();
  731. mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
  732. if (!mis->to_src_file) {
  733. error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
  734. goto out;
  735. }
  736. /*
  737. * Note: the communication between Primary side and Secondary side
  738. * should be sequential, we set the fd to unblocked in migration incoming
  739. * coroutine, and here we are in the COLO incoming thread, so it is ok to
  740. * set the fd back to blocked.
  741. */
  742. qemu_file_set_blocking(mis->from_src_file, true);
  743. colo_incoming_start_dirty_log();
  744. bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
  745. fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
  746. object_unref(OBJECT(bioc));
  747. qemu_mutex_lock_iothread();
  748. #ifdef CONFIG_REPLICATION
  749. replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
  750. if (local_err) {
  751. qemu_mutex_unlock_iothread();
  752. goto out;
  753. }
  754. #else
  755. abort();
  756. #endif
  757. vm_start();
  758. trace_colo_vm_state_change("stop", "run");
  759. qemu_mutex_unlock_iothread();
  760. colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
  761. &local_err);
  762. if (local_err) {
  763. goto out;
  764. }
  765. while (mis->state == MIGRATION_STATUS_COLO) {
  766. colo_wait_handle_message(mis, fb, bioc, &local_err);
  767. if (local_err) {
  768. error_report_err(local_err);
  769. break;
  770. }
  771. if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
  772. failover_set_state(FAILOVER_STATUS_RELAUNCH,
  773. FAILOVER_STATUS_NONE);
  774. failover_request_active(NULL);
  775. break;
  776. }
  777. if (failover_get_state() != FAILOVER_STATUS_NONE) {
  778. error_report("failover request");
  779. break;
  780. }
  781. }
  782. out:
  783. /*
  784. * There are only two reasons we can get here, some error happened
  785. * or the user triggered failover.
  786. */
  787. switch (failover_get_state()) {
  788. case FAILOVER_STATUS_COMPLETED:
  789. qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
  790. COLO_EXIT_REASON_REQUEST);
  791. break;
  792. default:
  793. qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
  794. COLO_EXIT_REASON_ERROR);
  795. }
  796. if (fb) {
  797. qemu_fclose(fb);
  798. }
  799. /* Hope this not to be too long to loop here */
  800. qemu_sem_wait(&mis->colo_incoming_sem);
  801. qemu_sem_destroy(&mis->colo_incoming_sem);
  802. /* Must be called after failover BH is completed */
  803. if (mis->to_src_file) {
  804. qemu_fclose(mis->to_src_file);
  805. mis->to_src_file = NULL;
  806. }
  807. rcu_unregister_thread();
  808. return NULL;
  809. }