@@ -269,6 +269,10 @@ struct CompressParam {
     QemuCond cond;
     RAMBlock *block;
     ram_addr_t offset;
+
+    /* internally used fields */
+    z_stream stream;
+    uint8_t *originbuf;
 };
 typedef struct CompressParam CompressParam;

@@ -280,6 +284,7 @@ struct DecompressParam {
     void *des;
     uint8_t *compbuf;
     int len;
+    z_stream stream;
 };
 typedef struct DecompressParam DecompressParam;

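The new stream and originbuf fields exist so that each compression thread sets up its zlib state once and reuses it for every page, instead of going through a one-shot helper such as compress2(), which allocates and tears down its deflate state internally on every call. A minimal sketch of that reuse pattern with plain zlib (compress_one_page() is an illustrative helper, not a QEMU function):

#include <stdint.h>
#include <zlib.h>

/* Illustrative only: reuse one z_stream for many buffers.  The stream is
 * assumed to have been set up once with deflateInit(stream, level) and is
 * torn down once with deflateEnd(stream), mirroring the per-thread setup
 * and cleanup added by this patch. */
static int compress_one_page(z_stream *stream, uint8_t *dst, size_t dst_len,
                             const uint8_t *src, size_t src_len)
{
    /* rewind the existing stream instead of re-initializing it per page */
    if (deflateReset(stream) != Z_OK) {
        return -1;
    }
    stream->next_in = (Bytef *)src;
    stream->avail_in = src_len;
    stream->next_out = dst;
    stream->avail_out = dst_len;

    /* the whole page is available up front, so emit one complete stream */
    if (deflate(stream, Z_FINISH) != Z_STREAM_END) {
        return -1;
    }
    return dst_len - stream->avail_out;   /* compressed size in bytes */
}
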
@@ -294,13 +299,14 @@ static QemuCond comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };

+static QEMUFile *decomp_file;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;

-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
-                                ram_addr_t offset);
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+                                ram_addr_t offset, uint8_t *source_buf);

 static void *do_data_compress(void *opaque)
 {
@@ -316,7 +322,8 @@ static void *do_data_compress(void *opaque)
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);

-            do_compress_ram_page(param->file, block, offset);
+            do_compress_ram_page(param->file, &param->stream, block, offset,
+                                 param->originbuf);

             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
@@ -357,10 +364,20 @@ static void compress_threads_save_cleanup(void)
     terminate_compression_threads();
     thread_count = migrate_compress_threads();
     for (i = 0; i < thread_count; i++) {
+        /*
+         * we use it as an indicator which shows whether the thread is
+         * properly initialized or not
+         */
+        if (!comp_param[i].file) {
+            break;
+        }
         qemu_thread_join(compress_threads + i);
-        qemu_fclose(comp_param[i].file);
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
+        deflateEnd(&comp_param[i].stream);
+        g_free(comp_param[i].originbuf);
+        qemu_fclose(comp_param[i].file);
+        comp_param[i].file = NULL;
     }
     qemu_mutex_destroy(&comp_done_lock);
     qemu_cond_destroy(&comp_done_cond);
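The comp_param[i].file check above is what lets this cleanup routine double as the error path of the setup function in the next hunk: a designated field is only populated once a slot is fully initialized, so cleanup stops at the first slot that never got that far. A generic sketch of the pattern (the names here are illustrative, not QEMU APIs):

#include <stdlib.h>

struct worker { void *buf; };   /* buf plays the role of comp_param[i].file */

static void workers_cleanup(struct worker *w, int n)
{
    for (int i = 0; i < n; i++) {
        if (!w[i].buf) {
            break;          /* this slot and later ones were never set up */
        }
        free(w[i].buf);
        w[i].buf = NULL;
    }
}

static int workers_setup(struct worker *w, int n)
{
    for (int i = 0; i < n; i++) {
        w[i].buf = malloc(4096);
        if (!w[i].buf) {
            workers_cleanup(w, n);   /* safe: only tears down initialized slots */
            return -1;
        }
    }
    return 0;
}
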
@@ -370,12 +387,12 @@ static void compress_threads_save_cleanup(void)
     comp_param = NULL;
 }

-static void compress_threads_save_setup(void)
+static int compress_threads_save_setup(void)
 {
     int i, thread_count;

     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
     thread_count = migrate_compress_threads();
     compress_threads = g_new0(QemuThread, thread_count);
@@ -383,6 +400,17 @@ static void compress_threads_save_setup(void)
     qemu_cond_init(&comp_done_cond);
     qemu_mutex_init(&comp_done_lock);
     for (i = 0; i < thread_count; i++) {
+        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+        if (!comp_param[i].originbuf) {
+            goto exit;
+        }
+
+        if (deflateInit(&comp_param[i].stream,
+                        migrate_compress_level()) != Z_OK) {
+            g_free(comp_param[i].originbuf);
+            goto exit;
+        }
+
         /* comp_param[i].file is just used as a dummy buffer to save data,
          * set its ops to empty.
          */
@@ -395,6 +423,11 @@ static void compress_threads_save_setup(void)
                            do_data_compress, comp_param + i,
                            QEMU_THREAD_JOINABLE);
     }
+    return 0;
+
+exit:
+    compress_threads_save_cleanup();
+    return -1;
 }

 /* Multiple fd's */
@@ -941,6 +974,72 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
     ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
 }

+/*
+ * @pages: the number of pages written by the control path,
+ *        < 0 - error
+ *        > 0 - number of pages written
+ *
+ * Return true if the page has been saved, otherwise false is returned.
+ */
+static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
+                              int *pages)
+{
+    uint64_t bytes_xmit = 0;
+    int ret;
+
+    *pages = -1;
+    ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
+                                &bytes_xmit);
+    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
+        return false;
+    }
+
+    if (bytes_xmit) {
+        ram_counters.transferred += bytes_xmit;
+        *pages = 1;
+    }
+
+    if (ret == RAM_SAVE_CONTROL_DELAYED) {
+        return true;
+    }
+
+    if (bytes_xmit > 0) {
+        ram_counters.normal++;
+    } else if (bytes_xmit == 0) {
+        ram_counters.duplicate++;
+    }
+
+    return true;
+}
+
+/*
+ * directly send the page to the stream
+ *
+ * Returns the number of pages written.
+ *
+ * @rs: current RAM state
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @buf: the page to be sent
+ * @async: send the page asynchronously
+ */
+static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
+                            uint8_t *buf, bool async)
+{
+    ram_counters.transferred += save_page_header(rs, rs->f, block,
+                                                 offset | RAM_SAVE_FLAG_PAGE);
+    if (async) {
+        qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
+                              migrate_release_ram() &
+                              migration_in_postcopy());
+    } else {
+        qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
+    }
+    ram_counters.transferred += TARGET_PAGE_SIZE;
+    ram_counters.normal++;
+    return 1;
+}
+
 /**
  * ram_save_page: send the given page to the stream
  *
@@ -957,73 +1056,31 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
 {
     int pages = -1;
-    uint64_t bytes_xmit;
-    ram_addr_t current_addr;
     uint8_t *p;
-    int ret;
     bool send_async = true;
     RAMBlock *block = pss->block;
     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+    ram_addr_t current_addr = block->offset + offset;

     p = block->host + offset;
     trace_ram_save_page(block->idstr, (uint64_t)offset, p);

-    /* In doubt sent page as normal */
-    bytes_xmit = 0;
-    ret = ram_control_save_page(rs->f, block->offset,
-                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
-    if (bytes_xmit) {
-        ram_counters.transferred += bytes_xmit;
-        pages = 1;
-    }
-
     XBZRLE_cache_lock();
-
-    current_addr = block->offset + offset;
-
-    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
-        if (ret != RAM_SAVE_CONTROL_DELAYED) {
-            if (bytes_xmit > 0) {
-                ram_counters.normal++;
-            } else if (bytes_xmit == 0) {
-                ram_counters.duplicate++;
-            }
-        }
-    } else {
-        pages = save_zero_page(rs, block, offset);
-        if (pages > 0) {
-            /* Must let xbzrle know, otherwise a previous (now 0'd) cached
-             * page would be stale
+    if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
+        migrate_use_xbzrle()) {
+        pages = save_xbzrle_page(rs, &p, current_addr, block,
+                                 offset, last_stage);
+        if (!last_stage) {
+            /* Can't send this cached data async, since the cache page
+             * might get updated before it gets to the wire
              */
-            xbzrle_cache_zero_page(rs, current_addr);
-            ram_release_pages(block->idstr, offset, pages);
-        } else if (!rs->ram_bulk_stage &&
-                   !migration_in_postcopy() && migrate_use_xbzrle()) {
-            pages = save_xbzrle_page(rs, &p, current_addr, block,
-                                     offset, last_stage);
-            if (!last_stage) {
-                /* Can't send this cached data async, since the cache page
-                 * might get updated before it gets to the wire
-                 */
-                send_async = false;
-            }
+            send_async = false;
         }
     }

     /* XBZRLE overflow or normal page */
     if (pages == -1) {
-        ram_counters.transferred +=
-            save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_PAGE);
-        if (send_async) {
-            qemu_put_buffer_async(rs->f, p, TARGET_PAGE_SIZE,
-                                  migrate_release_ram() &
-                                  migration_in_postcopy());
-        } else {
-            qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
-        }
-        ram_counters.transferred += TARGET_PAGE_SIZE;
-        pages = 1;
-        ram_counters.normal++;
+        pages = save_normal_page(rs, block, offset, p, send_async);
     }

     XBZRLE_cache_unlock();
@@ -1031,8 +1088,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }

-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
-                                ram_addr_t offset)
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+                                ram_addr_t offset, uint8_t *source_buf)
 {
     RAMState *rs = ram_state;
     int bytes_sent, blen;
@@ -1040,8 +1097,14 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,

     bytes_sent = save_page_header(rs, f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
-                                     migrate_compress_level());
+
+    /*
+     * copy it to an internal buffer to avoid it being modified by the VM
+     * so that we can catch errors during compression and
+     * decompression
+     */
+    memcpy(source_buf, p, TARGET_PAGE_SIZE);
+    blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
     if (blen < 0) {
         bytes_sent = 0;
         qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
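The memcpy() above is what makes stricter error handling possible: the guest keeps running and may rewrite the page while it is being deflated, so compressing straight from guest memory can yield a stream that fails to decompress even though nothing is wrong (which is why the old code had to ignore uncompress() failures). With a stable per-thread copy, a decompression failure on the destination is a real error. A rough sketch of the idea, reusing the hypothetical compress_one_page() helper from the earlier note:

#include <string.h>
#include <zlib.h>

/* Illustrative only: snapshot the (possibly still-changing) guest page,
 * then compress the stable copy with the thread's reusable z_stream. */
static int snapshot_and_compress(z_stream *stream, uint8_t *originbuf,
                                 const uint8_t *guest_page, size_t page_size,
                                 uint8_t *out, size_t out_len)
{
    memcpy(originbuf, guest_page, page_size);   /* deflate never sees a torn page */
    return compress_one_page(stream, out, out_len, originbuf, page_size);
}
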
@@ -1121,83 +1184,6 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
     return pages;
 }

-/**
- * ram_save_compressed_page: compress the given page and send it to the stream
- *
- * Returns the number of pages written.
- *
- * @rs: current RAM state
- * @block: block that contains the page we want to send
- * @offset: offset inside the block for the page
- * @last_stage: if we are at the completion stage
- */
-static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
-                                    bool last_stage)
-{
-    int pages = -1;
-    uint64_t bytes_xmit = 0;
-    uint8_t *p;
-    int ret, blen;
-    RAMBlock *block = pss->block;
-    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
-
-    p = block->host + offset;
-
-    ret = ram_control_save_page(rs->f, block->offset,
-                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
-    if (bytes_xmit) {
-        ram_counters.transferred += bytes_xmit;
-        pages = 1;
-    }
-    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
-        if (ret != RAM_SAVE_CONTROL_DELAYED) {
-            if (bytes_xmit > 0) {
-                ram_counters.normal++;
-            } else if (bytes_xmit == 0) {
-                ram_counters.duplicate++;
-            }
-        }
-    } else {
-        /* When starting the process of a new block, the first page of
-         * the block should be sent out before other pages in the same
-         * block, and all the pages in last block should have been sent
-         * out, keeping this order is important, because the 'cont' flag
-         * is used to avoid resending the block name.
-         */
-        if (block != rs->last_sent_block) {
-            flush_compressed_data(rs);
-            pages = save_zero_page(rs, block, offset);
-            if (pages == -1) {
-                /* Make sure the first page is sent out before other pages */
-                bytes_xmit = save_page_header(rs, rs->f, block, offset |
-                                              RAM_SAVE_FLAG_COMPRESS_PAGE);
-                blen = qemu_put_compression_data(rs->f, p, TARGET_PAGE_SIZE,
-                                                 migrate_compress_level());
-                if (blen > 0) {
-                    ram_counters.transferred += bytes_xmit + blen;
-                    ram_counters.normal++;
-                    pages = 1;
-                } else {
-                    qemu_file_set_error(rs->f, blen);
-                    error_report("compressed data failed!");
-                }
-            }
-            if (pages > 0) {
-                ram_release_pages(block->idstr, offset, pages);
-            }
-        } else {
-            pages = save_zero_page(rs, block, offset);
-            if (pages == -1) {
-                pages = compress_page_with_multi_thread(rs, block, offset);
-            } else {
-                ram_release_pages(block->idstr, offset, pages);
-            }
-        }
-    }
-
-    return pages;
-}
-
 /**
  * find_dirty_block: find the next dirty page and update any state
  * associated with the search process.
@@ -1434,44 +1420,80 @@ err:
     return -1;
 }

+static bool save_page_use_compression(RAMState *rs)
+{
+    if (!migrate_use_compression()) {
+        return false;
+    }
+
+    /*
+     * If xbzrle is on, stop using the data compression after first
+     * round of migration even if compression is enabled. In theory,
+     * xbzrle can do better than compression.
+     */
+    if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
+        return true;
+    }
+
+    return false;
+}
+
 /**
  * ram_save_target_page: save one target page
  *
  * Returns the number of pages written
  *
  * @rs: current RAM state
- * @ms: current migration state
  * @pss: data about the page we want to send
  * @last_stage: if we are at the completion stage
  */
 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
                                 bool last_stage)
 {
-    int res = 0;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+    int res;

-    /* Check the pages is dirty and if it is send it */
-    if (migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
-        /*
-         * If xbzrle is on, stop using the data compression after first
-         * round of migration even if compression is enabled. In theory,
-         * xbzrle can do better than compression.
+    if (control_save_page(rs, block, offset, &res)) {
+        return res;
+    }
+
+    /*
+     * When starting the process of a new block, the first page of
+     * the block should be sent out before other pages in the same
+     * block, and all the pages in last block should have been sent
+     * out, keeping this order is important, because the 'cont' flag
+     * is used to avoid resending the block name.
+     */
+    if (block != rs->last_sent_block && save_page_use_compression(rs)) {
+        flush_compressed_data(rs);
+    }
+
+    res = save_zero_page(rs, block, offset);
+    if (res > 0) {
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
         */
-        if (migrate_use_compression() &&
-            (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
-            res = ram_save_compressed_page(rs, pss, last_stage);
-        } else {
-            res = ram_save_page(rs, pss, last_stage);
+        if (!save_page_use_compression(rs)) {
+            XBZRLE_cache_lock();
+            xbzrle_cache_zero_page(rs, block->offset + offset);
+            XBZRLE_cache_unlock();
         }
+        ram_release_pages(block->idstr, offset, res);
+        return res;
+    }

-    if (res < 0) {
-        return res;
-    }
-    if (pss->block->unsentmap) {
-        clear_bit(pss->page, pss->block->unsentmap);
-    }
+    /*
+     * Make sure the first page is sent out before other pages.
+     *
+     * we post it as a normal page, as compression would take too much
+     * CPU resource.
+     */
+    if (block == rs->last_sent_block && save_page_use_compression(rs)) {
+        return compress_page_with_multi_thread(rs, block, offset);
     }

-    return res;
+    return ram_save_page(rs, pss, last_stage);
 }

 /**
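After this rework there is a single dispatch order for every dirty page instead of two parallel save paths. A compile-only summary of that order, with the QEMU-specific conditions reduced to plain flags (simplified stand-in types, not QEMU APIs; the flush_compressed_data() call on a block change is not shown):

/* Illustrative only: the order in which ram_save_target_page() now tries
 * the different transports for one dirty page. */
enum save_path { PATH_CONTROL, PATH_ZERO, PATH_COMPRESS, PATH_NORMAL };

static enum save_path pick_save_path(int control_handled, int page_is_zero,
                                     int same_block, int compression_enabled)
{
    if (control_handled) {
        return PATH_CONTROL;     /* e.g. RDMA already took care of the page */
    }
    if (page_is_zero) {
        return PATH_ZERO;        /* zero pages short-circuit everything else */
    }
    if (same_block && compression_enabled) {
        return PATH_COMPRESS;    /* only pages after the first one of a block */
    }
    return PATH_NORMAL;          /* first page of a block, or compression off */
}
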
@@ -1500,12 +1522,22 @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;

     do {
+        /* Check if the page is dirty and, if it is, send it */
+        if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
+            pss->page++;
+            continue;
+        }
+
         tmppages = ram_save_target_page(rs, pss, last_stage);
         if (tmppages < 0) {
             return tmppages;
         }

         pages += tmppages;
+        if (pss->block->unsentmap) {
+            clear_bit(pss->page, pss->block->unsentmap);
+        }
+
         pss->page++;
     } while ((pss->page & (pagesize_bits - 1)) &&
              offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
@@ -2214,9 +2246,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     RAMState **rsp = opaque;
     RAMBlock *block;

+    if (compress_threads_save_setup()) {
+        return -1;
+    }
+
     /* migration has already setup the bitmap, reuse it. */
     if (!migration_in_colo_state()) {
         if (ram_init_all(rsp) != 0) {
+            compress_threads_save_cleanup();
             return -1;
         }
     }
@@ -2236,7 +2273,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     }

     rcu_read_unlock();
-    compress_threads_save_setup();

     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@@ -2501,12 +2537,37 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }

+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+                     const uint8_t *source, size_t source_len)
+{
+    int err;
+
+    err = inflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = source_len;
+    stream->next_in = (uint8_t *)source;
+    stream->avail_out = dest_len;
+    stream->next_out = dest;
+
+    err = inflate(stream, Z_NO_FLUSH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->total_out;
+}
+
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
     unsigned long pagesize;
     uint8_t *des;
-    int len;
+    int len, ret;

     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
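qemu_uncompress_data() applies the same idea on the receive side: one inflateInit() per thread, then inflateReset() plus a single inflate() call per page, replacing the one-shot uncompress() call removed in the next hunk. A small standalone round trip (plain zlib, outside QEMU; the 4096-byte page size is only an assumption for the example) showing that pattern:

#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <zlib.h>

int main(void)
{
    uint8_t page[4096], comp[8192], out[4096];
    uLongf clen = sizeof(comp);
    z_stream s = { 0 };

    memset(page, 0xab, sizeof(page));
    assert(compress2(comp, &clen, page, sizeof(page), 1) == Z_OK);

    assert(inflateInit(&s) == Z_OK);          /* once per thread in the patch */

    /* per-page decompression, mirroring qemu_uncompress_data() above */
    assert(inflateReset(&s) == Z_OK);
    s.next_in = comp;
    s.avail_in = clen;
    s.next_out = out;
    s.avail_out = sizeof(out);
    assert(inflate(&s, Z_NO_FLUSH) == Z_STREAM_END);
    assert(s.total_out == sizeof(page));
    assert(memcmp(out, page, sizeof(page)) == 0);

    inflateEnd(&s);
    return 0;
}
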
@@ -2517,13 +2578,13 @@ static void *do_data_decompress(void *opaque)
             qemu_mutex_unlock(&param->mutex);

             pagesize = TARGET_PAGE_SIZE;
-            /* uncompress() will return failed in some case, especially
-             * when the page is dirted when doing the compression, it's
-             * not a problem because the dirty page will be retransferred
-             * and uncompress() won't break the data in other pages.
-             */
-            uncompress((Bytef *)des, &pagesize,
-                       (const Bytef *)param->compbuf, len);
+
+            ret = qemu_uncompress_data(&param->stream, des, pagesize,
+                                       param->compbuf, len);
+            if (ret < 0) {
+                error_report("decompress data failed");
+                qemu_file_set_error(decomp_file, ret);
+            }

             qemu_mutex_lock(&decomp_done_lock);
             param->done = true;
@@ -2540,12 +2601,12 @@ static void *do_data_decompress(void *opaque)
     return NULL;
 }

-static void wait_for_decompress_done(void)
+static int wait_for_decompress_done(void)
 {
     int idx, thread_count;

     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }

     thread_count = migrate_decompress_threads();
@@ -2556,30 +2617,7 @@ static void wait_for_decompress_done(void)
         }
     }
     qemu_mutex_unlock(&decomp_done_lock);
-}
-
-static void compress_threads_load_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return;
-    }
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    for (i = 0; i < thread_count; i++) {
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
+    return qemu_file_get_error(decomp_file);
 }

 static void compress_threads_load_cleanup(void)
@@ -2591,21 +2629,70 @@ static void compress_threads_load_cleanup(void)
     }
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        /*
+         * we use it as an indicator which shows whether the thread is
+         * properly initialized or not
+         */
+        if (!decomp_param[i].compbuf) {
+            break;
+        }
+
         qemu_mutex_lock(&decomp_param[i].mutex);
         decomp_param[i].quit = true;
         qemu_cond_signal(&decomp_param[i].cond);
         qemu_mutex_unlock(&decomp_param[i].mutex);
     }
     for (i = 0; i < thread_count; i++) {
+        if (!decomp_param[i].compbuf) {
+            break;
+        }
+
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
+        inflateEnd(&decomp_param[i].stream);
         g_free(decomp_param[i].compbuf);
+        decomp_param[i].compbuf = NULL;
     }
     g_free(decompress_threads);
     g_free(decomp_param);
     decompress_threads = NULL;
     decomp_param = NULL;
+    decomp_file = NULL;
+}
+
+static int compress_threads_load_setup(QEMUFile *f)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    qemu_mutex_init(&decomp_done_lock);
+    qemu_cond_init(&decomp_done_cond);
+    decomp_file = f;
+    for (i = 0; i < thread_count; i++) {
+        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+            goto exit;
+        }
+
+        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        qemu_mutex_init(&decomp_param[i].mutex);
+        qemu_cond_init(&decomp_param[i].cond);
+        decomp_param[i].done = true;
+        decomp_param[i].quit = false;
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+    return 0;
+exit:
+    compress_threads_load_cleanup();
+    return -1;
 }

 static void decompress_data_with_multi_threads(QEMUFile *f,
@@ -2647,8 +2734,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
+    if (compress_threads_load_setup(f)) {
+        return -1;
+    }
+
     xbzrle_load_setup();
-    compress_threads_load_setup();
     ramblock_recv_map_init();
     return 0;
 }
@@ -2999,7 +3089,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }

-    wait_for_decompress_done();
+    ret |= wait_for_decompress_done();
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);
     return ret;