multifd-zstd.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. /*
  2. * Multifd zstd compression implementation
  3. *
  4. * Copyright (c) 2020 Red Hat Inc
  5. *
  6. * Authors:
  7. * Juan Quintela <quintela@redhat.com>
  8. *
  9. * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10. * See the COPYING file in the top-level directory.
  11. */
  12. #include "qemu/osdep.h"
  13. #include <zstd.h>
  14. #include "qemu/rcu.h"
  15. #include "exec/target_page.h"
  16. #include "qapi/error.h"
  17. #include "migration.h"
  18. #include "trace.h"
  19. #include "multifd.h"
  20. struct zstd_data {
  21. /* stream for compression */
  22. ZSTD_CStream *zcs;
  23. /* stream for decompression */
  24. ZSTD_DStream *zds;
  25. /* buffers */
  26. ZSTD_inBuffer in;
  27. ZSTD_outBuffer out;
  28. /* compressed buffer */
  29. uint8_t *zbuff;
  30. /* size of compressed buffer */
  31. uint32_t zbuff_len;
  32. };
  33. /* Multifd zstd compression */
  34. /**
  35. * zstd_send_setup: setup send side
  36. *
  37. * Setup each channel with zstd compression.
  38. *
  39. * Returns 0 for success or -1 for error
  40. *
  41. * @p: Params for the channel that we are using
  42. * @errp: pointer to an error
  43. */
  44. static int zstd_send_setup(MultiFDSendParams *p, Error **errp)
  45. {
  46. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  47. struct zstd_data *z = g_new0(struct zstd_data, 1);
  48. int res;
  49. p->data = z;
  50. z->zcs = ZSTD_createCStream();
  51. if (!z->zcs) {
  52. g_free(z);
  53. error_setg(errp, "multifd %d: zstd createCStream failed", p->id);
  54. return -1;
  55. }
  56. res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
  57. if (ZSTD_isError(res)) {
  58. ZSTD_freeCStream(z->zcs);
  59. g_free(z);
  60. error_setg(errp, "multifd %d: initCStream failed with error %s",
  61. p->id, ZSTD_getErrorName(res));
  62. return -1;
  63. }
  64. /* We will never have more than page_count pages */
  65. z->zbuff_len = page_count * qemu_target_page_size();
  66. z->zbuff_len *= 2;
  67. z->zbuff = g_try_malloc(z->zbuff_len);
  68. if (!z->zbuff) {
  69. ZSTD_freeCStream(z->zcs);
  70. g_free(z);
  71. error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
  72. return -1;
  73. }
  74. return 0;
  75. }
  76. /**
  77. * zstd_send_cleanup: cleanup send side
  78. *
  79. * Close the channel and return memory.
  80. *
  81. * @p: Params for the channel that we are using
  82. */
  83. static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
  84. {
  85. struct zstd_data *z = p->data;
  86. ZSTD_freeCStream(z->zcs);
  87. z->zcs = NULL;
  88. g_free(z->zbuff);
  89. z->zbuff = NULL;
  90. g_free(p->data);
  91. p->data = NULL;
  92. }
  93. /**
  94. * zstd_send_prepare: prepare date to be able to send
  95. *
  96. * Create a compressed buffer with all the pages that we are going to
  97. * send.
  98. *
  99. * Returns 0 for success or -1 for error
  100. *
  101. * @p: Params for the channel that we are using
  102. * @used: number of pages used
  103. */
  104. static int zstd_send_prepare(MultiFDSendParams *p, uint32_t used, Error **errp)
  105. {
  106. struct iovec *iov = p->pages->iov;
  107. struct zstd_data *z = p->data;
  108. int ret;
  109. uint32_t i;
  110. z->out.dst = z->zbuff;
  111. z->out.size = z->zbuff_len;
  112. z->out.pos = 0;
  113. for (i = 0; i < used; i++) {
  114. ZSTD_EndDirective flush = ZSTD_e_continue;
  115. if (i == used - 1) {
  116. flush = ZSTD_e_flush;
  117. }
  118. z->in.src = iov[i].iov_base;
  119. z->in.size = iov[i].iov_len;
  120. z->in.pos = 0;
  121. /*
  122. * Welcome to compressStream2 semantics
  123. *
  124. * We need to loop while:
  125. * - return is > 0
  126. * - there is input available
  127. * - there is output space free
  128. */
  129. do {
  130. ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
  131. } while (ret > 0 && (z->in.size - z->in.pos > 0)
  132. && (z->out.size - z->out.pos > 0));
  133. if (ret > 0 && (z->in.size - z->in.pos > 0)) {
  134. error_setg(errp, "multifd %d: compressStream buffer too small",
  135. p->id);
  136. return -1;
  137. }
  138. if (ZSTD_isError(ret)) {
  139. error_setg(errp, "multifd %d: compressStream error %s",
  140. p->id, ZSTD_getErrorName(ret));
  141. return -1;
  142. }
  143. }
  144. p->next_packet_size = z->out.pos;
  145. p->flags |= MULTIFD_FLAG_ZSTD;
  146. return 0;
  147. }
  148. /**
  149. * zstd_send_write: do the actual write of the data
  150. *
  151. * Do the actual write of the comprresed buffer.
  152. *
  153. * Returns 0 for success or -1 for error
  154. *
  155. * @p: Params for the channel that we are using
  156. * @used: number of pages used
  157. * @errp: pointer to an error
  158. */
  159. static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
  160. {
  161. struct zstd_data *z = p->data;
  162. return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size,
  163. errp);
  164. }
  165. /**
  166. * zstd_recv_setup: setup receive side
  167. *
  168. * Create the compressed channel and buffer.
  169. *
  170. * Returns 0 for success or -1 for error
  171. *
  172. * @p: Params for the channel that we are using
  173. * @errp: pointer to an error
  174. */
  175. static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
  176. {
  177. uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
  178. struct zstd_data *z = g_new0(struct zstd_data, 1);
  179. int ret;
  180. p->data = z;
  181. z->zds = ZSTD_createDStream();
  182. if (!z->zds) {
  183. g_free(z);
  184. error_setg(errp, "multifd %d: zstd createDStream failed", p->id);
  185. return -1;
  186. }
  187. ret = ZSTD_initDStream(z->zds);
  188. if (ZSTD_isError(ret)) {
  189. ZSTD_freeDStream(z->zds);
  190. g_free(z);
  191. error_setg(errp, "multifd %d: initDStream failed with error %s",
  192. p->id, ZSTD_getErrorName(ret));
  193. return -1;
  194. }
  195. /* We will never have more than page_count pages */
  196. z->zbuff_len = page_count * qemu_target_page_size();
  197. /* We know compression "could" use more space */
  198. z->zbuff_len *= 2;
  199. z->zbuff = g_try_malloc(z->zbuff_len);
  200. if (!z->zbuff) {
  201. ZSTD_freeDStream(z->zds);
  202. g_free(z);
  203. error_setg(errp, "multifd %d: out of memory for zbuff", p->id);
  204. return -1;
  205. }
  206. return 0;
  207. }
  208. /**
  209. * zstd_recv_cleanup: setup receive side
  210. *
  211. * For no compression this function does nothing.
  212. *
  213. * @p: Params for the channel that we are using
  214. */
  215. static void zstd_recv_cleanup(MultiFDRecvParams *p)
  216. {
  217. struct zstd_data *z = p->data;
  218. ZSTD_freeDStream(z->zds);
  219. z->zds = NULL;
  220. g_free(z->zbuff);
  221. z->zbuff = NULL;
  222. g_free(p->data);
  223. p->data = NULL;
  224. }
  225. /**
  226. * zstd_recv_pages: read the data from the channel into actual pages
  227. *
  228. * Read the compressed buffer, and uncompress it into the actual
  229. * pages.
  230. *
  231. * Returns 0 for success or -1 for error
  232. *
  233. * @p: Params for the channel that we are using
  234. * @used: number of pages used
  235. * @errp: pointer to an error
  236. */
  237. static int zstd_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
  238. {
  239. uint32_t in_size = p->next_packet_size;
  240. uint32_t out_size = 0;
  241. uint32_t expected_size = used * qemu_target_page_size();
  242. uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
  243. struct zstd_data *z = p->data;
  244. int ret;
  245. int i;
  246. if (flags != MULTIFD_FLAG_ZSTD) {
  247. error_setg(errp, "multifd %d: flags received %x flags expected %x",
  248. p->id, flags, MULTIFD_FLAG_ZSTD);
  249. return -1;
  250. }
  251. ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
  252. if (ret != 0) {
  253. return ret;
  254. }
  255. z->in.src = z->zbuff;
  256. z->in.size = in_size;
  257. z->in.pos = 0;
  258. for (i = 0; i < used; i++) {
  259. struct iovec *iov = &p->pages->iov[i];
  260. z->out.dst = iov->iov_base;
  261. z->out.size = iov->iov_len;
  262. z->out.pos = 0;
  263. /*
  264. * Welcome to decompressStream semantics
  265. *
  266. * We need to loop while:
  267. * - return is > 0
  268. * - there is input available
  269. * - we haven't put out a full page
  270. */
  271. do {
  272. ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
  273. } while (ret > 0 && (z->in.size - z->in.pos > 0)
  274. && (z->out.pos < iov->iov_len));
  275. if (ret > 0 && (z->out.pos < iov->iov_len)) {
  276. error_setg(errp, "multifd %d: decompressStream buffer too small",
  277. p->id);
  278. return -1;
  279. }
  280. if (ZSTD_isError(ret)) {
  281. error_setg(errp, "multifd %d: decompressStream returned %s",
  282. p->id, ZSTD_getErrorName(ret));
  283. return ret;
  284. }
  285. out_size += z->out.pos;
  286. }
  287. if (out_size != expected_size) {
  288. error_setg(errp, "multifd %d: packet size received %d size expected %d",
  289. p->id, out_size, expected_size);
  290. return -1;
  291. }
  292. return 0;
  293. }
  294. static MultiFDMethods multifd_zstd_ops = {
  295. .send_setup = zstd_send_setup,
  296. .send_cleanup = zstd_send_cleanup,
  297. .send_prepare = zstd_send_prepare,
  298. .send_write = zstd_send_write,
  299. .recv_setup = zstd_recv_setup,
  300. .recv_cleanup = zstd_recv_cleanup,
  301. .recv_pages = zstd_recv_pages
  302. };
  303. static void multifd_zstd_register(void)
  304. {
  305. multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
  306. }
  307. migration_init(multifd_zstd_register);