multifd.c 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085
  1. /*
  2. * Multifd common code
  3. *
  4. * Copyright (c) 2019-2020 Red Hat Inc
  5. *
  6. * Authors:
  7. * Juan Quintela <quintela@redhat.com>
  8. *
  9. * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10. * See the COPYING file in the top-level directory.
  11. */
  12. #include "qemu/osdep.h"
  13. #include "qemu/rcu.h"
  14. #include "exec/target_page.h"
  15. #include "sysemu/sysemu.h"
  16. #include "exec/ramblock.h"
  17. #include "qemu/error-report.h"
  18. #include "qapi/error.h"
  19. #include "ram.h"
  20. #include "migration.h"
  21. #include "socket.h"
  22. #include "qemu-file.h"
  23. #include "trace.h"
  24. #include "multifd.h"
/* Multiple fd's */

/* Magic value placed at the start of every multifd message and checked on receive. */
#define MULTIFD_MAGIC 0x11223344U
/* Protocol version placed in every multifd message and checked on receive. */
#define MULTIFD_VERSION 1

/*
 * First message sent on each multifd channel (see
 * multifd_send_initial_packet() / multifd_recv_initial_packet()).
 *
 * magic and version travel in big-endian byte order.  The struct is packed
 * because it is written to and read from the channel as raw bytes.
 */
typedef struct {
    uint32_t magic;            /* MULTIFD_MAGIC */
    uint32_t version;          /* MULTIFD_VERSION */
    unsigned char uuid[16];    /* QemuUUID */
    uint8_t id;                /* id of the sending channel */
    uint8_t unused1[7];        /* Reserved for future use */
    uint64_t unused2[4];       /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
  36. /* Multifd without compression */
  37. /**
  38. * nocomp_send_setup: setup send side
  39. *
  40. * For no compression this function does nothing.
  41. *
  42. * Returns 0 for success or -1 for error
  43. *
  44. * @p: Params for the channel that we are using
  45. * @errp: pointer to an error
  46. */
  47. static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
  48. {
  49. return 0;
  50. }
  51. /**
  52. * nocomp_send_cleanup: cleanup send side
  53. *
  54. * For no compression this function does nothing.
  55. *
  56. * @p: Params for the channel that we are using
  57. */
  58. static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
  59. {
  60. return;
  61. }
  62. /**
  63. * nocomp_send_prepare: prepare date to be able to send
  64. *
  65. * For no compression we just have to calculate the size of the
  66. * packet.
  67. *
  68. * Returns 0 for success or -1 for error
  69. *
  70. * @p: Params for the channel that we are using
  71. * @used: number of pages used
  72. * @errp: pointer to an error
  73. */
  74. static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
  75. Error **errp)
  76. {
  77. p->next_packet_size = used * qemu_target_page_size();
  78. p->flags |= MULTIFD_FLAG_NOCOMP;
  79. return 0;
  80. }
  81. /**
  82. * nocomp_send_write: do the actual write of the data
  83. *
  84. * For no compression we just have to write the data.
  85. *
  86. * Returns 0 for success or -1 for error
  87. *
  88. * @p: Params for the channel that we are using
  89. * @used: number of pages used
  90. * @errp: pointer to an error
  91. */
  92. static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
  93. {
  94. return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
  95. }
  96. /**
  97. * nocomp_recv_setup: setup receive side
  98. *
  99. * For no compression this function does nothing.
  100. *
  101. * Returns 0 for success or -1 for error
  102. *
  103. * @p: Params for the channel that we are using
  104. * @errp: pointer to an error
  105. */
  106. static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
  107. {
  108. return 0;
  109. }
  110. /**
  111. * nocomp_recv_cleanup: setup receive side
  112. *
  113. * For no compression this function does nothing.
  114. *
  115. * @p: Params for the channel that we are using
  116. */
  117. static void nocomp_recv_cleanup(MultiFDRecvParams *p)
  118. {
  119. }
  120. /**
  121. * nocomp_recv_pages: read the data from the channel into actual pages
  122. *
  123. * For no compression we just need to read things into the correct place.
  124. *
  125. * Returns 0 for success or -1 for error
  126. *
  127. * @p: Params for the channel that we are using
  128. * @used: number of pages used
  129. * @errp: pointer to an error
  130. */
  131. static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
  132. {
  133. uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
  134. if (flags != MULTIFD_FLAG_NOCOMP) {
  135. error_setg(errp, "multifd %d: flags received %x flags expected %x",
  136. p->id, flags, MULTIFD_FLAG_NOCOMP);
  137. return -1;
  138. }
  139. return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
  140. }
/* Method table for the "no compression" multifd method. */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .send_write = nocomp_send_write,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

/*
 * Method tables indexed by compression method.  Only the "none" entry is
 * filled in here; other entries are installed through
 * multifd_register_ops().
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
  153. void multifd_register_ops(int method, MultiFDMethods *ops)
  154. {
  155. assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
  156. multifd_ops[method] = ops;
  157. }
  158. static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
  159. {
  160. MultiFDInit_t msg = {};
  161. int ret;
  162. msg.magic = cpu_to_be32(MULTIFD_MAGIC);
  163. msg.version = cpu_to_be32(MULTIFD_VERSION);
  164. msg.id = p->id;
  165. memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
  166. ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
  167. if (ret != 0) {
  168. return -1;
  169. }
  170. return 0;
  171. }
  172. static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
  173. {
  174. MultiFDInit_t msg;
  175. int ret;
  176. ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
  177. if (ret != 0) {
  178. return -1;
  179. }
  180. msg.magic = be32_to_cpu(msg.magic);
  181. msg.version = be32_to_cpu(msg.version);
  182. if (msg.magic != MULTIFD_MAGIC) {
  183. error_setg(errp, "multifd: received packet magic %x "
  184. "expected %x", msg.magic, MULTIFD_MAGIC);
  185. return -1;
  186. }
  187. if (msg.version != MULTIFD_VERSION) {
  188. error_setg(errp, "multifd: received packet version %d "
  189. "expected %d", msg.version, MULTIFD_VERSION);
  190. return -1;
  191. }
  192. if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
  193. char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
  194. char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
  195. error_setg(errp, "multifd: received uuid '%s' and expected "
  196. "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
  197. g_free(uuid);
  198. g_free(msg_uuid);
  199. return -1;
  200. }
  201. if (msg.id > migrate_multifd_channels()) {
  202. error_setg(errp, "multifd: received channel version %d "
  203. "expected %d", msg.version, MULTIFD_VERSION);
  204. return -1;
  205. }
  206. return msg.id;
  207. }
  208. static MultiFDPages_t *multifd_pages_init(size_t size)
  209. {
  210. MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
  211. pages->allocated = size;
  212. pages->iov = g_new0(struct iovec, size);
  213. pages->offset = g_new0(ram_addr_t, size);
  214. return pages;
  215. }
  216. static void multifd_pages_clear(MultiFDPages_t *pages)
  217. {
  218. pages->used = 0;
  219. pages->allocated = 0;
  220. pages->packet_num = 0;
  221. pages->block = NULL;
  222. g_free(pages->iov);
  223. pages->iov = NULL;
  224. g_free(pages->offset);
  225. pages->offset = NULL;
  226. g_free(pages);
  227. }
  228. static void multifd_send_fill_packet(MultiFDSendParams *p)
  229. {
  230. MultiFDPacket_t *packet = p->packet;
  231. int i;
  232. packet->flags = cpu_to_be32(p->flags);
  233. packet->pages_alloc = cpu_to_be32(p->pages->allocated);
  234. packet->pages_used = cpu_to_be32(p->pages->used);
  235. packet->next_packet_size = cpu_to_be32(p->next_packet_size);
  236. packet->packet_num = cpu_to_be64(p->packet_num);
  237. if (p->pages->block) {
  238. strncpy(packet->ramblock, p->pages->block->idstr, 256);
  239. }
  240. for (i = 0; i < p->pages->used; i++) {
  241. /* there are architectures where ram_addr_t is 32 bit */
  242. uint64_t temp = p->pages->offset[i];
  243. packet->offset[i] = cpu_to_be64(temp);
  244. }
  245. }
  246. static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
  247. {
  248. MultiFDPacket_t *packet = p->packet;
  249. uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  250. RAMBlock *block;
  251. int i;
  252. packet->magic = be32_to_cpu(packet->magic);
  253. if (packet->magic != MULTIFD_MAGIC) {
  254. error_setg(errp, "multifd: received packet "
  255. "magic %x and expected magic %x",
  256. packet->magic, MULTIFD_MAGIC);
  257. return -1;
  258. }
  259. packet->version = be32_to_cpu(packet->version);
  260. if (packet->version != MULTIFD_VERSION) {
  261. error_setg(errp, "multifd: received packet "
  262. "version %d and expected version %d",
  263. packet->version, MULTIFD_VERSION);
  264. return -1;
  265. }
  266. p->flags = be32_to_cpu(packet->flags);
  267. packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
  268. /*
  269. * If we received a packet that is 100 times bigger than expected
  270. * just stop migration. It is a magic number.
  271. */
  272. if (packet->pages_alloc > pages_max * 100) {
  273. error_setg(errp, "multifd: received packet "
  274. "with size %d and expected a maximum size of %d",
  275. packet->pages_alloc, pages_max * 100) ;
  276. return -1;
  277. }
  278. /*
  279. * We received a packet that is bigger than expected but inside
  280. * reasonable limits (see previous comment). Just reallocate.
  281. */
  282. if (packet->pages_alloc > p->pages->allocated) {
  283. multifd_pages_clear(p->pages);
  284. p->pages = multifd_pages_init(packet->pages_alloc);
  285. }
  286. p->pages->used = be32_to_cpu(packet->pages_used);
  287. if (p->pages->used > packet->pages_alloc) {
  288. error_setg(errp, "multifd: received packet "
  289. "with %d pages and expected maximum pages are %d",
  290. p->pages->used, packet->pages_alloc) ;
  291. return -1;
  292. }
  293. p->next_packet_size = be32_to_cpu(packet->next_packet_size);
  294. p->packet_num = be64_to_cpu(packet->packet_num);
  295. if (p->pages->used == 0) {
  296. return 0;
  297. }
  298. /* make sure that ramblock is 0 terminated */
  299. packet->ramblock[255] = 0;
  300. block = qemu_ram_block_by_name(packet->ramblock);
  301. if (!block) {
  302. error_setg(errp, "multifd: unknown ram block %s",
  303. packet->ramblock);
  304. return -1;
  305. }
  306. for (i = 0; i < p->pages->used; i++) {
  307. uint64_t offset = be64_to_cpu(packet->offset[i]);
  308. if (offset > (block->used_length - qemu_target_page_size())) {
  309. error_setg(errp, "multifd: offset too long %" PRIu64
  310. " (max " RAM_ADDR_FMT ")",
  311. offset, block->max_length);
  312. return -1;
  313. }
  314. p->pages->iov[i].iov_base = block->host + offset;
  315. p->pages->iov[i].iov_len = qemu_target_page_size();
  316. }
  317. return 0;
  318. }
/*
 * Global state of the multifd send side.  Allocated in
 * multifd_save_setup() and freed in multifd_save_cleanup().
 */
struct {
    /* per-channel parameters, one entry per multifd channel */
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
  336. /*
  337. * How we use multifd_send_state->pages and channel->pages?
  338. *
  339. * We create a pages for each channel, and a main one. Each time that
  340. * we need to send a batch of pages we interchange the ones between
  341. * multifd_send_state and the channel that is sending it. There are
  342. * two reasons for that:
  343. * - to not have to do so many mallocs during migration
  344. * - to make easier to know what to free at the end of migration
  345. *
  346. * This way we always know who is the owner of each "pages" struct,
  347. * and we don't need any locking. It belongs to the migration thread
  348. * or to the channel thread. Switching is safe because the migration
  349. * thread is using the channel mutex when changing it, and the channel
  350. * have to had finish with its own, otherwise pending_job can't be
  351. * false.
  352. */
  353. static int multifd_send_pages(QEMUFile *f)
  354. {
  355. int i;
  356. static int next_channel;
  357. MultiFDSendParams *p = NULL; /* make happy gcc */
  358. MultiFDPages_t *pages = multifd_send_state->pages;
  359. uint64_t transferred;
  360. if (atomic_read(&multifd_send_state->exiting)) {
  361. return -1;
  362. }
  363. qemu_sem_wait(&multifd_send_state->channels_ready);
  364. /*
  365. * next_channel can remain from a previous migration that was
  366. * using more channels, so ensure it doesn't overflow if the
  367. * limit is lower now.
  368. */
  369. next_channel %= migrate_multifd_channels();
  370. for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
  371. p = &multifd_send_state->params[i];
  372. qemu_mutex_lock(&p->mutex);
  373. if (p->quit) {
  374. error_report("%s: channel %d has already quit!", __func__, i);
  375. qemu_mutex_unlock(&p->mutex);
  376. return -1;
  377. }
  378. if (!p->pending_job) {
  379. p->pending_job++;
  380. next_channel = (i + 1) % migrate_multifd_channels();
  381. break;
  382. }
  383. qemu_mutex_unlock(&p->mutex);
  384. }
  385. assert(!p->pages->used);
  386. assert(!p->pages->block);
  387. p->packet_num = multifd_send_state->packet_num++;
  388. multifd_send_state->pages = p->pages;
  389. p->pages = pages;
  390. transferred = ((uint64_t) pages->used) * qemu_target_page_size()
  391. + p->packet_len;
  392. qemu_file_update_transfer(f, transferred);
  393. ram_counters.multifd_bytes += transferred;
  394. ram_counters.transferred += transferred;;
  395. qemu_mutex_unlock(&p->mutex);
  396. qemu_sem_post(&p->sem);
  397. return 1;
  398. }
  399. int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
  400. {
  401. MultiFDPages_t *pages = multifd_send_state->pages;
  402. if (!pages->block) {
  403. pages->block = block;
  404. }
  405. if (pages->block == block) {
  406. pages->offset[pages->used] = offset;
  407. pages->iov[pages->used].iov_base = block->host + offset;
  408. pages->iov[pages->used].iov_len = qemu_target_page_size();
  409. pages->used++;
  410. if (pages->used < pages->allocated) {
  411. return 1;
  412. }
  413. }
  414. if (multifd_send_pages(f) < 0) {
  415. return -1;
  416. }
  417. if (pages->block != block) {
  418. return multifd_queue_page(f, block, offset);
  419. }
  420. return 1;
  421. }
  422. static void multifd_send_terminate_threads(Error *err)
  423. {
  424. int i;
  425. trace_multifd_send_terminate_threads(err != NULL);
  426. if (err) {
  427. MigrationState *s = migrate_get_current();
  428. migrate_set_error(s, err);
  429. if (s->state == MIGRATION_STATUS_SETUP ||
  430. s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
  431. s->state == MIGRATION_STATUS_DEVICE ||
  432. s->state == MIGRATION_STATUS_ACTIVE) {
  433. migrate_set_state(&s->state, s->state,
  434. MIGRATION_STATUS_FAILED);
  435. }
  436. }
  437. /*
  438. * We don't want to exit each threads twice. Depending on where
  439. * we get the error, or if there are two independent errors in two
  440. * threads at the same time, we can end calling this function
  441. * twice.
  442. */
  443. if (atomic_xchg(&multifd_send_state->exiting, 1)) {
  444. return;
  445. }
  446. for (i = 0; i < migrate_multifd_channels(); i++) {
  447. MultiFDSendParams *p = &multifd_send_state->params[i];
  448. qemu_mutex_lock(&p->mutex);
  449. p->quit = true;
  450. qemu_sem_post(&p->sem);
  451. qemu_mutex_unlock(&p->mutex);
  452. }
  453. }
  454. void multifd_save_cleanup(void)
  455. {
  456. int i;
  457. if (!migrate_use_multifd()) {
  458. return;
  459. }
  460. multifd_send_terminate_threads(NULL);
  461. for (i = 0; i < migrate_multifd_channels(); i++) {
  462. MultiFDSendParams *p = &multifd_send_state->params[i];
  463. if (p->running) {
  464. qemu_thread_join(&p->thread);
  465. }
  466. }
  467. for (i = 0; i < migrate_multifd_channels(); i++) {
  468. MultiFDSendParams *p = &multifd_send_state->params[i];
  469. Error *local_err = NULL;
  470. socket_send_channel_destroy(p->c);
  471. p->c = NULL;
  472. qemu_mutex_destroy(&p->mutex);
  473. qemu_sem_destroy(&p->sem);
  474. qemu_sem_destroy(&p->sem_sync);
  475. g_free(p->name);
  476. p->name = NULL;
  477. multifd_pages_clear(p->pages);
  478. p->pages = NULL;
  479. p->packet_len = 0;
  480. g_free(p->packet);
  481. p->packet = NULL;
  482. multifd_send_state->ops->send_cleanup(p, &local_err);
  483. if (local_err) {
  484. migrate_set_error(migrate_get_current(), local_err);
  485. error_free(local_err);
  486. }
  487. }
  488. qemu_sem_destroy(&multifd_send_state->channels_ready);
  489. g_free(multifd_send_state->params);
  490. multifd_send_state->params = NULL;
  491. multifd_pages_clear(multifd_send_state->pages);
  492. multifd_send_state->pages = NULL;
  493. g_free(multifd_send_state);
  494. multifd_send_state = NULL;
  495. }
  496. void multifd_send_sync_main(QEMUFile *f)
  497. {
  498. int i;
  499. if (!migrate_use_multifd()) {
  500. return;
  501. }
  502. if (multifd_send_state->pages->used) {
  503. if (multifd_send_pages(f) < 0) {
  504. error_report("%s: multifd_send_pages fail", __func__);
  505. return;
  506. }
  507. }
  508. for (i = 0; i < migrate_multifd_channels(); i++) {
  509. MultiFDSendParams *p = &multifd_send_state->params[i];
  510. trace_multifd_send_sync_main_signal(p->id);
  511. qemu_mutex_lock(&p->mutex);
  512. if (p->quit) {
  513. error_report("%s: channel %d has already quit", __func__, i);
  514. qemu_mutex_unlock(&p->mutex);
  515. return;
  516. }
  517. p->packet_num = multifd_send_state->packet_num++;
  518. p->flags |= MULTIFD_FLAG_SYNC;
  519. p->pending_job++;
  520. qemu_file_update_transfer(f, p->packet_len);
  521. ram_counters.multifd_bytes += p->packet_len;
  522. ram_counters.transferred += p->packet_len;
  523. qemu_mutex_unlock(&p->mutex);
  524. qemu_sem_post(&p->sem);
  525. }
  526. for (i = 0; i < migrate_multifd_channels(); i++) {
  527. MultiFDSendParams *p = &multifd_send_state->params[i];
  528. trace_multifd_send_sync_main_wait(p->id);
  529. qemu_sem_wait(&p->sem_sync);
  530. }
  531. trace_multifd_send_sync_main(multifd_send_state->packet_num);
  532. }
/*
 * Body of a send channel thread.
 *
 * @opaque is the MultiFDSendParams of the channel.  The thread first sends
 * the initial handshake packet and then loops: it sleeps on p->sem, and
 * for every pending job it builds the header packet under p->mutex, writes
 * header and page payload to the channel without holding the mutex, and
 * finally signals completion.  The loop ends when the send side is
 * exiting, the channel is asked to quit, or an error occurs.
 *
 * Always returns NULL.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (atomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            /*
             * Take local copies while the mutex is held: the fields are
             * reset below, before the actual writes happen.
             */
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            if (used) {
                ret = multifd_send_state->ops->send_prepare(p, used,
                                                            &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            /* Header packet first ... */
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            /* ... then the page payload, if there is any. */
            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            /* Wake the waiter in multifd_send_sync_main() for sync jobs. */
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            /* This channel can take another job. */
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * An error happened and this thread is about to exit.  Post both
     * semaphores so that anyone waiting on this channel is woken up
     * instead of blocking forever.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
  621. static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
  622. {
  623. MultiFDSendParams *p = opaque;
  624. QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
  625. Error *local_err = NULL;
  626. trace_multifd_new_send_channel_async(p->id);
  627. if (qio_task_propagate_error(task, &local_err)) {
  628. migrate_set_error(migrate_get_current(), local_err);
  629. /* Error happen, we need to tell who pay attention to me */
  630. qemu_sem_post(&multifd_send_state->channels_ready);
  631. qemu_sem_post(&p->sem_sync);
  632. /*
  633. * Although multifd_send_thread is not created, but main migration
  634. * thread neet to judge whether it is running, so we need to mark
  635. * its status.
  636. */
  637. p->quit = true;
  638. object_unref(OBJECT(sioc));
  639. error_free(local_err);
  640. } else {
  641. p->c = QIO_CHANNEL(sioc);
  642. qio_channel_set_delay(p->c, false);
  643. p->running = true;
  644. qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
  645. QEMU_THREAD_JOINABLE);
  646. }
  647. }
  648. int multifd_save_setup(Error **errp)
  649. {
  650. int thread_count;
  651. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  652. uint8_t i;
  653. if (!migrate_use_multifd()) {
  654. return 0;
  655. }
  656. thread_count = migrate_multifd_channels();
  657. multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
  658. multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
  659. multifd_send_state->pages = multifd_pages_init(page_count);
  660. qemu_sem_init(&multifd_send_state->channels_ready, 0);
  661. atomic_set(&multifd_send_state->exiting, 0);
  662. multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
  663. for (i = 0; i < thread_count; i++) {
  664. MultiFDSendParams *p = &multifd_send_state->params[i];
  665. qemu_mutex_init(&p->mutex);
  666. qemu_sem_init(&p->sem, 0);
  667. qemu_sem_init(&p->sem_sync, 0);
  668. p->quit = false;
  669. p->pending_job = 0;
  670. p->id = i;
  671. p->pages = multifd_pages_init(page_count);
  672. p->packet_len = sizeof(MultiFDPacket_t)
  673. + sizeof(uint64_t) * page_count;
  674. p->packet = g_malloc0(p->packet_len);
  675. p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
  676. p->packet->version = cpu_to_be32(MULTIFD_VERSION);
  677. p->name = g_strdup_printf("multifdsend_%d", i);
  678. socket_send_channel_create(multifd_new_send_channel_async, p);
  679. }
  680. for (i = 0; i < thread_count; i++) {
  681. MultiFDSendParams *p = &multifd_send_state->params[i];
  682. Error *local_err = NULL;
  683. int ret;
  684. ret = multifd_send_state->ops->send_setup(p, &local_err);
  685. if (ret) {
  686. error_propagate(errp, local_err);
  687. return ret;
  688. }
  689. }
  690. return 0;
  691. }
/*
 * Global state of the multifd receive side.  Allocated in
 * multifd_load_setup() and freed in multifd_load_cleanup().
 */
struct {
    /* per-channel parameters, one entry per multifd channel */
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
  703. static void multifd_recv_terminate_threads(Error *err)
  704. {
  705. int i;
  706. trace_multifd_recv_terminate_threads(err != NULL);
  707. if (err) {
  708. MigrationState *s = migrate_get_current();
  709. migrate_set_error(s, err);
  710. if (s->state == MIGRATION_STATUS_SETUP ||
  711. s->state == MIGRATION_STATUS_ACTIVE) {
  712. migrate_set_state(&s->state, s->state,
  713. MIGRATION_STATUS_FAILED);
  714. }
  715. }
  716. for (i = 0; i < migrate_multifd_channels(); i++) {
  717. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  718. qemu_mutex_lock(&p->mutex);
  719. p->quit = true;
  720. /*
  721. * We could arrive here for two reasons:
  722. * - normal quit, i.e. everything went fine, just finished
  723. * - error quit: We close the channels so the channel threads
  724. * finish the qio_channel_read_all_eof()
  725. */
  726. if (p->c) {
  727. qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
  728. }
  729. qemu_mutex_unlock(&p->mutex);
  730. }
  731. }
  732. int multifd_load_cleanup(Error **errp)
  733. {
  734. int i;
  735. if (!migrate_use_multifd()) {
  736. return 0;
  737. }
  738. multifd_recv_terminate_threads(NULL);
  739. for (i = 0; i < migrate_multifd_channels(); i++) {
  740. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  741. if (p->running) {
  742. p->quit = true;
  743. /*
  744. * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
  745. * however try to wakeup it without harm in cleanup phase.
  746. */
  747. qemu_sem_post(&p->sem_sync);
  748. qemu_thread_join(&p->thread);
  749. }
  750. }
  751. for (i = 0; i < migrate_multifd_channels(); i++) {
  752. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  753. object_unref(OBJECT(p->c));
  754. p->c = NULL;
  755. qemu_mutex_destroy(&p->mutex);
  756. qemu_sem_destroy(&p->sem_sync);
  757. g_free(p->name);
  758. p->name = NULL;
  759. multifd_pages_clear(p->pages);
  760. p->pages = NULL;
  761. p->packet_len = 0;
  762. g_free(p->packet);
  763. p->packet = NULL;
  764. multifd_recv_state->ops->recv_cleanup(p);
  765. }
  766. qemu_sem_destroy(&multifd_recv_state->sem_sync);
  767. g_free(multifd_recv_state->params);
  768. multifd_recv_state->params = NULL;
  769. g_free(multifd_recv_state);
  770. multifd_recv_state = NULL;
  771. return 0;
  772. }
  773. void multifd_recv_sync_main(void)
  774. {
  775. int i;
  776. if (!migrate_use_multifd()) {
  777. return;
  778. }
  779. for (i = 0; i < migrate_multifd_channels(); i++) {
  780. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  781. trace_multifd_recv_sync_main_wait(p->id);
  782. qemu_sem_wait(&multifd_recv_state->sem_sync);
  783. }
  784. for (i = 0; i < migrate_multifd_channels(); i++) {
  785. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  786. WITH_QEMU_LOCK_GUARD(&p->mutex) {
  787. if (multifd_recv_state->packet_num < p->packet_num) {
  788. multifd_recv_state->packet_num = p->packet_num;
  789. }
  790. }
  791. trace_multifd_recv_sync_main_signal(p->id);
  792. qemu_sem_post(&p->sem_sync);
  793. }
  794. trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
  795. }
/*
 * Body of a receive channel thread.
 *
 * @opaque is the MultiFDRecvParams of the channel.  The thread loops
 * reading one header packet at a time, decodes it under p->mutex, lets
 * the selected method read the page payload, and, for packets flagged
 * MULTIFD_FLAG_SYNC, synchronises with the main thread.  The loop ends
 * on EOF, on error, or when the channel has been asked to quit.
 *
 * Always returns NULL.
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {   /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        /* Keep local copies: they are used after the mutex is dropped. */
        used = p->pages->used;
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        if (used) {
            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
            if (ret != 0) {
                break;
            }
        }

        /*
         * Tell the main thread that this channel reached the sync point,
         * then wait until it lets us continue.
         */
        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
  854. int multifd_load_setup(Error **errp)
  855. {
  856. int thread_count;
  857. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  858. uint8_t i;
  859. if (!migrate_use_multifd()) {
  860. return 0;
  861. }
  862. thread_count = migrate_multifd_channels();
  863. multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
  864. multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
  865. atomic_set(&multifd_recv_state->count, 0);
  866. qemu_sem_init(&multifd_recv_state->sem_sync, 0);
  867. multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
  868. for (i = 0; i < thread_count; i++) {
  869. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  870. qemu_mutex_init(&p->mutex);
  871. qemu_sem_init(&p->sem_sync, 0);
  872. p->quit = false;
  873. p->id = i;
  874. p->pages = multifd_pages_init(page_count);
  875. p->packet_len = sizeof(MultiFDPacket_t)
  876. + sizeof(uint64_t) * page_count;
  877. p->packet = g_malloc0(p->packet_len);
  878. p->name = g_strdup_printf("multifdrecv_%d", i);
  879. }
  880. for (i = 0; i < thread_count; i++) {
  881. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  882. Error *local_err = NULL;
  883. int ret;
  884. ret = multifd_recv_state->ops->recv_setup(p, &local_err);
  885. if (ret) {
  886. error_propagate(errp, local_err);
  887. return ret;
  888. }
  889. }
  890. return 0;
  891. }
  892. bool multifd_recv_all_channels_created(void)
  893. {
  894. int thread_count = migrate_multifd_channels();
  895. if (!migrate_use_multifd()) {
  896. return true;
  897. }
  898. return thread_count == atomic_read(&multifd_recv_state->count);
  899. }
  900. /*
  901. * Try to receive all multifd channels to get ready for the migration.
  902. * - Return true and do not set @errp when correctly receving all channels;
  903. * - Return false and do not set @errp when correctly receiving the current one;
  904. * - Return false and set @errp when failing to receive the current channel.
  905. */
  906. bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
  907. {
  908. MultiFDRecvParams *p;
  909. Error *local_err = NULL;
  910. int id;
  911. id = multifd_recv_initial_packet(ioc, &local_err);
  912. if (id < 0) {
  913. multifd_recv_terminate_threads(local_err);
  914. error_propagate_prepend(errp, local_err,
  915. "failed to receive packet"
  916. " via multifd channel %d: ",
  917. atomic_read(&multifd_recv_state->count));
  918. return false;
  919. }
  920. trace_multifd_recv_new_channel(id);
  921. p = &multifd_recv_state->params[id];
  922. if (p->c != NULL) {
  923. error_setg(&local_err, "multifd: received id '%d' already setup'",
  924. id);
  925. multifd_recv_terminate_threads(local_err);
  926. error_propagate(errp, local_err);
  927. return false;
  928. }
  929. p->c = ioc;
  930. object_ref(OBJECT(ioc));
  931. /* initial packet */
  932. p->num_packets = 1;
  933. p->running = true;
  934. qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
  935. QEMU_THREAD_JOINABLE);
  936. atomic_inc(&multifd_recv_state->count);
  937. return atomic_read(&multifd_recv_state->count) ==
  938. migrate_multifd_channels();
  939. }