multifd-zlib.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
/*
 * Multifd zlib compression implementation
 *
 * Copyright (c) 2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
  12. #include "qemu/osdep.h"
  13. #include <zlib.h>
  14. #include "qemu/rcu.h"
  15. #include "exec/target_page.h"
  16. #include "qapi/error.h"
  17. #include "migration.h"
  18. #include "trace.h"
  19. #include "multifd.h"
  20. struct zlib_data {
  21. /* stream for compression */
  22. z_stream zs;
  23. /* compressed buffer */
  24. uint8_t *zbuff;
  25. /* size of compressed buffer */
  26. uint32_t zbuff_len;
  27. };
  28. /* Multifd zlib compression */
  29. /**
  30. * zlib_send_setup: setup send side
  31. *
  32. * Setup each channel with zlib compression.
  33. *
  34. * Returns 0 for success or -1 for error
  35. *
  36. * @p: Params for the channel that we are using
  37. * @errp: pointer to an error
  38. */
  39. static int zlib_send_setup(MultiFDSendParams *p, Error **errp)
  40. {
  41. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  42. struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
  43. z_stream *zs = &z->zs;
  44. zs->zalloc = Z_NULL;
  45. zs->zfree = Z_NULL;
  46. zs->opaque = Z_NULL;
  47. if (deflateInit(zs, migrate_multifd_zlib_level()) != Z_OK) {
  48. g_free(z);
  49. error_setg(errp, "multifd %d: deflate init failed", p->id);
  50. return -1;
  51. }
  52. /* We will never have more than page_count pages */
  53. z->zbuff_len = page_count * qemu_target_page_size();
  54. z->zbuff_len *= 2;
  55. z->zbuff = g_try_malloc(z->zbuff_len);
  56. if (!z->zbuff) {
  57. deflateEnd(&z->zs);
  58. g_free(z);
  59. error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
  60. return -1;
  61. }
  62. p->data = z;
  63. return 0;
  64. }
  65. /**
  66. * zlib_send_cleanup: cleanup send side
  67. *
  68. * Close the channel and return memory.
  69. *
  70. * @p: Params for the channel that we are using
  71. */
  72. static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
  73. {
  74. struct zlib_data *z = p->data;
  75. deflateEnd(&z->zs);
  76. g_free(z->zbuff);
  77. z->zbuff = NULL;
  78. g_free(p->data);
  79. p->data = NULL;
  80. }
  81. /**
  82. * zlib_send_prepare: prepare date to be able to send
  83. *
  84. * Create a compressed buffer with all the pages that we are going to
  85. * send.
  86. *
  87. * Returns 0 for success or -1 for error
  88. *
  89. * @p: Params for the channel that we are using
  90. * @used: number of pages used
  91. */
  92. static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
  93. {
  94. struct iovec *iov = p->pages->iov;
  95. struct zlib_data *z = p->data;
  96. z_stream *zs = &z->zs;
  97. uint32_t out_size = 0;
  98. int ret;
  99. uint32_t i;
  100. for (i = 0; i < used; i++) {
  101. uint32_t available = z->zbuff_len - out_size;
  102. int flush = Z_NO_FLUSH;
  103. if (i == used - 1) {
  104. flush = Z_SYNC_FLUSH;
  105. }
  106. zs->avail_in = iov[i].iov_len;
  107. zs->next_in = iov[i].iov_base;
  108. zs->avail_out = available;
  109. zs->next_out = z->zbuff + out_size;
  110. /*
  111. * Welcome to deflate semantics
  112. *
  113. * We need to loop while:
  114. * - return is Z_OK
  115. * - there are stuff to be compressed
  116. * - there are output space free
  117. */
  118. do {
  119. ret = deflate(zs, flush);
  120. } while (ret == Z_OK && zs->avail_in && zs->avail_out);
  121. if (ret == Z_OK && zs->avail_in) {
  122. error_setg(errp, "multifd %d: deflate failed to compress all input",
  123. p->id);
  124. return -1;
  125. }
  126. if (ret != Z_OK) {
  127. error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
  128. p->id, ret);
  129. return -1;
  130. }
  131. out_size += available - zs->avail_out;
  132. }
  133. p->next_packet_size = out_size;
  134. p->flags |= MULTIFD_FLAG_ZLIB;
  135. return 0;
  136. }
  137. /**
  138. * zlib_send_write: do the actual write of the data
  139. *
  140. * Do the actual write of the comprresed buffer.
  141. *
  142. * Returns 0 for success or -1 for error
  143. *
  144. * @p: Params for the channel that we are using
  145. * @used: number of pages used
  146. * @errp: pointer to an error
  147. */
  148. static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
  149. {
  150. struct zlib_data *z = p->data;
  151. return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
  152. errp);
  153. }
  154. /**
  155. * zlib_recv_setup: setup receive side
  156. *
  157. * Create the compressed channel and buffer.
  158. *
  159. * Returns 0 for success or -1 for error
  160. *
  161. * @p: Params for the channel that we are using
  162. * @errp: pointer to an error
  163. */
  164. static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp)
  165. {
  166. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  167. struct zlib_data *z = g_malloc0(sizeof(struct zlib_data));
  168. z_stream *zs = &z->zs;
  169. p->data = z;
  170. zs->zalloc = Z_NULL;
  171. zs->zfree = Z_NULL;
  172. zs->opaque = Z_NULL;
  173. zs->avail_in = 0;
  174. zs->next_in = Z_NULL;
  175. if (inflateInit(zs) != Z_OK) {
  176. error_setg(errp, "multifd %d: inflate init failed", p->id);
  177. return -1;
  178. }
  179. /* We will never have more than page_count pages */
  180. z->zbuff_len = page_count * qemu_target_page_size();
  181. /* We know compression "could" use more space */
  182. z->zbuff_len *= 2;
  183. z->zbuff = g_try_malloc(z->zbuff_len);
  184. if (!z->zbuff) {
  185. inflateEnd(zs);
  186. error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
  187. return -1;
  188. }
  189. return 0;
  190. }
  191. /**
  192. * zlib_recv_cleanup: setup receive side
  193. *
  194. * For no compression this function does nothing.
  195. *
  196. * @p: Params for the channel that we are using
  197. */
  198. static void zlib_recv_cleanup(MultiFDRecvParams *p)
  199. {
  200. struct zlib_data *z = p->data;
  201. inflateEnd(&z->zs);
  202. g_free(z->zbuff);
  203. z->zbuff = NULL;
  204. g_free(p->data);
  205. p->data = NULL;
  206. }
  207. /**
  208. * zlib_recv_pages: read the data from the channel into actual pages
  209. *
  210. * Read the compressed buffer, and uncompress it into the actual
  211. * pages.
  212. *
  213. * Returns 0 for success or -1 for error
  214. *
  215. * @p: Params for the channel that we are using
  216. * @used: number of pages used
  217. * @errp: pointer to an error
  218. */
  219. static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
  220. {
  221. struct zlib_data *z = p->data;
  222. z_stream *zs = &z->zs;
  223. uint32_t in_size = p->next_packet_size;
  224. /* we measure the change of total_out */
  225. uint32_t out_size = zs->total_out;
  226. uint32_t expected_size = used * qemu_target_page_size();
  227. uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
  228. int ret;
  229. int i;
  230. if (flags != MULTIFD_FLAG_ZLIB) {
  231. error_setg(errp, "multifd %d: flags received %x flags expected %x",
  232. p->id, flags, MULTIFD_FLAG_ZLIB);
  233. return -1;
  234. }
  235. ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
  236. if (ret != 0) {
  237. return ret;
  238. }
  239. zs->avail_in = in_size;
  240. zs->next_in = z->zbuff;
  241. for (i = 0; i < used; i++) {
  242. struct iovec *iov = &p->pages->iov[i];
  243. int flush = Z_NO_FLUSH;
  244. unsigned long start = zs->total_out;
  245. if (i == used - 1) {
  246. flush = Z_SYNC_FLUSH;
  247. }
  248. zs->avail_out = iov->iov_len;
  249. zs->next_out = iov->iov_base;
  250. /*
  251. * Welcome to inflate semantics
  252. *
  253. * We need to loop while:
  254. * - return is Z_OK
  255. * - there are input available
  256. * - we haven't completed a full page
  257. */
  258. do {
  259. ret = inflate(zs, flush);
  260. } while (ret == Z_OK && zs->avail_in
  261. && (zs->total_out - start) < iov->iov_len);
  262. if (ret == Z_OK && (zs->total_out - start) < iov->iov_len) {
  263. error_setg(errp, "multifd %d: inflate generated too few output",
  264. p->id);
  265. return -1;
  266. }
  267. if (ret != Z_OK) {
  268. error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK",
  269. p->id, ret);
  270. return -1;
  271. }
  272. }
  273. out_size = zs->total_out - out_size;
  274. if (out_size != expected_size) {
  275. error_setg(errp, "multifd %d: packet size received %d size expected %d",
  276. p->id, out_size, expected_size);
  277. return -1;
  278. }
  279. return 0;
  280. }
  281. static MultiFDMethods multifd_zlib_ops = {
  282. .send_setup = zlib_send_setup,
  283. .send_cleanup = zlib_send_cleanup,
  284. .send_prepare = zlib_send_prepare,
  285. .send_write = zlib_send_write,
  286. .recv_setup = zlib_recv_setup,
  287. .recv_cleanup = zlib_recv_cleanup,
  288. .recv_pages = zlib_recv_pages
  289. };
  290. static void multifd_zlib_register(void)
  291. {
  292. multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
  293. }
  294. migration_init(multifd_zlib_register);