multifd.c 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181
  1. /*
  2. * Multifd common code
  3. *
  4. * Copyright (c) 2019-2020 Red Hat Inc
  5. *
  6. * Authors:
  7. * Juan Quintela <quintela@redhat.com>
  8. *
  9. * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10. * See the COPYING file in the top-level directory.
  11. */
  12. #include "qemu/osdep.h"
  13. #include "qemu/rcu.h"
  14. #include "exec/target_page.h"
  15. #include "sysemu/sysemu.h"
  16. #include "exec/ramblock.h"
  17. #include "qemu/error-report.h"
  18. #include "qapi/error.h"
  19. #include "ram.h"
  20. #include "migration.h"
  21. #include "socket.h"
  22. #include "tls.h"
  23. #include "qemu-file.h"
  24. #include "trace.h"
  25. #include "multifd.h"
/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

/*
 * Per-channel handshake message, written once by the sender before any
 * data packets (see multifd_send_initial_packet()).  All multi-byte
 * fields travel in big-endian order.
 */
typedef struct {
    uint32_t magic;              /* MULTIFD_MAGIC */
    uint32_t version;            /* MULTIFD_VERSION */
    unsigned char uuid[16];      /* QemuUUID */
    uint8_t id;                  /* channel number */
    uint8_t unused1[7];          /* Reserved for future use */
    uint64_t unused2[4];         /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    return 0;
}
  52. /**
  53. * nocomp_send_cleanup: cleanup send side
  54. *
  55. * For no compression this function does nothing.
  56. *
  57. * @p: Params for the channel that we are using
  58. */
  59. static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
  60. {
  61. return;
  62. }
/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
                               Error **errp)
{
    /* Uncompressed payload is exactly @used target pages */
    p->next_packet_size = used * qemu_target_page_size();
    p->flags |= MULTIFD_FLAG_NOCOMP;
    return 0;
}
/**
 * nocomp_send_write: do the actual write of the data
 *
 * For no compression we just have to write the data: one iovec entry
 * per page, straight out of guest RAM.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used (= number of iovec entries)
 * @errp: pointer to an error
 */
static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
{
    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
}
/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}
/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}
  121. /**
  122. * nocomp_recv_pages: read the data from the channel into actual pages
  123. *
  124. * For no compression we just need to read things into the correct place.
  125. *
  126. * Returns 0 for success or -1 for error
  127. *
  128. * @p: Params for the channel that we are using
  129. * @used: number of pages used
  130. * @errp: pointer to an error
  131. */
  132. static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
  133. {
  134. uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
  135. if (flags != MULTIFD_FLAG_NOCOMP) {
  136. error_setg(errp, "multifd %d: flags received %x flags expected %x",
  137. p->id, flags, MULTIFD_FLAG_NOCOMP);
  138. return -1;
  139. }
  140. return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
  141. }
/* Ops vtable for the built-in no-compression transport */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .send_write = nocomp_send_write,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

/* Registered methods, indexed by MultiFDCompression value */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
/*
 * Register the ops vtable for one compression method.  Slot 0
 * (MULTIFD_COMPRESSION_NONE) is set statically above, hence the
 * 0 < method assertion.
 */
void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
  159. static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
  160. {
  161. MultiFDInit_t msg = {};
  162. int ret;
  163. msg.magic = cpu_to_be32(MULTIFD_MAGIC);
  164. msg.version = cpu_to_be32(MULTIFD_VERSION);
  165. msg.id = p->id;
  166. memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
  167. ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
  168. if (ret != 0) {
  169. return -1;
  170. }
  171. return 0;
  172. }
  173. static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
  174. {
  175. MultiFDInit_t msg;
  176. int ret;
  177. ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
  178. if (ret != 0) {
  179. return -1;
  180. }
  181. msg.magic = be32_to_cpu(msg.magic);
  182. msg.version = be32_to_cpu(msg.version);
  183. if (msg.magic != MULTIFD_MAGIC) {
  184. error_setg(errp, "multifd: received packet magic %x "
  185. "expected %x", msg.magic, MULTIFD_MAGIC);
  186. return -1;
  187. }
  188. if (msg.version != MULTIFD_VERSION) {
  189. error_setg(errp, "multifd: received packet version %d "
  190. "expected %d", msg.version, MULTIFD_VERSION);
  191. return -1;
  192. }
  193. if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
  194. char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
  195. char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
  196. error_setg(errp, "multifd: received uuid '%s' and expected "
  197. "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
  198. g_free(uuid);
  199. g_free(msg_uuid);
  200. return -1;
  201. }
  202. if (msg.id > migrate_multifd_channels()) {
  203. error_setg(errp, "multifd: received channel version %d "
  204. "expected %d", msg.version, MULTIFD_VERSION);
  205. return -1;
  206. }
  207. return msg.id;
  208. }
  209. static MultiFDPages_t *multifd_pages_init(size_t size)
  210. {
  211. MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
  212. pages->allocated = size;
  213. pages->iov = g_new0(struct iovec, size);
  214. pages->offset = g_new0(ram_addr_t, size);
  215. return pages;
  216. }
  217. static void multifd_pages_clear(MultiFDPages_t *pages)
  218. {
  219. pages->used = 0;
  220. pages->allocated = 0;
  221. pages->packet_num = 0;
  222. pages->block = NULL;
  223. g_free(pages->iov);
  224. pages->iov = NULL;
  225. g_free(pages->offset);
  226. pages->offset = NULL;
  227. g_free(pages);
  228. }
/*
 * Fill the on-wire packet header from the channel state, converting
 * all multi-byte fields to big-endian.  Called by the channel thread
 * with p->mutex held.
 */
static void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->pages_used = cpu_to_be32(p->pages->used);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    packet->packet_num = cpu_to_be64(p->packet_num);

    if (p->pages->block) {
        /*
         * May leave ramblock unterminated on a 256-byte idstr; the
         * receiver forces a NUL at [255] before using it.
         */
        strncpy(packet->ramblock, p->pages->block->idstr, 256);
    }

    for (i = 0; i < p->pages->used; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = p->pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }
}
/*
 * Parse and validate an incoming packet header (big-endian on the
 * wire) and set up p->pages->iov so that the page payload can be read
 * directly into guest RAM.
 *
 * Returns 0 on success, -1 on error (@errp set).
 */
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    RAMBlock *block;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %d and expected version %d",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet that is 100 times bigger than expected
     * just stop migration.  It is a magic number.
     */
    if (packet->pages_alloc > pages_max * 100) {
        error_setg(errp, "multifd: received packet "
                   "with size %d and expected a maximum size of %d",
                   packet->pages_alloc, pages_max * 100) ;
        return -1;
    }
    /*
     * We received a packet that is bigger than expected but inside
     * reasonable limits (see previous comment).  Just reallocate.
     */
    if (packet->pages_alloc > p->pages->allocated) {
        multifd_pages_clear(p->pages);
        p->pages = multifd_pages_init(packet->pages_alloc);
    }

    p->pages->used = be32_to_cpu(packet->pages_used);
    if (p->pages->used > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %d pages and expected maximum pages are %d",
                   p->pages->used, packet->pages_alloc) ;
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* A sync-only packet carries no pages */
    if (p->pages->used == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    block = qemu_ram_block_by_name(packet->ramblock);
    if (!block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    for (i = 0; i < p->pages->used; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        /*
         * NOTE(review): the bound is checked against used_length but
         * the error message prints max_length — confirm which limit is
         * intended.
         */
        if (offset > (block->used_length - qemu_target_page_size())) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, block->max_length);
            return -1;
        }
        p->pages->iov[i].iov_base = block->host + offset;
        p->pages->iov[i].iov_len = qemu_target_page_size();
    }

    return 0;
}
/* Global state for the send side; allocated in multifd_save_setup() */
struct {
    /* one entry per send channel */
    MultiFDSendParams *params;
    /* batch of pages queued to be sent */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages struct for each channel, and a main one.  Each
 * time that we need to send a batch of pages we interchange the one in
 * multifd_send_state with the one in the channel that is sending it.
 * There are two reasons for that:
 * - to avoid doing so many mallocs during migration
 * - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * has to have finished with its own, otherwise pending_job can't be
 * false.
 */
/*
 * Hand the current batch of queued pages to an idle send channel.
 *
 * Waits on channels_ready until some channel has finished its previous
 * job, round-robins (starting at next_channel) to find one with no
 * pending_job, swaps the page batches under the channel mutex and
 * kicks its thread.
 *
 * Returns 1 on success, -1 on error (we are exiting, or the chosen
 * channel has already quit).
 */
static int multifd_send_pages(QEMUFile *f)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;
    uint64_t transferred;

    if (qatomic_read(&multifd_send_state->exiting)) {
        return -1;
    }

    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    /* An idle channel must have handed back an empty pages struct */
    assert(!p->pages->used);
    assert(!p->pages->block);

    p->packet_num = multifd_send_state->packet_num++;
    /* Swap batches: the channel gets the full one, we keep the empty one */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
                + p->packet_len;
    qemu_file_update_transfer(f, transferred);
    ram_counters.multifd_bytes += transferred;
    ram_counters.transferred += transferred;
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);

    return 1;
}
/*
 * Queue one page for sending.  Pages are accumulated in the shared
 * multifd_send_state->pages batch until it is full or the RAMBlock
 * changes, at which point the batch is flushed to a channel.
 *
 * Returns 1 on success, -1 on error.
 */
int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (!pages->block) {
        pages->block = block;
    }

    if (pages->block == block) {
        pages->offset[pages->used] = offset;
        pages->iov[pages->used].iov_base = block->host + offset;
        pages->iov[pages->used].iov_len = qemu_target_page_size();
        pages->used++;

        if (pages->used < pages->allocated) {
            return 1;
        }
    }

    /* Batch full, or page belongs to a different RAMBlock: flush */
    if (multifd_send_pages(f) < 0) {
        return -1;
    }

    /*
     * NOTE(review): multifd_send_pages() handed the local 'pages'
     * struct over to a channel thread; re-reading pages->block here
     * happens after that ownership transfer — verify this cannot race
     * with the channel clearing it.
     */
    if (pages->block != block) {
        /* Different block: queue the page into the fresh empty batch */
        return multifd_queue_page(f, block, offset);
    }

    return 1;
}
/*
 * Ask every send channel thread to quit.  With a non-NULL @err the
 * migration is also marked as failed.  Idempotent: the 'exiting'
 * atomic flag makes sure the channel loop runs only once.
 */
static void multifd_send_terminate_threads(Error *err)
{
    int i;

    trace_multifd_send_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    /*
     * We don't want to exit each threads twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /* wake the thread so it can observe quit/exiting */
        qemu_sem_post(&p->sem);
        qemu_mutex_unlock(&p->mutex);
    }
}
/*
 * Tear down the send side: stop all channel threads, join them, and
 * free all per-channel and global state.  Dereferences
 * multifd_send_state unconditionally, so it must only run after
 * multifd_save_setup() allocated it (or with multifd disabled).
 */
void multifd_save_cleanup(void)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    multifd_send_terminate_threads(NULL);
    /* First pass: wait for every running channel thread to exit */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->running) {
            qemu_thread_join(&p->thread);
        }
    }
    /* Second pass: release per-channel resources */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        socket_send_channel_destroy(p->c);
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        g_free(p->tls_hostname);
        p->tls_hostname = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        multifd_send_state->ops->send_cleanup(p, &local_err);
        if (local_err) {
            /* record but do not abort: keep cleaning the other channels */
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}
/*
 * Flush any partially-filled batch, then synchronise with every send
 * channel: queue a MULTIFD_FLAG_SYNC packet on each and wait on
 * sem_sync until every channel thread has processed it.
 */
void multifd_send_sync_main(QEMUFile *f)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    if (multifd_send_state->pages->used) {
        if (multifd_send_pages(f) < 0) {
            error_report("%s: multifd_send_pages fail", __func__);
            return;
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);

        if (p->quit) {
            error_report("%s: channel %d has already quit", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return;
        }

        p->packet_num = multifd_send_state->packet_num++;
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        /* the sync packet is header-only: account just packet_len */
        qemu_file_update_transfer(f, p->packet_len);
        ram_counters.multifd_bytes += p->packet_len;
        ram_counters.transferred += p->packet_len;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);
}
/*
 * Per-channel send thread.  Sends the handshake message, then loops:
 * wait on p->sem for a job posted by multifd_send_pages() or
 * multifd_send_sync_main(), write the packet header and (if any) the
 * page payload, and signal completion via sem_sync/channels_ready.
 *
 * @opaque: the MultiFDSendParams of this channel
 *
 * Returns NULL (the thread exit value is unused).
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            if (used) {
                ret = multifd_send_state->ops->send_prepare(p, used,
                                                            &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            /* reset the batch so it can be handed back empty */
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            /* header first, then the page payload (if any) */
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * An error happened; before exiting, post both semaphores so that
     * whoever is waiting on this channel does not block forever.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error);

/*
 * QIOTask completion callback for the outgoing TLS handshake started
 * by multifd_tls_channel_connect(); resumes channel setup, passing
 * along any handshake error.
 */
static void multifd_tls_outgoing_handshake(QIOTask *task,
                                           gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *err = NULL;

    if (qio_task_propagate_error(task, &err)) {
        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    } else {
        trace_multifd_tls_outgoing_handshake_complete(ioc);
    }
    multifd_channel_connect(p, ioc, err);
}
/*
 * Wrap @ioc in a TLS channel and start the asynchronous client
 * handshake; completion is reported via
 * multifd_tls_outgoing_handshake().  On TLS creation failure @errp is
 * set and no handshake is started.
 */
static void multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = p->tls_hostname;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(s, ioc, hostname, errp);
    if (!tioc) {
        return;
    }

    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    qio_channel_tls_handshake(tioc,
                              multifd_tls_outgoing_handshake,
                              p,
                              NULL,
                              NULL);
}
/*
 * Attach @ioc to channel @p.  If TLS is configured and @ioc is not
 * already a TLS channel, kick off the TLS handshake first; otherwise
 * start the send thread on @ioc directly.
 *
 * Returns true on failure (caller must clean up), false when the
 * channel was taken over (thread created, or completion deferred to
 * the TLS handshake callback).
 */
static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error)
{
    MigrationState *s = migrate_get_current();

    trace_multifd_set_outgoing_channel(
        ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);

    if (!error) {
        if (s->parameters.tls_creds &&
            *s->parameters.tls_creds &&
            !object_dynamic_cast(OBJECT(ioc),
                                 TYPE_QIO_CHANNEL_TLS)) {
            multifd_tls_channel_connect(p, ioc, &error);
            if (!error) {
                /*
                 * tls_channel_connect will call back to this
                 * function after the TLS handshake,
                 * so we mustn't call multifd_send_thread until then
                 */
                return false;
            } else {
                return true;
            }
        } else {
            /* update for tls qio channel */
            p->c = ioc;
            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                               QEMU_THREAD_JOINABLE);
        }
        return false;
    }

    return true;
}
/*
 * Abort setup of one send channel after a connect or handshake error:
 * record the error, unblock waiters, mark the channel dead and drop
 * the channel reference.  Takes ownership of @err.
 */
static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
                                             QIOChannel *ioc, Error *err)
{
    migrate_set_error(migrate_get_current(), err);
    /* An error happened; unblock anyone waiting on this channel */
    qemu_sem_post(&multifd_send_state->channels_ready);
    qemu_sem_post(&p->sem_sync);
    /*
     * Although multifd_send_thread was never created, the main
     * migration thread still needs to know this channel is unusable,
     * so mark its status.
     */
    p->quit = true;
    object_unref(OBJECT(ioc));
    error_free(err);
}
/*
 * QIOTask completion callback for socket_send_channel_create():
 * on success attach the new socket to the channel and continue with
 * multifd_channel_connect(); on any failure run the cleanup path.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;

    trace_multifd_new_send_channel_async(p->id);
    if (qio_task_propagate_error(task, &local_err)) {
        goto cleanup;
    } else {
        p->c = QIO_CHANNEL(sioc);
        /* disable Nagle: each packet should go out immediately */
        qio_channel_set_delay(p->c, false);
        p->running = true;
        if (multifd_channel_connect(p, sioc, local_err)) {
            goto cleanup;
        }
        return;
    }

cleanup:
    multifd_new_send_channel_cleanup(p, sioc, local_err);
}
/*
 * Allocate global and per-channel send state and kick off asynchronous
 * creation of every send channel.
 *
 * Returns 0 on success (or multifd disabled), or the negative value
 * returned by a compression method's send_setup (with @errp set).
 *
 * NOTE(review): on a send_setup failure the channels initialised so
 * far are not torn down here — presumably multifd_save_cleanup() is
 * expected to run afterwards; verify with the callers.
 */
int multifd_save_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;
    MigrationState *s;

    if (!migrate_use_multifd()) {
        return 0;
    }
    s = migrate_get_current();
    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = 0;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        /* header plus one 64-bit offset per page */
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->tls_hostname = g_strdup(s->hostname);
        socket_send_channel_create(multifd_new_send_channel_async, p);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}
/* Global state for the receive side */
struct {
    /* one entry per recv channel */
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
/*
 * Ask every recv channel thread to quit, shutting down the channels so
 * that blocking reads return.  With a non-NULL @err the migration is
 * also marked as failed.
 */
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /*
         * We could arrive here for two reasons:
         * - normal quit, i.e. everything went fine, just finished
         * - error quit: We close the channels so the channel threads
         *   finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}
/*
 * Tear down the receive side: stop and join all channel threads, then
 * free all per-channel and global state.
 *
 * Returns 0 (also when multifd is disabled); @errp is currently unused.
 */
int multifd_load_cleanup(Error **errp)
{
    int i;

    if (!migrate_use_multifd()) {
        return 0;
    }
    multifd_recv_terminate_threads(NULL);
    /* First pass: wake and join every running channel thread */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->running) {
            p->quit = true;
            /*
             * multifd_recv_thread may hang at the MULTIFD_FLAG_SYNC
             * handling code; waking it here is harmless during cleanup.
             */
            qemu_sem_post(&p->sem_sync);
            qemu_thread_join(&p->thread);
        }
    }
    /* Second pass: release per-channel resources */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        object_unref(OBJECT(p->c));
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        multifd_recv_state->ops->recv_cleanup(p);
    }
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;

    return 0;
}
  856. void multifd_recv_sync_main(void)
  857. {
  858. int i;
  859. if (!migrate_use_multifd()) {
  860. return;
  861. }
  862. for (i = 0; i < migrate_multifd_channels(); i++) {
  863. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  864. trace_multifd_recv_sync_main_wait(p->id);
  865. qemu_sem_wait(&multifd_recv_state->sem_sync);
  866. }
  867. for (i = 0; i < migrate_multifd_channels(); i++) {
  868. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  869. WITH_QEMU_LOCK_GUARD(&p->mutex) {
  870. if (multifd_recv_state->packet_num < p->packet_num) {
  871. multifd_recv_state->packet_num = p->packet_num;
  872. }
  873. }
  874. trace_multifd_recv_sync_main_signal(p->id);
  875. qemu_sem_post(&p->sem_sync);
  876. }
  877. trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
  878. }
/*
 * Per-channel receive thread.  Reads one multifd packet at a time from
 * the channel, unpacks it under p->mutex and hands the page data to the
 * active recv method.  Exits on EOF, on read/unpack error, or when
 * p->quit is set; on error it also terminates the other channels.
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        if (p->quit) {
            break;
        }

        /* Read the fixed-size packet header + page offsets. */
        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) { /* EOF */
            break;
        }
        if (ret == -1) { /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        /* Snapshot the values recv_pages() needs before dropping the lock. */
        used = p->pages->used;
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        if (used) {
            /* Compression-specific method pulls the actual page data. */
            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            /*
             * Tell the main thread we reached the sync point, then park
             * until multifd_recv_sync_main() releases us.
             */
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    /* Mark the thread as gone so cleanup won't try to join it again. */
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
  937. int multifd_load_setup(Error **errp)
  938. {
  939. int thread_count;
  940. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  941. uint8_t i;
  942. if (!migrate_use_multifd()) {
  943. return 0;
  944. }
  945. thread_count = migrate_multifd_channels();
  946. multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
  947. multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
  948. qatomic_set(&multifd_recv_state->count, 0);
  949. qemu_sem_init(&multifd_recv_state->sem_sync, 0);
  950. multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
  951. for (i = 0; i < thread_count; i++) {
  952. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  953. qemu_mutex_init(&p->mutex);
  954. qemu_sem_init(&p->sem_sync, 0);
  955. p->quit = false;
  956. p->id = i;
  957. p->pages = multifd_pages_init(page_count);
  958. p->packet_len = sizeof(MultiFDPacket_t)
  959. + sizeof(uint64_t) * page_count;
  960. p->packet = g_malloc0(p->packet_len);
  961. p->name = g_strdup_printf("multifdrecv_%d", i);
  962. }
  963. for (i = 0; i < thread_count; i++) {
  964. MultiFDRecvParams *p = &multifd_recv_state->params[i];
  965. Error *local_err = NULL;
  966. int ret;
  967. ret = multifd_recv_state->ops->recv_setup(p, &local_err);
  968. if (ret) {
  969. error_propagate(errp, local_err);
  970. return ret;
  971. }
  972. }
  973. return 0;
  974. }
  975. bool multifd_recv_all_channels_created(void)
  976. {
  977. int thread_count = migrate_multifd_channels();
  978. if (!migrate_use_multifd()) {
  979. return true;
  980. }
  981. return thread_count == qatomic_read(&multifd_recv_state->count);
  982. }
  983. /*
  984. * Try to receive all multifd channels to get ready for the migration.
  985. * - Return true and do not set @errp when correctly receiving all channels;
  986. * - Return false and do not set @errp when correctly receiving the current one;
  987. * - Return false and set @errp when failing to receive the current channel.
  988. */
  989. bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
  990. {
  991. MultiFDRecvParams *p;
  992. Error *local_err = NULL;
  993. int id;
  994. id = multifd_recv_initial_packet(ioc, &local_err);
  995. if (id < 0) {
  996. multifd_recv_terminate_threads(local_err);
  997. error_propagate_prepend(errp, local_err,
  998. "failed to receive packet"
  999. " via multifd channel %d: ",
  1000. qatomic_read(&multifd_recv_state->count));
  1001. return false;
  1002. }
  1003. trace_multifd_recv_new_channel(id);
  1004. p = &multifd_recv_state->params[id];
  1005. if (p->c != NULL) {
  1006. error_setg(&local_err, "multifd: received id '%d' already setup'",
  1007. id);
  1008. multifd_recv_terminate_threads(local_err);
  1009. error_propagate(errp, local_err);
  1010. return false;
  1011. }
  1012. p->c = ioc;
  1013. object_ref(OBJECT(ioc));
  1014. /* initial packet */
  1015. p->num_packets = 1;
  1016. p->running = true;
  1017. qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
  1018. QEMU_THREAD_JOINABLE);
  1019. qatomic_inc(&multifd_recv_state->count);
  1020. return qatomic_read(&multifd_recv_state->count) ==
  1021. migrate_multifd_channels();
  1022. }