123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941 |
- /*
- * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
- * (a.k.a. Fault Tolerance or Continuous Replication)
- *
- * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
- * Copyright (c) 2016 FUJITSU LIMITED
- * Copyright (c) 2016 Intel Corporation
- *
- * This work is licensed under the terms of the GNU GPL, version 2 or
- * later. See the COPYING file in the top-level directory.
- */
- #include "qemu/osdep.h"
- #include "sysemu/sysemu.h"
- #include "qapi/error.h"
- #include "qapi/qapi-commands-migration.h"
- #include "migration.h"
- #include "qemu-file.h"
- #include "savevm.h"
- #include "migration/colo.h"
- #include "block.h"
- #include "io/channel-buffer.h"
- #include "trace.h"
- #include "qemu/error-report.h"
- #include "qemu/main-loop.h"
- #include "qemu/rcu.h"
- #include "migration/failover.h"
- #include "migration/ram.h"
- #ifdef CONFIG_REPLICATION
- #include "block/replication.h"
- #endif
- #include "net/colo-compare.h"
- #include "net/colo.h"
- #include "block/block.h"
- #include "qapi/qapi-events-migration.h"
- #include "sysemu/cpus.h"
- #include "sysemu/runstate.h"
- #include "net/filter.h"
- static bool vmstate_loading;
- static Notifier packets_compare_notifier;
- /* User need to know colo mode after COLO failover */
- static COLOMode last_colo_mode;
- #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
- bool migration_in_colo_state(void)
- {
- MigrationState *s = migrate_get_current();
- return (s->state == MIGRATION_STATUS_COLO);
- }
- bool migration_incoming_in_colo_state(void)
- {
- MigrationIncomingState *mis = migration_incoming_get_current();
- return mis && (mis->state == MIGRATION_STATUS_COLO);
- }
- static bool colo_runstate_is_stopped(void)
- {
- return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
- }
- static void secondary_vm_do_failover(void)
- {
- /* COLO needs enable block-replication */
- #ifdef CONFIG_REPLICATION
- int old_state;
- MigrationIncomingState *mis = migration_incoming_get_current();
- Error *local_err = NULL;
- /* Can not do failover during the process of VM's loading VMstate, Or
- * it will break the secondary VM.
- */
- if (vmstate_loading) {
- old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
- FAILOVER_STATUS_RELAUNCH);
- if (old_state != FAILOVER_STATUS_ACTIVE) {
- error_report("Unknown error while do failover for secondary VM,"
- "old_state: %s", FailoverStatus_str(old_state));
- }
- return;
- }
- migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
- MIGRATION_STATUS_COMPLETED);
- replication_stop_all(true, &local_err);
- if (local_err) {
- error_report_err(local_err);
- local_err = NULL;
- }
- /* Notify all filters of all NIC to do checkpoint */
- colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
- if (local_err) {
- error_report_err(local_err);
- }
- if (!autostart) {
- error_report("\"-S\" qemu option will be ignored in secondary side");
- /* recover runstate to normal migration finish state */
- autostart = true;
- }
- /*
- * Make sure COLO incoming thread not block in recv or send,
- * If mis->from_src_file and mis->to_src_file use the same fd,
- * The second shutdown() will return -1, we ignore this value,
- * It is harmless.
- */
- if (mis->from_src_file) {
- qemu_file_shutdown(mis->from_src_file);
- }
- if (mis->to_src_file) {
- qemu_file_shutdown(mis->to_src_file);
- }
- old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
- FAILOVER_STATUS_COMPLETED);
- if (old_state != FAILOVER_STATUS_ACTIVE) {
- error_report("Incorrect state (%s) while doing failover for "
- "secondary VM", FailoverStatus_str(old_state));
- return;
- }
- /* Notify COLO incoming thread that failover work is finished */
- qemu_sem_post(&mis->colo_incoming_sem);
- /* For Secondary VM, jump to incoming co */
- if (mis->migration_incoming_co) {
- qemu_coroutine_enter(mis->migration_incoming_co);
- }
- #else
- abort();
- #endif
- }
- static void primary_vm_do_failover(void)
- {
- #ifdef CONFIG_REPLICATION
- MigrationState *s = migrate_get_current();
- int old_state;
- Error *local_err = NULL;
- migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
- MIGRATION_STATUS_COMPLETED);
- /*
- * kick COLO thread which might wait at
- * qemu_sem_wait(&s->colo_checkpoint_sem).
- */
- colo_checkpoint_notify(s);
- /*
- * Wake up COLO thread which may blocked in recv() or send(),
- * The s->rp_state.from_dst_file and s->to_dst_file may use the
- * same fd, but we still shutdown the fd for twice, it is harmless.
- */
- if (s->to_dst_file) {
- qemu_file_shutdown(s->to_dst_file);
- }
- if (s->rp_state.from_dst_file) {
- qemu_file_shutdown(s->rp_state.from_dst_file);
- }
- old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
- FAILOVER_STATUS_COMPLETED);
- if (old_state != FAILOVER_STATUS_ACTIVE) {
- error_report("Incorrect state (%s) while doing failover for Primary VM",
- FailoverStatus_str(old_state));
- return;
- }
- replication_stop_all(true, &local_err);
- if (local_err) {
- error_report_err(local_err);
- local_err = NULL;
- }
- /* Notify COLO thread that failover work is finished */
- qemu_sem_post(&s->colo_exit_sem);
- #else
- abort();
- #endif
- }
- COLOMode get_colo_mode(void)
- {
- if (migration_in_colo_state()) {
- return COLO_MODE_PRIMARY;
- } else if (migration_incoming_in_colo_state()) {
- return COLO_MODE_SECONDARY;
- } else {
- return COLO_MODE_NONE;
- }
- }
- void colo_do_failover(void)
- {
- /* Make sure VM stopped while failover happened. */
- if (!colo_runstate_is_stopped()) {
- vm_stop_force_state(RUN_STATE_COLO);
- }
- switch (last_colo_mode = get_colo_mode()) {
- case COLO_MODE_PRIMARY:
- primary_vm_do_failover();
- break;
- case COLO_MODE_SECONDARY:
- secondary_vm_do_failover();
- break;
- default:
- error_report("colo_do_failover failed because the colo mode"
- " could not be obtained");
- }
- }
- #ifdef CONFIG_REPLICATION
- void qmp_xen_set_replication(bool enable, bool primary,
- bool has_failover, bool failover,
- Error **errp)
- {
- ReplicationMode mode = primary ?
- REPLICATION_MODE_PRIMARY :
- REPLICATION_MODE_SECONDARY;
- if (has_failover && enable) {
- error_setg(errp, "Parameter 'failover' is only for"
- " stopping replication");
- return;
- }
- if (enable) {
- replication_start_all(mode, errp);
- } else {
- if (!has_failover) {
- failover = NULL;
- }
- replication_stop_all(failover, failover ? NULL : errp);
- }
- }
- ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
- {
- Error *err = NULL;
- ReplicationStatus *s = g_new0(ReplicationStatus, 1);
- replication_get_error_all(&err);
- if (err) {
- s->error = true;
- s->desc = g_strdup(error_get_pretty(err));
- } else {
- s->error = false;
- }
- error_free(err);
- return s;
- }
- void qmp_xen_colo_do_checkpoint(Error **errp)
- {
- Error *err = NULL;
- replication_do_checkpoint_all(&err);
- if (err) {
- error_propagate(errp, err);
- return;
- }
- /* Notify all filters of all NIC to do checkpoint */
- colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
- }
- #endif
- COLOStatus *qmp_query_colo_status(Error **errp)
- {
- COLOStatus *s = g_new0(COLOStatus, 1);
- s->mode = get_colo_mode();
- s->last_mode = last_colo_mode;
- switch (failover_get_state()) {
- case FAILOVER_STATUS_NONE:
- s->reason = COLO_EXIT_REASON_NONE;
- break;
- case FAILOVER_STATUS_COMPLETED:
- s->reason = COLO_EXIT_REASON_REQUEST;
- break;
- default:
- if (migration_in_colo_state()) {
- s->reason = COLO_EXIT_REASON_PROCESSING;
- } else {
- s->reason = COLO_EXIT_REASON_ERROR;
- }
- }
- return s;
- }
- static void colo_send_message(QEMUFile *f, COLOMessage msg,
- Error **errp)
- {
- int ret;
- if (msg >= COLO_MESSAGE__MAX) {
- error_setg(errp, "%s: Invalid message", __func__);
- return;
- }
- qemu_put_be32(f, msg);
- qemu_fflush(f);
- ret = qemu_file_get_error(f);
- if (ret < 0) {
- error_setg_errno(errp, -ret, "Can't send COLO message");
- }
- trace_colo_send_message(COLOMessage_str(msg));
- }
- static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
- uint64_t value, Error **errp)
- {
- Error *local_err = NULL;
- int ret;
- colo_send_message(f, msg, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- qemu_put_be64(f, value);
- qemu_fflush(f);
- ret = qemu_file_get_error(f);
- if (ret < 0) {
- error_setg_errno(errp, -ret, "Failed to send value for message:%s",
- COLOMessage_str(msg));
- }
- }
- static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
- {
- COLOMessage msg;
- int ret;
- msg = qemu_get_be32(f);
- ret = qemu_file_get_error(f);
- if (ret < 0) {
- error_setg_errno(errp, -ret, "Can't receive COLO message");
- return msg;
- }
- if (msg >= COLO_MESSAGE__MAX) {
- error_setg(errp, "%s: Invalid message", __func__);
- return msg;
- }
- trace_colo_receive_message(COLOMessage_str(msg));
- return msg;
- }
- static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
- Error **errp)
- {
- COLOMessage msg;
- Error *local_err = NULL;
- msg = colo_receive_message(f, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- if (msg != expect_msg) {
- error_setg(errp, "Unexpected COLO message %d, expected %d",
- msg, expect_msg);
- }
- }
- static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
- Error **errp)
- {
- Error *local_err = NULL;
- uint64_t value;
- int ret;
- colo_receive_check_message(f, expect_msg, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return 0;
- }
- value = qemu_get_be64(f);
- ret = qemu_file_get_error(f);
- if (ret < 0) {
- error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
- COLOMessage_str(expect_msg));
- }
- return value;
- }
- static int colo_do_checkpoint_transaction(MigrationState *s,
- QIOChannelBuffer *bioc,
- QEMUFile *fb)
- {
- Error *local_err = NULL;
- int ret = -1;
- colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
- &local_err);
- if (local_err) {
- goto out;
- }
- colo_receive_check_message(s->rp_state.from_dst_file,
- COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
- if (local_err) {
- goto out;
- }
- /* Reset channel-buffer directly */
- qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
- bioc->usage = 0;
- qemu_mutex_lock_iothread();
- if (failover_get_state() != FAILOVER_STATUS_NONE) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
- vm_stop_force_state(RUN_STATE_COLO);
- qemu_mutex_unlock_iothread();
- trace_colo_vm_state_change("run", "stop");
- /*
- * Failover request bh could be called after vm_stop_force_state(),
- * So we need check failover_request_is_active() again.
- */
- if (failover_get_state() != FAILOVER_STATUS_NONE) {
- goto out;
- }
- qemu_mutex_lock_iothread();
- #ifdef CONFIG_REPLICATION
- replication_do_checkpoint_all(&local_err);
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
- #else
- abort();
- #endif
- colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
- /* Note: device state is saved into buffer */
- ret = qemu_save_device_state(fb);
- qemu_mutex_unlock_iothread();
- if (ret < 0) {
- goto out;
- }
- if (migrate_auto_converge()) {
- mig_throttle_counter_reset();
- }
- /*
- * Only save VM's live state, which not including device state.
- * TODO: We may need a timeout mechanism to prevent COLO process
- * to be blocked here.
- */
- qemu_savevm_live_state(s->to_dst_file);
- qemu_fflush(fb);
- /*
- * We need the size of the VMstate data in Secondary side,
- * With which we can decide how much data should be read.
- */
- colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
- bioc->usage, &local_err);
- if (local_err) {
- goto out;
- }
- qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
- qemu_fflush(s->to_dst_file);
- ret = qemu_file_get_error(s->to_dst_file);
- if (ret < 0) {
- goto out;
- }
- colo_receive_check_message(s->rp_state.from_dst_file,
- COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
- if (local_err) {
- goto out;
- }
- qemu_event_reset(&s->colo_checkpoint_event);
- colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
- if (local_err) {
- goto out;
- }
- colo_receive_check_message(s->rp_state.from_dst_file,
- COLO_MESSAGE_VMSTATE_LOADED, &local_err);
- if (local_err) {
- goto out;
- }
- ret = 0;
- qemu_mutex_lock_iothread();
- vm_start();
- qemu_mutex_unlock_iothread();
- trace_colo_vm_state_change("stop", "run");
- out:
- if (local_err) {
- error_report_err(local_err);
- }
- return ret;
- }
- static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
- {
- colo_checkpoint_notify(data);
- }
- static void colo_process_checkpoint(MigrationState *s)
- {
- QIOChannelBuffer *bioc;
- QEMUFile *fb = NULL;
- Error *local_err = NULL;
- int ret;
- if (get_colo_mode() != COLO_MODE_PRIMARY) {
- error_report("COLO mode must be COLO_MODE_PRIMARY");
- return;
- }
- failover_init_state();
- s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
- if (!s->rp_state.from_dst_file) {
- error_report("Open QEMUFile from_dst_file failed");
- goto out;
- }
- packets_compare_notifier.notify = colo_compare_notify_checkpoint;
- colo_compare_register_notifier(&packets_compare_notifier);
- /*
- * Wait for Secondary finish loading VM states and enter COLO
- * restore.
- */
- colo_receive_check_message(s->rp_state.from_dst_file,
- COLO_MESSAGE_CHECKPOINT_READY, &local_err);
- if (local_err) {
- goto out;
- }
- bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
- fb = qemu_file_new_output(QIO_CHANNEL(bioc));
- object_unref(OBJECT(bioc));
- qemu_mutex_lock_iothread();
- #ifdef CONFIG_REPLICATION
- replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
- #else
- abort();
- #endif
- vm_start();
- qemu_mutex_unlock_iothread();
- trace_colo_vm_state_change("stop", "run");
- timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
- s->parameters.x_checkpoint_delay);
- while (s->state == MIGRATION_STATUS_COLO) {
- if (failover_get_state() != FAILOVER_STATUS_NONE) {
- error_report("failover request");
- goto out;
- }
- qemu_event_wait(&s->colo_checkpoint_event);
- if (s->state != MIGRATION_STATUS_COLO) {
- goto out;
- }
- ret = colo_do_checkpoint_transaction(s, bioc, fb);
- if (ret < 0) {
- goto out;
- }
- }
- out:
- /* Throw the unreported error message after exited from loop */
- if (local_err) {
- error_report_err(local_err);
- }
- if (fb) {
- qemu_fclose(fb);
- }
- /*
- * There are only two reasons we can get here, some error happened
- * or the user triggered failover.
- */
- switch (failover_get_state()) {
- case FAILOVER_STATUS_COMPLETED:
- qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
- COLO_EXIT_REASON_REQUEST);
- break;
- default:
- qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
- COLO_EXIT_REASON_ERROR);
- }
- /* Hope this not to be too long to wait here */
- qemu_sem_wait(&s->colo_exit_sem);
- qemu_sem_destroy(&s->colo_exit_sem);
- /*
- * It is safe to unregister notifier after failover finished.
- * Besides, colo_delay_timer and colo_checkpoint_sem can't be
- * released before unregister notifier, or there will be use-after-free
- * error.
- */
- colo_compare_unregister_notifier(&packets_compare_notifier);
- timer_free(s->colo_delay_timer);
- qemu_event_destroy(&s->colo_checkpoint_event);
- /*
- * Must be called after failover BH is completed,
- * Or the failover BH may shutdown the wrong fd that
- * re-used by other threads after we release here.
- */
- if (s->rp_state.from_dst_file) {
- qemu_fclose(s->rp_state.from_dst_file);
- s->rp_state.from_dst_file = NULL;
- }
- }
- void colo_checkpoint_notify(void *opaque)
- {
- MigrationState *s = opaque;
- int64_t next_notify_time;
- qemu_event_set(&s->colo_checkpoint_event);
- s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
- next_notify_time = s->colo_checkpoint_time +
- s->parameters.x_checkpoint_delay;
- timer_mod(s->colo_delay_timer, next_notify_time);
- }
- void migrate_start_colo_process(MigrationState *s)
- {
- qemu_mutex_unlock_iothread();
- qemu_event_init(&s->colo_checkpoint_event, false);
- s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST,
- colo_checkpoint_notify, s);
- qemu_sem_init(&s->colo_exit_sem, 0);
- colo_process_checkpoint(s);
- qemu_mutex_lock_iothread();
- }
- static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
- QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
- {
- uint64_t total_size;
- uint64_t value;
- Error *local_err = NULL;
- int ret;
- qemu_mutex_lock_iothread();
- vm_stop_force_state(RUN_STATE_COLO);
- qemu_mutex_unlock_iothread();
- trace_colo_vm_state_change("run", "stop");
- /* FIXME: This is unnecessary for periodic checkpoint mode */
- colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
- &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- colo_receive_check_message(mis->from_src_file,
- COLO_MESSAGE_VMSTATE_SEND, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- qemu_mutex_lock_iothread();
- cpu_synchronize_all_states();
- ret = qemu_loadvm_state_main(mis->from_src_file, mis);
- qemu_mutex_unlock_iothread();
- if (ret < 0) {
- error_setg(errp, "Load VM's live state (ram) error");
- return;
- }
- value = colo_receive_message_value(mis->from_src_file,
- COLO_MESSAGE_VMSTATE_SIZE, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- /*
- * Read VM device state data into channel buffer,
- * It's better to re-use the memory allocated.
- * Here we need to handle the channel buffer directly.
- */
- if (value > bioc->capacity) {
- bioc->capacity = value;
- bioc->data = g_realloc(bioc->data, bioc->capacity);
- }
- total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
- if (total_size != value) {
- error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
- " %" PRIu64, total_size, value);
- return;
- }
- bioc->usage = total_size;
- qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
- colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
- &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- qemu_mutex_lock_iothread();
- vmstate_loading = true;
- colo_flush_ram_cache();
- ret = qemu_load_device_state(fb);
- if (ret < 0) {
- error_setg(errp, "COLO: load device state failed");
- vmstate_loading = false;
- qemu_mutex_unlock_iothread();
- return;
- }
- #ifdef CONFIG_REPLICATION
- replication_get_error_all(&local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- vmstate_loading = false;
- qemu_mutex_unlock_iothread();
- return;
- }
- /* discard colo disk buffer */
- replication_do_checkpoint_all(&local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- vmstate_loading = false;
- qemu_mutex_unlock_iothread();
- return;
- }
- #else
- abort();
- #endif
- /* Notify all filters of all NIC to do checkpoint */
- colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- vmstate_loading = false;
- qemu_mutex_unlock_iothread();
- return;
- }
- vmstate_loading = false;
- vm_start();
- qemu_mutex_unlock_iothread();
- trace_colo_vm_state_change("stop", "run");
- if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
- return;
- }
- colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
- &local_err);
- error_propagate(errp, local_err);
- }
- static void colo_wait_handle_message(MigrationIncomingState *mis,
- QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
- {
- COLOMessage msg;
- Error *local_err = NULL;
- msg = colo_receive_message(mis->from_src_file, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- switch (msg) {
- case COLO_MESSAGE_CHECKPOINT_REQUEST:
- colo_incoming_process_checkpoint(mis, fb, bioc, errp);
- break;
- default:
- error_setg(errp, "Got unknown COLO message: %d", msg);
- break;
- }
- }
- void colo_shutdown(void)
- {
- MigrationIncomingState *mis = NULL;
- MigrationState *s = NULL;
- switch (get_colo_mode()) {
- case COLO_MODE_PRIMARY:
- s = migrate_get_current();
- qemu_event_set(&s->colo_checkpoint_event);
- qemu_sem_post(&s->colo_exit_sem);
- break;
- case COLO_MODE_SECONDARY:
- mis = migration_incoming_get_current();
- qemu_sem_post(&mis->colo_incoming_sem);
- break;
- default:
- break;
- }
- }
- void *colo_process_incoming_thread(void *opaque)
- {
- MigrationIncomingState *mis = opaque;
- QEMUFile *fb = NULL;
- QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
- Error *local_err = NULL;
- rcu_register_thread();
- qemu_sem_init(&mis->colo_incoming_sem, 0);
- migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
- MIGRATION_STATUS_COLO);
- if (get_colo_mode() != COLO_MODE_SECONDARY) {
- error_report("COLO mode must be COLO_MODE_SECONDARY");
- return NULL;
- }
- failover_init_state();
- mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
- if (!mis->to_src_file) {
- error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
- goto out;
- }
- /*
- * Note: the communication between Primary side and Secondary side
- * should be sequential, we set the fd to unblocked in migration incoming
- * coroutine, and here we are in the COLO incoming thread, so it is ok to
- * set the fd back to blocked.
- */
- qemu_file_set_blocking(mis->from_src_file, true);
- colo_incoming_start_dirty_log();
- bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
- fb = qemu_file_new_input(QIO_CHANNEL(bioc));
- object_unref(OBJECT(bioc));
- qemu_mutex_lock_iothread();
- #ifdef CONFIG_REPLICATION
- replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
- if (local_err) {
- qemu_mutex_unlock_iothread();
- goto out;
- }
- #else
- abort();
- #endif
- vm_start();
- qemu_mutex_unlock_iothread();
- trace_colo_vm_state_change("stop", "run");
- colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
- &local_err);
- if (local_err) {
- goto out;
- }
- while (mis->state == MIGRATION_STATUS_COLO) {
- colo_wait_handle_message(mis, fb, bioc, &local_err);
- if (local_err) {
- error_report_err(local_err);
- break;
- }
- if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
- failover_set_state(FAILOVER_STATUS_RELAUNCH,
- FAILOVER_STATUS_NONE);
- failover_request_active(NULL);
- break;
- }
- if (failover_get_state() != FAILOVER_STATUS_NONE) {
- error_report("failover request");
- break;
- }
- }
- out:
- /*
- * There are only two reasons we can get here, some error happened
- * or the user triggered failover.
- */
- switch (failover_get_state()) {
- case FAILOVER_STATUS_COMPLETED:
- qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
- COLO_EXIT_REASON_REQUEST);
- break;
- default:
- qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
- COLO_EXIT_REASON_ERROR);
- }
- if (fb) {
- qemu_fclose(fb);
- }
- /* Hope this not to be too long to loop here */
- qemu_sem_wait(&mis->colo_incoming_sem);
- qemu_sem_destroy(&mis->colo_incoming_sem);
- rcu_unregister_thread();
- return NULL;
- }
|