2
0

multifd-device-state.c 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. /*
  2. * Multifd device state migration
  3. *
  4. * Copyright (C) 2024,2025 Oracle and/or its affiliates.
  5. *
  6. * This work is licensed under the terms of the GNU GPL, version 2 or later.
  7. * See the COPYING file in the top-level directory.
  8. *
  9. * SPDX-License-Identifier: GPL-2.0-or-later
  10. */
  11. #include "qemu/osdep.h"
  12. #include "qapi/error.h"
  13. #include "qemu/lockable.h"
  14. #include "block/thread-pool.h"
  15. #include "migration.h"
  16. #include "migration/misc.h"
  17. #include "multifd.h"
  18. #include "options.h"
  19. static struct {
  20. QemuMutex queue_job_mutex;
  21. MultiFDSendData *send_data;
  22. ThreadPool *threads;
  23. bool threads_abort;
  24. } *multifd_send_device_state;
  25. void multifd_device_state_send_setup(void)
  26. {
  27. assert(!multifd_send_device_state);
  28. multifd_send_device_state = g_malloc(sizeof(*multifd_send_device_state));
  29. qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
  30. multifd_send_device_state->send_data = multifd_send_data_alloc();
  31. multifd_send_device_state->threads = thread_pool_new();
  32. multifd_send_device_state->threads_abort = false;
  33. }
  34. void multifd_device_state_send_cleanup(void)
  35. {
  36. g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free);
  37. g_clear_pointer(&multifd_send_device_state->send_data,
  38. multifd_send_data_free);
  39. qemu_mutex_destroy(&multifd_send_device_state->queue_job_mutex);
  40. g_clear_pointer(&multifd_send_device_state, g_free);
  41. }
  42. void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state)
  43. {
  44. g_clear_pointer(&device_state->idstr, g_free);
  45. g_clear_pointer(&device_state->buf, g_free);
  46. }
  47. static void multifd_device_state_fill_packet(MultiFDSendParams *p)
  48. {
  49. MultiFDDeviceState_t *device_state = &p->data->u.device_state;
  50. MultiFDPacketDeviceState_t *packet = p->packet_device_state;
  51. packet->hdr.flags = cpu_to_be32(p->flags);
  52. strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr) - 1);
  53. packet->idstr[sizeof(packet->idstr) - 1] = 0;
  54. packet->instance_id = cpu_to_be32(device_state->instance_id);
  55. packet->next_packet_size = cpu_to_be32(p->next_packet_size);
  56. }
  57. static void multifd_prepare_header_device_state(MultiFDSendParams *p)
  58. {
  59. p->iov[0].iov_len = sizeof(*p->packet_device_state);
  60. p->iov[0].iov_base = p->packet_device_state;
  61. p->iovs_num++;
  62. }
  63. void multifd_device_state_send_prepare(MultiFDSendParams *p)
  64. {
  65. MultiFDDeviceState_t *device_state = &p->data->u.device_state;
  66. assert(multifd_payload_device_state(p->data));
  67. multifd_prepare_header_device_state(p);
  68. assert(!(p->flags & MULTIFD_FLAG_SYNC));
  69. p->next_packet_size = device_state->buf_len;
  70. if (p->next_packet_size > 0) {
  71. p->iov[p->iovs_num].iov_base = device_state->buf;
  72. p->iov[p->iovs_num].iov_len = p->next_packet_size;
  73. p->iovs_num++;
  74. }
  75. p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE;
  76. multifd_device_state_fill_packet(p);
  77. }
  78. bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
  79. char *data, size_t len)
  80. {
  81. /* Device state submissions can come from multiple threads */
  82. QEMU_LOCK_GUARD(&multifd_send_device_state->queue_job_mutex);
  83. MultiFDDeviceState_t *device_state;
  84. assert(multifd_payload_empty(multifd_send_device_state->send_data));
  85. multifd_set_payload_type(multifd_send_device_state->send_data,
  86. MULTIFD_PAYLOAD_DEVICE_STATE);
  87. device_state = &multifd_send_device_state->send_data->u.device_state;
  88. device_state->idstr = g_strdup(idstr);
  89. device_state->instance_id = instance_id;
  90. device_state->buf = g_memdup2(data, len);
  91. device_state->buf_len = len;
  92. if (!multifd_send(&multifd_send_device_state->send_data)) {
  93. multifd_send_data_clear(multifd_send_device_state->send_data);
  94. return false;
  95. }
  96. return true;
  97. }
  98. bool multifd_device_state_supported(void)
  99. {
  100. return migrate_multifd() && !migrate_mapped_ram() &&
  101. migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
  102. }
  103. static void multifd_device_state_save_thread_data_free(void *opaque)
  104. {
  105. SaveLiveCompletePrecopyThreadData *data = opaque;
  106. g_clear_pointer(&data->idstr, g_free);
  107. g_free(data);
  108. }
  109. static int multifd_device_state_save_thread(void *opaque)
  110. {
  111. SaveLiveCompletePrecopyThreadData *data = opaque;
  112. g_autoptr(Error) local_err = NULL;
  113. if (!data->hdlr(data, &local_err)) {
  114. MigrationState *s = migrate_get_current();
  115. /*
  116. * Can't call abort_device_state_save_threads() here since new
  117. * save threads could still be in process of being launched
  118. * (if, for example, the very first save thread launched exited
  119. * with an error very quickly).
  120. */
  121. assert(local_err);
  122. /*
  123. * In case of multiple save threads failing which thread error
  124. * return we end setting is purely arbitrary.
  125. */
  126. migrate_set_error(s, local_err);
  127. }
  128. return 0;
  129. }
  130. bool multifd_device_state_save_thread_should_exit(void)
  131. {
  132. return qatomic_read(&multifd_send_device_state->threads_abort);
  133. }
  134. void
  135. multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
  136. char *idstr, uint32_t instance_id,
  137. void *opaque)
  138. {
  139. SaveLiveCompletePrecopyThreadData *data;
  140. assert(multifd_device_state_supported());
  141. assert(multifd_send_device_state);
  142. assert(!qatomic_read(&multifd_send_device_state->threads_abort));
  143. data = g_new(SaveLiveCompletePrecopyThreadData, 1);
  144. data->hdlr = hdlr;
  145. data->idstr = g_strdup(idstr);
  146. data->instance_id = instance_id;
  147. data->handler_opaque = opaque;
  148. thread_pool_submit_immediate(multifd_send_device_state->threads,
  149. multifd_device_state_save_thread,
  150. data,
  151. multifd_device_state_save_thread_data_free);
  152. }
  153. void multifd_abort_device_state_save_threads(void)
  154. {
  155. assert(multifd_device_state_supported());
  156. qatomic_set(&multifd_send_device_state->threads_abort, true);
  157. }
  158. bool multifd_join_device_state_save_threads(void)
  159. {
  160. MigrationState *s = migrate_get_current();
  161. assert(multifd_device_state_supported());
  162. thread_pool_wait(multifd_send_device_state->threads);
  163. return !migrate_has_error(s);
  164. }