
Merge tag 'for-upstream' of https://repo.or.cz/qemu/kevin into staging

Block layer patches

- virtio-scsi: add iothread-vq-mapping parameter
- Improve writethrough performance
- Fix missing zero init in bdrv_snapshot_goto()
- Add scripts/qcow2-to-stdout.py
- Code cleanup and iotests fixes

# -----BEGIN PGP SIGNATURE-----
#
# iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmfTDysRHGt3b2xmQHJl
# ZGhhdC5jb20ACgkQfwmycsiPL9Yz6A//asOl37zjbtf9pYjY/gliH859TQOppPGD
# LB9IIr+nTDME0wfUkCOlag+CeEYZwkeo2PF+XeopsyzlJeBOk4tL7AkY57XYe3lZ
# M5hlnNrn6l3gb6iioMg60pEKSMrpKprB16vT3nAtyN6aEXsm9TvtPkWPFTCFGVeK
# W74VCr7wuXbfdEJcOGd8WhB9ZHIgwoWYnoL41tvCoefW2yNaMA6X0TLn98toXzOi
# il50ZnnchTQngns5R+n+1R1Ma995t393D+CArQcYVRzxKGOs5p0y4otz4gCkMhdp
# GVL09R7Ge4TteSJ2myxlN/EjYOxmdoMrVDajr4xPdHBw12MKzgk8i82h4/Es/Q5o
# 3Npgx74+jDyqlICb/czTVM5KJINpyO80vO3N3WpYUOQGyTCcYgv7pIpy8pB2o6Te
# RPlv0W9bHVSSgThFFLQ0Ud8WRGJe1K/ar8bdmiWN08Wez1avENWaYmsv5zGnFL24
# vD6cNXMR4mF7mzyeWda/5hGKv75djVgX+ZfzvWNT3qgizD56JBOA3RdCRwBZJOJb
# TvJkfi5RGyaji9BfKVCYBL3/iDELJEVDW8jxvIIUrS0aPcTHpAQ5gTO7VAokreqZ
# 5Smll11eeoEgPPvNLw8ikmOGTWOMkJGrmExP2K1ApANq3kSbBSU4jroEr0BG9PZT
# 6Y0hUdtFSdU=
# =w2Ri
# -----END PGP SIGNATURE-----
# gpg: Signature made Fri 14 Mar 2025 01:00:27 HKT
# gpg:                using RSA key DC3DEB159A9AF95D3D7456FE7F09B272C88F2FD6
# gpg:                issuer "kwolf@redhat.com"
# gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>" [full]
# Primary key fingerprint: DC3D EB15 9A9A F95D 3D74  56FE 7F09 B272 C88F 2FD6

* tag 'for-upstream' of https://repo.or.cz/qemu/kevin: (23 commits)
  scripts/qcow2-to-stdout.py: Add script to write qcow2 images to stdout
  virtio-scsi: only expose cmd vqs via iothread-vq-mapping
  virtio-scsi: handle ctrl virtqueue in main loop
  virtio-scsi: add iothread-vq-mapping parameter
  virtio: extract iothread-vq-mapping.h API
  virtio-blk: tidy up iothread_vq_mapping functions
  virtio-blk: extract cleanup_iothread_vq_mapping() function
  virtio-scsi: perform TMFs in appropriate AioContexts
  virtio-scsi: protect events_dropped field
  virtio-scsi: introduce event and ctrl virtqueue locks
  scsi: introduce requests_lock
  scsi: track per-SCSIRequest AioContext
  dma: use current AioContext for dma_blk_io()
  scsi-disk: drop unused SCSIDiskState->bh field
  iotests: Limit qsd-migrate to working formats
  aio-posix: Adjust polling time also for new handlers
  aio-posix: Separate AioPolledEvent per AioHandler
  aio-posix: Factor out adjust_polling_time()
  aio: Create AioPolledEvent
  block/io: Ignore FUA with cache.no-flush=on
  ...

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Stefan Hajnoczi, 5 months ago
commit 0462a32b4f

block/block-backend.c (+0 -12)

@@ -2357,18 +2357,6 @@ void *blk_blockalign(BlockBackend *blk, size_t size)
     return qemu_blockalign(blk ? blk_bs(blk) : NULL, size);
 }
 
-bool blk_op_is_blocked(BlockBackend *blk, BlockOpType op, Error **errp)
-{
-    BlockDriverState *bs = blk_bs(blk);
-    GLOBAL_STATE_CODE();
-    GRAPH_RDLOCK_GUARD_MAINLOOP();
-
-    if (!bs) {
-        return false;
-    }
-
-    return bdrv_op_is_blocked(bs, op, errp);
-}
 
 /**
  * Return BB's current AioContext.  Note that this context may change

block/file-posix.c (+21 -8)

@@ -194,6 +194,7 @@ static int fd_open(BlockDriverState *bs)
 }
 
 static int64_t raw_getlength(BlockDriverState *bs);
+static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs);
 
 typedef struct RawPosixAIOData {
     BlockDriverState *bs;
@@ -804,6 +805,13 @@ static int raw_open_common(BlockDriverState *bs, QDict *options,
 #endif
     s->needs_alignment = raw_needs_alignment(bs);
 
+    bs->supported_write_flags = BDRV_REQ_FUA;
+    if (s->use_linux_aio && !laio_has_fua()) {
+        bs->supported_write_flags &= ~BDRV_REQ_FUA;
+    } else if (s->use_linux_io_uring && !luring_has_fua()) {
+        bs->supported_write_flags &= ~BDRV_REQ_FUA;
+    }
+
     bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
     if (S_ISREG(st.st_mode)) {
         /* When extending regular files, we get zeros from the OS */
@@ -2477,7 +2485,8 @@ static inline bool raw_check_linux_aio(BDRVRawState *s)
 #endif
 
 static int coroutine_fn raw_co_prw(BlockDriverState *bs, int64_t *offset_ptr,
-                                   uint64_t bytes, QEMUIOVector *qiov, int type)
+                                   uint64_t bytes, QEMUIOVector *qiov, int type,
+                                   int flags)
 {
     BDRVRawState *s = bs->opaque;
     RawPosixAIOData acb;
@@ -2508,13 +2517,13 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, int64_t *offset_ptr,
 #ifdef CONFIG_LINUX_IO_URING
     } else if (raw_check_linux_io_uring(s)) {
         assert(qiov->size == bytes);
-        ret = luring_co_submit(bs, s->fd, offset, qiov, type);
+        ret = luring_co_submit(bs, s->fd, offset, qiov, type, flags);
         goto out;
 #endif
 #ifdef CONFIG_LINUX_AIO
     } else if (raw_check_linux_aio(s)) {
         assert(qiov->size == bytes);
-        ret = laio_co_submit(s->fd, offset, qiov, type,
+        ret = laio_co_submit(s->fd, offset, qiov, type, flags,
                               s->aio_max_batch);
         goto out;
 #endif
@@ -2534,6 +2543,10 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, int64_t *offset_ptr,
 
     assert(qiov->size == bytes);
     ret = raw_thread_pool_submit(handle_aiocb_rw, &acb);
+    if (ret == 0 && (flags & BDRV_REQ_FUA)) {
+        /* TODO Use pwritev2() instead if it's available */
+        ret = raw_co_flush_to_disk(bs);
+    }
     goto out; /* Avoid the compiler err of unused label */
 
 out:
@@ -2571,14 +2584,14 @@ static int coroutine_fn raw_co_preadv(BlockDriverState *bs, int64_t offset,
                                       int64_t bytes, QEMUIOVector *qiov,
                                       BdrvRequestFlags flags)
 {
-    return raw_co_prw(bs, &offset, bytes, qiov, QEMU_AIO_READ);
+    return raw_co_prw(bs, &offset, bytes, qiov, QEMU_AIO_READ, flags);
 }
 
 static int coroutine_fn raw_co_pwritev(BlockDriverState *bs, int64_t offset,
                                        int64_t bytes, QEMUIOVector *qiov,
                                        BdrvRequestFlags flags)
 {
-    return raw_co_prw(bs, &offset, bytes, qiov, QEMU_AIO_WRITE);
+    return raw_co_prw(bs, &offset, bytes, qiov, QEMU_AIO_WRITE, flags);
 }
 
 static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
@@ -2600,12 +2613,12 @@ static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
 
 #ifdef CONFIG_LINUX_IO_URING
     if (raw_check_linux_io_uring(s)) {
-        return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH);
+        return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH, 0);
     }
 #endif
 #ifdef CONFIG_LINUX_AIO
     if (s->has_laio_fdsync && raw_check_linux_aio(s)) {
-        return laio_co_submit(s->fd, 0, NULL, QEMU_AIO_FLUSH, 0);
+        return laio_co_submit(s->fd, 0, NULL, QEMU_AIO_FLUSH, 0, 0);
     }
 #endif
     return raw_thread_pool_submit(handle_aiocb_flush, &acb);
@@ -3540,7 +3553,7 @@ static int coroutine_fn raw_co_zone_append(BlockDriverState *bs,
     }
 
     trace_zbd_zone_append(bs, *offset >> BDRV_SECTOR_BITS);
-    return raw_co_prw(bs, offset, len, qiov, QEMU_AIO_ZONE_APPEND);
+    return raw_co_prw(bs, offset, len, qiov, QEMU_AIO_ZONE_APPEND, 0);
 }
 #endif
 

block/io.c (+4 -0)

@@ -1058,6 +1058,10 @@ bdrv_driver_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
         return -ENOMEDIUM;
     }
 
+    if (bs->open_flags & BDRV_O_NO_FLUSH) {
+        flags &= ~BDRV_REQ_FUA;
+    }
+
     if ((flags & BDRV_REQ_FUA) &&
         (~bs->supported_write_flags & BDRV_REQ_FUA)) {
         flags &= ~BDRV_REQ_FUA;
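
Together with the file-posix.c changes above, this hunk completes the writethrough fast path: bdrv_driver_pwritev() drops BDRV_REQ_FUA outright when the image is opened with cache.no-flush=on, and the pre-existing lines that follow clear the flag and fall back to an explicit flush when the driver lacks native FUA support. A minimal sketch of that pattern (hypothetical wrapper; bdrv_co_pwritev() and bdrv_co_flush() are the real QEMU APIs, but this is not the exact call chain):

    /* Sketch only: mirrors the FUA fallback logic shown above. */
    static int coroutine_fn
    pwritev_maybe_fua(BlockDriverState *bs, BdrvChild *child, int64_t offset,
                      int64_t bytes, QEMUIOVector *qiov, BdrvRequestFlags flags)
    {
        int ret;

        if (bs->open_flags & BDRV_O_NO_FLUSH) {
            flags &= ~BDRV_REQ_FUA;        /* cache.no-flush=on: FUA is a no-op */
        }

        if ((flags & BDRV_REQ_FUA) &&
            (~bs->supported_write_flags & BDRV_REQ_FUA)) {
            flags &= ~BDRV_REQ_FUA;        /* driver lacks native FUA */
            ret = bdrv_co_pwritev(child, offset, bytes, qiov, flags);
            if (ret == 0) {
                ret = bdrv_co_flush(bs);   /* emulate FUA with a full flush */
            }
            return ret;
        }

        return bdrv_co_pwritev(child, offset, bytes, qiov, flags);
    }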

block/io_uring.c (+22 -3)

@@ -335,15 +335,24 @@ static void luring_deferred_fn(void *opaque)
  *
  */
 static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
-                            uint64_t offset, int type)
+                            uint64_t offset, int type, BdrvRequestFlags flags)
 {
     int ret;
     struct io_uring_sqe *sqes = &luringcb->sqeq;
 
     switch (type) {
     case QEMU_AIO_WRITE:
+#ifdef HAVE_IO_URING_PREP_WRITEV2
+    {
+        int luring_flags = (flags & BDRV_REQ_FUA) ? RWF_DSYNC : 0;
+        io_uring_prep_writev2(sqes, fd, luringcb->qiov->iov,
+                              luringcb->qiov->niov, offset, luring_flags);
+    }
+#else
+        assert(flags == 0);
         io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                              luringcb->qiov->niov, offset);
+#endif
         break;
     case QEMU_AIO_ZONE_APPEND:
         io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
@@ -380,7 +389,8 @@ static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
 }
 
 int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
-                                  QEMUIOVector *qiov, int type)
+                                  QEMUIOVector *qiov, int type,
+                                  BdrvRequestFlags flags)
 {
     int ret;
     AioContext *ctx = qemu_get_current_aio_context();
@@ -393,7 +403,7 @@ int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
     };
     trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                            type);
-    ret = luring_do_submit(fd, &luringcb, s, offset, type);
+    ret = luring_do_submit(fd, &luringcb, s, offset, type, flags);
 
     if (ret < 0) {
         return ret;
@@ -448,3 +458,12 @@ void luring_cleanup(LuringState *s)
     trace_luring_cleanup_state(s);
     g_free(s);
 }
+
+bool luring_has_fua(void)
+{
+#ifdef HAVE_IO_URING_PREP_WRITEV2
+    return true;
+#else
+    return false;
+#endif
+}

block/linux-aio.c (+22 -3)

@@ -368,7 +368,8 @@ static void laio_deferred_fn(void *opaque)
 }
 
 static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
-                          int type, uint64_t dev_max_batch)
+                          int type, BdrvRequestFlags flags,
+                          uint64_t dev_max_batch)
 {
     LinuxAioState *s = laiocb->ctx;
     struct iocb *iocbs = &laiocb->iocb;
@@ -376,7 +377,15 @@ static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
 
     switch (type) {
     case QEMU_AIO_WRITE:
+#ifdef HAVE_IO_PREP_PWRITEV2
+    {
+        int laio_flags = (flags & BDRV_REQ_FUA) ? RWF_DSYNC : 0;
+        io_prep_pwritev2(iocbs, fd, qiov->iov, qiov->niov, offset, laio_flags);
+    }
+#else
+        assert(flags == 0);
         io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
+#endif
         break;
     case QEMU_AIO_ZONE_APPEND:
         io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
@@ -409,7 +418,8 @@ static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
 }
 
 int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
-                                int type, uint64_t dev_max_batch)
+                                int type, BdrvRequestFlags flags,
+                                uint64_t dev_max_batch)
 {
     int ret;
     AioContext *ctx = qemu_get_current_aio_context();
@@ -422,7 +432,7 @@ int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
         .qiov       = qiov,
     };
 
-    ret = laio_do_submit(fd, &laiocb, offset, type, dev_max_batch);
+    ret = laio_do_submit(fd, &laiocb, offset, type, flags, dev_max_batch);
     if (ret < 0) {
         return ret;
     }
@@ -505,3 +515,12 @@ bool laio_has_fdsync(int fd)
     io_destroy(ctx);
     return (ret == -EINVAL) ? false : true;
 }
+
+bool laio_has_fua(void)
+{
+#ifdef HAVE_IO_PREP_PWRITEV2
+    return true;
+#else
+    return false;
+#endif
+}
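
Both AIO backends map BDRV_REQ_FUA onto the kernel's per-request RWF_DSYNC flag (io_uring_prep_writev2() above, io_prep_pwritev2() here), and the new luring_has_fua()/laio_has_fua() probes simply report whether those v2 submission helpers existed at build time. At the syscall level the technique reduces to pwritev2(2) with RWF_DSYNC, which gives a single write O_DSYNC semantics without opening the whole file O_DSYNC. A self-contained sketch (assumes Linux >= 4.7 and glibc >= 2.26):

    #define _GNU_SOURCE
    #include <sys/uio.h>

    /* This one write completes only after the data (and the metadata
     * needed to read it back) has reached stable storage. */
    static ssize_t write_fua(int fd, const struct iovec *iov, int iovcnt,
                             off_t offset)
    {
        return pwritev2(fd, iov, iovcnt, offset, RWF_DSYNC);
    }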

block/snapshot.c (+1 -0)

@@ -296,6 +296,7 @@ int bdrv_snapshot_goto(BlockDriverState *bs,
         bdrv_graph_wrunlock();
 
         ret = bdrv_snapshot_goto(fallback_bs, snapshot_id, errp);
+        memset(bs->opaque, 0, drv->instance_size);
         open_ret = drv->bdrv_open(bs, options, bs->open_flags, &local_err);
         qobject_unref(options);
         if (open_ret < 0) {

hw/block/virtio-blk.c (+3 -129)

@@ -33,6 +33,7 @@
 #endif
 #include "hw/virtio/virtio-bus.h"
 #include "migration/qemu-file-types.h"
+#include "hw/virtio/iothread-vq-mapping.h"
 #include "hw/virtio/virtio-access.h"
 #include "hw/virtio/virtio-blk-common.h"
 #include "qemu/coroutine.h"
@@ -1423,128 +1424,6 @@ static const BlockDevOps virtio_block_ops = {
     .drained_end   = virtio_blk_drained_end,
 };
 
-static bool
-validate_iothread_vq_mapping_list(IOThreadVirtQueueMappingList *list,
-        uint16_t num_queues, Error **errp)
-{
-    g_autofree unsigned long *vqs = bitmap_new(num_queues);
-    g_autoptr(GHashTable) iothreads =
-        g_hash_table_new(g_str_hash, g_str_equal);
-
-    for (IOThreadVirtQueueMappingList *node = list; node; node = node->next) {
-        const char *name = node->value->iothread;
-        uint16List *vq;
-
-        if (!iothread_by_id(name)) {
-            error_setg(errp, "IOThread \"%s\" object does not exist", name);
-            return false;
-        }
-
-        if (!g_hash_table_add(iothreads, (gpointer)name)) {
-            error_setg(errp,
-                    "duplicate IOThread name \"%s\" in iothread-vq-mapping",
-                    name);
-            return false;
-        }
-
-        if (node != list) {
-            if (!!node->value->vqs != !!list->value->vqs) {
-                error_setg(errp, "either all items in iothread-vq-mapping "
-                                 "must have vqs or none of them must have it");
-                return false;
-            }
-        }
-
-        for (vq = node->value->vqs; vq; vq = vq->next) {
-            if (vq->value >= num_queues) {
-                error_setg(errp, "vq index %u for IOThread \"%s\" must be "
-                        "less than num_queues %u in iothread-vq-mapping",
-                        vq->value, name, num_queues);
-                return false;
-            }
-
-            if (test_and_set_bit(vq->value, vqs)) {
-                error_setg(errp, "cannot assign vq %u to IOThread \"%s\" "
-                        "because it is already assigned", vq->value, name);
-                return false;
-            }
-        }
-    }
-
-    if (list->value->vqs) {
-        for (uint16_t i = 0; i < num_queues; i++) {
-            if (!test_bit(i, vqs)) {
-                error_setg(errp,
-                        "missing vq %u IOThread assignment in iothread-vq-mapping",
-                        i);
-                return false;
-            }
-        }
-    }
-
-    return true;
-}
-
-/**
- * apply_iothread_vq_mapping:
- * @iothread_vq_mapping_list: The mapping of virtqueues to IOThreads.
- * @vq_aio_context: The array of AioContext pointers to fill in.
- * @num_queues: The length of @vq_aio_context.
- * @errp: If an error occurs, a pointer to the area to store the error.
- *
- * Fill in the AioContext for each virtqueue in the @vq_aio_context array given
- * the iothread-vq-mapping parameter in @iothread_vq_mapping_list.
- *
- * Returns: %true on success, %false on failure.
- **/
-static bool apply_iothread_vq_mapping(
-        IOThreadVirtQueueMappingList *iothread_vq_mapping_list,
-        AioContext **vq_aio_context,
-        uint16_t num_queues,
-        Error **errp)
-{
-    IOThreadVirtQueueMappingList *node;
-    size_t num_iothreads = 0;
-    size_t cur_iothread = 0;
-
-    if (!validate_iothread_vq_mapping_list(iothread_vq_mapping_list,
-                                           num_queues, errp)) {
-        return false;
-    }
-
-    for (node = iothread_vq_mapping_list; node; node = node->next) {
-        num_iothreads++;
-    }
-
-    for (node = iothread_vq_mapping_list; node; node = node->next) {
-        IOThread *iothread = iothread_by_id(node->value->iothread);
-        AioContext *ctx = iothread_get_aio_context(iothread);
-
-        /* Released in virtio_blk_vq_aio_context_cleanup() */
-        object_ref(OBJECT(iothread));
-
-        if (node->value->vqs) {
-            uint16List *vq;
-
-            /* Explicit vq:IOThread assignment */
-            for (vq = node->value->vqs; vq; vq = vq->next) {
-                assert(vq->value < num_queues);
-                vq_aio_context[vq->value] = ctx;
-            }
-        } else {
-            /* Round-robin vq:IOThread assignment */
-            for (unsigned i = cur_iothread; i < num_queues;
-                 i += num_iothreads) {
-                vq_aio_context[i] = ctx;
-            }
-        }
-
-        cur_iothread++;
-    }
-
-    return true;
-}
-
 /* Context: BQL held */
 static bool virtio_blk_vq_aio_context_init(VirtIOBlock *s, Error **errp)
 {
@@ -1577,7 +1456,7 @@ static bool virtio_blk_vq_aio_context_init(VirtIOBlock *s, Error **errp)
     s->vq_aio_context = g_new(AioContext *, conf->num_queues);
 
     if (conf->iothread_vq_mapping_list) {
-        if (!apply_iothread_vq_mapping(conf->iothread_vq_mapping_list,
+        if (!iothread_vq_mapping_apply(conf->iothread_vq_mapping_list,
                                        s->vq_aio_context,
                                        conf->num_queues,
                                        errp)) {
@@ -1611,12 +1490,7 @@ static void virtio_blk_vq_aio_context_cleanup(VirtIOBlock *s)
     assert(!s->ioeventfd_started);
 
     if (conf->iothread_vq_mapping_list) {
-        IOThreadVirtQueueMappingList *node;
-
-        for (node = conf->iothread_vq_mapping_list; node; node = node->next) {
-            IOThread *iothread = iothread_by_id(node->value->iothread);
-            object_unref(OBJECT(iothread));
-        }
+        iothread_vq_mapping_cleanup(conf->iothread_vq_mapping_list);
     }
 
     if (conf->iothread) {
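
The validation and mapping helpers removed here now live behind the new hw/virtio/iothread-vq-mapping.h API (see the iothread_vq_mapping_apply()/iothread_vq_mapping_cleanup() calls in the hunks above) so that virtio-scsi can share them. For reference, the round-robin branch of the mapping, condensed from the removed code (same variable names, validation and error handling omitted):

    /* Without explicit "vqs" lists, vq i goes to IOThread (i % num_iothreads). */
    size_t num_iothreads = 0, cur_iothread = 0;
    IOThreadVirtQueueMappingList *node;

    for (node = list; node; node = node->next) {
        num_iothreads++;
    }

    for (node = list; node; node = node->next, cur_iothread++) {
        AioContext *ctx =
            iothread_get_aio_context(iothread_by_id(node->value->iothread));

        for (unsigned i = cur_iothread; i < num_queues; i += num_iothreads) {
            vq_aio_context[i] = ctx;
        }
    }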

hw/ide/core.c (+1 -2)

@@ -968,8 +968,7 @@ static void ide_dma_cb(void *opaque, int ret)
                                            BDRV_SECTOR_SIZE, ide_dma_cb, s);
         break;
     case IDE_DMA_TRIM:
-        s->bus->dma->aiocb = dma_blk_io(blk_get_aio_context(s->blk),
-                                        &s->sg, offset, BDRV_SECTOR_SIZE,
+        s->bus->dma->aiocb = dma_blk_io(&s->sg, offset, BDRV_SECTOR_SIZE,
                                         ide_issue_trim, s, ide_dma_cb, s,
                                         DMA_DIRECTION_TO_DEVICE);
         break;

hw/ide/macio.c (+1 -2)

@@ -187,8 +187,7 @@ static void pmac_ide_transfer_cb(void *opaque, int ret)
                                            pmac_ide_transfer_cb, io);
         break;
     case IDE_DMA_TRIM:
-        s->bus->dma->aiocb = dma_blk_io(blk_get_aio_context(s->blk), &s->sg,
-                                        offset, 0x1, ide_issue_trim, s,
+        s->bus->dma->aiocb = dma_blk_io(&s->sg, offset, 0x1, ide_issue_trim, s,
                                         pmac_ide_transfer_cb, io,
                                         DMA_DIRECTION_TO_DEVICE);
         break;

hw/scsi/scsi-bus.c (+86 -35)

@@ -100,8 +100,15 @@ static void scsi_device_for_each_req_sync(SCSIDevice *s,
     assert(!runstate_is_running());
     assert(qemu_in_main_thread());
 
-    QTAILQ_FOREACH_SAFE(req, &s->requests, next, next_req) {
-        fn(req, opaque);
+    /*
+     * Locking is not necessary because the guest is stopped and no other
+     * threads can be accessing the requests list, but take the lock for
+     * consistency.
+     */
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        QTAILQ_FOREACH_SAFE(req, &s->requests, next, next_req) {
+            fn(req, opaque);
+        }
     }
 }
 
@@ -115,21 +122,29 @@ static void scsi_device_for_each_req_async_bh(void *opaque)
 {
     g_autofree SCSIDeviceForEachReqAsyncData *data = opaque;
     SCSIDevice *s = data->s;
-    AioContext *ctx;
-    SCSIRequest *req;
-    SCSIRequest *next;
+    g_autoptr(GList) reqs = NULL;
 
     /*
-     * The BB cannot have changed contexts between this BH being scheduled and
-     * now: BBs' AioContexts, when they have a node attached, can only be
-     * changed via bdrv_try_change_aio_context(), in a drained section.  While
-     * we have the in-flight counter incremented, that drain must block.
+     * Build a list of requests in this AioContext so fn() can be invoked later
+     * outside requests_lock.
      */
-    ctx = blk_get_aio_context(s->conf.blk);
-    assert(ctx == qemu_get_current_aio_context());
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        AioContext *ctx = qemu_get_current_aio_context();
+        SCSIRequest *req;
+        SCSIRequest *next;
+
+        QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) {
+            if (req->ctx == ctx) {
+                scsi_req_ref(req); /* dropped after calling fn() */
+                reqs = g_list_prepend(reqs, req);
+            }
+        }
+    }
 
-    QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) {
-        data->fn(req, data->fn_opaque);
+    /* Call fn() on each request */
+    for (GList *elem = g_list_first(reqs); elem; elem = g_list_next(elem)) {
+        data->fn(elem->data, data->fn_opaque);
+        scsi_req_unref(elem->data);
     }
 
     /* Drop the reference taken by scsi_device_for_each_req_async() */
@@ -139,9 +154,35 @@ static void scsi_device_for_each_req_async_bh(void *opaque)
     blk_dec_in_flight(s->conf.blk);
 }
 
+static void scsi_device_for_each_req_async_do_ctx(gpointer key, gpointer value,
+                                                  gpointer user_data)
+{
+    AioContext *ctx = key;
+    SCSIDeviceForEachReqAsyncData *params = user_data;
+    SCSIDeviceForEachReqAsyncData *data;
+
+    data = g_new(SCSIDeviceForEachReqAsyncData, 1);
+    data->s = params->s;
+    data->fn = params->fn;
+    data->fn_opaque = params->fn_opaque;
+
+    /*
+     * Hold a reference to the SCSIDevice until
+     * scsi_device_for_each_req_async_bh() finishes.
+     */
+    object_ref(OBJECT(data->s));
+
+    /* Paired with scsi_device_for_each_req_async_bh() */
+    blk_inc_in_flight(data->s->conf.blk);
+
+    aio_bh_schedule_oneshot(ctx, scsi_device_for_each_req_async_bh, data);
+}
+
 /*
  * Schedule @fn() to be invoked for each enqueued request in device @s. @fn()
- * runs in the AioContext that is executing the request.
+ * must be thread-safe because it runs concurrently in each AioContext that is
+ * executing a request.
+ *
  * Keeps the BlockBackend's in-flight counter incremented until everything is
  * done, so draining it will settle all scheduled @fn() calls.
  */
@@ -151,24 +192,26 @@ static void scsi_device_for_each_req_async(SCSIDevice *s,
 {
     assert(qemu_in_main_thread());
 
-    SCSIDeviceForEachReqAsyncData *data =
-        g_new(SCSIDeviceForEachReqAsyncData, 1);
-
-    data->s = s;
-    data->fn = fn;
-    data->fn_opaque = opaque;
-
-    /*
-     * Hold a reference to the SCSIDevice until
-     * scsi_device_for_each_req_async_bh() finishes.
-     */
-    object_ref(OBJECT(s));
+    /* The set of AioContexts where the requests are being processed */
+    g_autoptr(GHashTable) aio_contexts = g_hash_table_new(NULL, NULL);
+    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+        SCSIRequest *req;
+        QTAILQ_FOREACH(req, &s->requests, next) {
+            g_hash_table_add(aio_contexts, req->ctx);
+        }
+    }
 
-    /* Paired with blk_dec_in_flight() in scsi_device_for_each_req_async_bh() */
-    blk_inc_in_flight(s->conf.blk);
-    aio_bh_schedule_oneshot(blk_get_aio_context(s->conf.blk),
-                            scsi_device_for_each_req_async_bh,
-                            data);
+    /* Schedule a BH for each AioContext */
+    SCSIDeviceForEachReqAsyncData params = {
+        .s = s,
+        .fn = fn,
+        .fn_opaque = opaque,
+    };
+    g_hash_table_foreach(
+            aio_contexts,
+            scsi_device_for_each_req_async_do_ctx,
+            &params
+    );
 }
 
 static void scsi_device_realize(SCSIDevice *s, Error **errp)
@@ -349,6 +392,7 @@ static void scsi_qdev_realize(DeviceState *qdev, Error **errp)
         dev->lun = lun;
     }
 
+    qemu_mutex_init(&dev->requests_lock);
     QTAILQ_INIT(&dev->requests);
     scsi_device_realize(dev, &local_err);
     if (local_err) {
@@ -369,6 +413,8 @@ static void scsi_qdev_unrealize(DeviceState *qdev)
 
     scsi_device_purge_requests(dev, SENSE_CODE(NO_SENSE));
 
+    qemu_mutex_destroy(&dev->requests_lock);
+
     scsi_device_unrealize(dev);
 
     blockdev_mark_auto_del(dev->conf.blk);
@@ -868,6 +914,7 @@ invalid_opcode:
         }
     }
 
+    req->ctx = qemu_get_current_aio_context();
     req->cmd = cmd;
     req->residual = req->cmd.xfer;
 
@@ -964,7 +1011,10 @@ static void scsi_req_enqueue_internal(SCSIRequest *req)
         req->sg = NULL;
     }
     req->enqueued = true;
-    QTAILQ_INSERT_TAIL(&req->dev->requests, req, next);
+
+    WITH_QEMU_LOCK_GUARD(&req->dev->requests_lock) {
+        QTAILQ_INSERT_TAIL(&req->dev->requests, req, next);
+    }
 }
 
 int32_t scsi_req_enqueue(SCSIRequest *req)
@@ -984,7 +1034,9 @@ static void scsi_req_dequeue(SCSIRequest *req)
     trace_scsi_req_dequeue(req->dev->id, req->lun, req->tag);
     req->retry = false;
     if (req->enqueued) {
-        QTAILQ_REMOVE(&req->dev->requests, req, next);
+        WITH_QEMU_LOCK_GUARD(&req->dev->requests_lock) {
+            QTAILQ_REMOVE(&req->dev->requests, req, next);
+        }
         req->enqueued = false;
         scsi_req_unref(req);
     }
@@ -1961,8 +2013,7 @@ static void scsi_device_class_init(ObjectClass *klass, void *data)
 
 static void scsi_dev_instance_init(Object *obj)
 {
-    DeviceState *dev = DEVICE(obj);
-    SCSIDevice *s = SCSI_DEVICE(dev);
+    SCSIDevice *s = SCSI_DEVICE(obj);
 
     device_add_bootindex_property(obj, &s->conf.bootindex,
                                   "bootindex", NULL,

hw/scsi/scsi-disk.c (+8 -16)

@@ -106,7 +106,6 @@ struct SCSIDiskState {
     uint64_t max_unmap_size;
     uint64_t max_io_size;
     uint32_t quirks;
-    QEMUBH *bh;
     char *version;
     char *serial;
     char *vendor;
@@ -329,9 +328,8 @@ static void scsi_aio_complete(void *opaque, int ret)
     SCSIDiskReq *r = (SCSIDiskReq *)opaque;
     SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
 
-    /* The request must only run in the BlockBackend's AioContext */
-    assert(blk_get_aio_context(s->qdev.conf.blk) ==
-           qemu_get_current_aio_context());
+    /* The request must run in its AioContext */
+    assert(r->req.ctx == qemu_get_current_aio_context());
 
     assert(r->req.aiocb != NULL);
     r->req.aiocb = NULL;
@@ -431,12 +429,10 @@ static void scsi_dma_complete(void *opaque, int ret)
 
 static void scsi_read_complete_noio(SCSIDiskReq *r, int ret)
 {
-    SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
     uint32_t n;
 
-    /* The request must only run in the BlockBackend's AioContext */
-    assert(blk_get_aio_context(s->qdev.conf.blk) ==
-           qemu_get_current_aio_context());
+    /* The request must run in its AioContext */
+    assert(r->req.ctx == qemu_get_current_aio_context());
 
     assert(r->req.aiocb == NULL);
     if (scsi_disk_req_check_error(r, ret, ret > 0)) {
@@ -488,8 +484,7 @@ static void scsi_do_read(SCSIDiskReq *r, int ret)
     if (r->req.sg) {
         dma_acct_start(s->qdev.conf.blk, &r->acct, r->req.sg, BLOCK_ACCT_READ);
         r->req.residual -= r->req.sg->size;
-        r->req.aiocb = dma_blk_io(blk_get_aio_context(s->qdev.conf.blk),
-                                  r->req.sg, r->sector << BDRV_SECTOR_BITS,
+        r->req.aiocb = dma_blk_io(r->req.sg, r->sector << BDRV_SECTOR_BITS,
                                   BDRV_SECTOR_SIZE,
                                   sdc->dma_readv, r, scsi_dma_complete, r,
                                   DMA_DIRECTION_FROM_DEVICE);
@@ -564,12 +559,10 @@ static void scsi_read_data(SCSIRequest *req)
 
 static void scsi_write_complete_noio(SCSIDiskReq *r, int ret)
 {
-    SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
     uint32_t n;
 
-    /* The request must only run in the BlockBackend's AioContext */
-    assert(blk_get_aio_context(s->qdev.conf.blk) ==
-           qemu_get_current_aio_context());
+    /* The request must run in its AioContext */
+    assert(r->req.ctx == qemu_get_current_aio_context());
 
     assert (r->req.aiocb == NULL);
     if (scsi_disk_req_check_error(r, ret, ret > 0)) {
@@ -651,8 +644,7 @@ static void scsi_write_data(SCSIRequest *req)
     if (r->req.sg) {
         dma_acct_start(s->qdev.conf.blk, &r->acct, r->req.sg, BLOCK_ACCT_WRITE);
         r->req.residual -= r->req.sg->size;
-        r->req.aiocb = dma_blk_io(blk_get_aio_context(s->qdev.conf.blk),
-                                  r->req.sg, r->sector << BDRV_SECTOR_BITS,
+        r->req.aiocb = dma_blk_io(r->req.sg, r->sector << BDRV_SECTOR_BITS,
                                   BDRV_SECTOR_SIZE,
                                   sdc->dma_writev, r, scsi_dma_complete, r,
                                   DMA_DIRECTION_TO_DEVICE);

hw/scsi/virtio-scsi-dataplane.c (+78 -25)

@@ -18,6 +18,7 @@
 #include "system/block-backend.h"
 #include "hw/scsi/scsi.h"
 #include "scsi/constants.h"
+#include "hw/virtio/iothread-vq-mapping.h"
 #include "hw/virtio/virtio-bus.h"
 
 /* Context: BQL held */
@@ -28,7 +29,14 @@ void virtio_scsi_dataplane_setup(VirtIOSCSI *s, Error **errp)
     BusState *qbus = qdev_get_parent_bus(DEVICE(vdev));
     VirtioBusClass *k = VIRTIO_BUS_GET_CLASS(qbus);
 
-    if (vs->conf.iothread) {
+    if (vs->conf.iothread && vs->conf.iothread_vq_mapping_list) {
+        error_setg(errp,
+                   "iothread and iothread-vq-mapping properties cannot be set "
+                   "at the same time");
+        return;
+    }
+
+    if (vs->conf.iothread || vs->conf.iothread_vq_mapping_list) {
         if (!k->set_guest_notifiers || !k->ioeventfd_assign) {
             error_setg(errp,
                        "device is incompatible with iothread "
@@ -39,15 +47,64 @@ void virtio_scsi_dataplane_setup(VirtIOSCSI *s, Error **errp)
             error_setg(errp, "ioeventfd is required for iothread");
             return;
         }
-        s->ctx = iothread_get_aio_context(vs->conf.iothread);
-    } else {
-        if (!virtio_device_ioeventfd_enabled(vdev)) {
+    }
+
+    s->vq_aio_context = g_new(AioContext *, vs->conf.num_queues +
+                                            VIRTIO_SCSI_VQ_NUM_FIXED);
+
+    /*
+     * Handle the ctrl virtqueue in the main loop thread where device resets
+     * can be performed.
+     */
+    s->vq_aio_context[0] = qemu_get_aio_context();
+
+    /*
+     * Handle the event virtqueue in the main loop thread where its no_poll
+     * behavior won't stop IOThread polling.
+     */
+    s->vq_aio_context[1] = qemu_get_aio_context();
+
+    if (vs->conf.iothread_vq_mapping_list) {
+        if (!iothread_vq_mapping_apply(vs->conf.iothread_vq_mapping_list,
+                    &s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED],
+                    vs->conf.num_queues, errp)) {
+            g_free(s->vq_aio_context);
+            s->vq_aio_context = NULL;
             return;
         }
-        s->ctx = qemu_get_aio_context();
+    } else if (vs->conf.iothread) {
+        AioContext *ctx = iothread_get_aio_context(vs->conf.iothread);
+        for (uint16_t i = 0; i < vs->conf.num_queues; i++) {
+            s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED + i] = ctx;
+        }
+
+        /* Released in virtio_scsi_dataplane_cleanup() */
+        object_ref(OBJECT(vs->conf.iothread));
+    } else {
+        AioContext *ctx = qemu_get_aio_context();
+        for (unsigned i = 0; i < vs->conf.num_queues; i++) {
+            s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED + i] = ctx;
+        }
     }
 }
 
+/* Context: BQL held */
+void virtio_scsi_dataplane_cleanup(VirtIOSCSI *s)
+{
+    VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s);
+
+    if (vs->conf.iothread_vq_mapping_list) {
+        iothread_vq_mapping_cleanup(vs->conf.iothread_vq_mapping_list);
+    }
+
+    if (vs->conf.iothread) {
+        object_unref(OBJECT(vs->conf.iothread));
+    }
+
+    g_free(s->vq_aio_context);
+    s->vq_aio_context = NULL;
+}
+
 static int virtio_scsi_set_host_notifier(VirtIOSCSI *s, VirtQueue *vq, int n)
 {
     BusState *qbus = BUS(qdev_get_parent_bus(DEVICE(s)));
@@ -66,31 +123,20 @@ static int virtio_scsi_set_host_notifier(VirtIOSCSI *s, VirtQueue *vq, int n)
 }
 
 /* Context: BH in IOThread */
-static void virtio_scsi_dataplane_stop_bh(void *opaque)
+static void virtio_scsi_dataplane_stop_vq_bh(void *opaque)
 {
-    VirtIOSCSI *s = opaque;
-    VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s);
+    AioContext *ctx = qemu_get_current_aio_context();
+    VirtQueue *vq = opaque;
     EventNotifier *host_notifier;
-    int i;
 
-    virtio_queue_aio_detach_host_notifier(vs->ctrl_vq, s->ctx);
-    host_notifier = virtio_queue_get_host_notifier(vs->ctrl_vq);
+    virtio_queue_aio_detach_host_notifier(vq, ctx);
+    host_notifier = virtio_queue_get_host_notifier(vq);
 
     /*
      * Test and clear notifier after disabling event, in case poll callback
      * didn't have time to run.
      */
     virtio_queue_host_notifier_read(host_notifier);
-
-    virtio_queue_aio_detach_host_notifier(vs->event_vq, s->ctx);
-    host_notifier = virtio_queue_get_host_notifier(vs->event_vq);
-    virtio_queue_host_notifier_read(host_notifier);
-
-    for (i = 0; i < vs->conf.num_queues; i++) {
-        virtio_queue_aio_detach_host_notifier(vs->cmd_vqs[i], s->ctx);
-        host_notifier = virtio_queue_get_host_notifier(vs->cmd_vqs[i]);
-        virtio_queue_host_notifier_read(host_notifier);
-    }
 }
 
 /* Context: BQL held */
@@ -154,11 +200,14 @@ int virtio_scsi_dataplane_start(VirtIODevice *vdev)
     smp_wmb(); /* paired with aio_notify_accept() */
 
     if (s->bus.drain_count == 0) {
-        virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx);
-        virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx);
+        virtio_queue_aio_attach_host_notifier(vs->ctrl_vq,
+                                              s->vq_aio_context[0]);
+        virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq,
+                                                      s->vq_aio_context[1]);
 
         for (i = 0; i < vs->conf.num_queues; i++) {
-            virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], s->ctx);
+            AioContext *ctx = s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED + i];
+            virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], ctx);
         }
     }
     return 0;
@@ -207,7 +256,11 @@ void virtio_scsi_dataplane_stop(VirtIODevice *vdev)
     s->dataplane_stopping = true;
 
     if (s->bus.drain_count == 0) {
-        aio_wait_bh_oneshot(s->ctx, virtio_scsi_dataplane_stop_bh, s);
+        for (i = 0; i < vs->conf.num_queues + VIRTIO_SCSI_VQ_NUM_FIXED; i++) {
+            VirtQueue *vq = virtio_get_queue(&vs->parent_obj, i);
+            AioContext *ctx = s->vq_aio_context[i];
+            aio_wait_bh_oneshot(ctx, virtio_scsi_dataplane_stop_vq_bh, vq);
+        }
     }
 
     blk_drain_all(); /* ensure there are no in-flight requests */
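
The AioContext array above follows virtio-scsi's fixed virtqueue layout: index 0 is the ctrl queue, index 1 the event queue (both pinned to the main loop), and the command queues start at VIRTIO_SCSI_VQ_NUM_FIXED (= 2, as the assignments above imply). A hypothetical accessor making the indexing explicit:

    /* Hypothetical helper; the offsets match the assignments above. */
    static inline AioContext *virtio_scsi_cmd_vq_ctx(VirtIOSCSI *s, uint16_t i)
    {
        return s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED + i];
    }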

hw/scsi/virtio-scsi.c (+293 -209)

@@ -27,6 +27,7 @@
 #include "hw/qdev-properties.h"
 #include "hw/scsi/scsi.h"
 #include "scsi/constants.h"
+#include "hw/virtio/iothread-vq-mapping.h"
 #include "hw/virtio/virtio-bus.h"
 #include "hw/virtio/virtio-access.h"
 #include "trace.h"
@@ -47,7 +48,7 @@ typedef struct VirtIOSCSIReq {
     /* Used for two-stage request submission and TMFs deferred to BH */
     QTAILQ_ENTRY(VirtIOSCSIReq) next;
 
-    /* Used for cancellation of request during TMFs */
+    /* Used for cancellation of request during TMFs. Atomic. */
     int remaining;
 
     SCSIRequest *sreq;
@@ -102,13 +103,18 @@ static void virtio_scsi_free_req(VirtIOSCSIReq *req)
     g_free(req);
 }
 
-static void virtio_scsi_complete_req(VirtIOSCSIReq *req)
+static void virtio_scsi_complete_req(VirtIOSCSIReq *req, QemuMutex *vq_lock)
 {
     VirtIOSCSI *s = req->dev;
     VirtQueue *vq = req->vq;
     VirtIODevice *vdev = VIRTIO_DEVICE(s);
 
     qemu_iovec_from_buf(&req->resp_iov, 0, &req->resp, req->resp_size);
+
+    if (vq_lock) {
+        qemu_mutex_lock(vq_lock);
+    }
+
     virtqueue_push(vq, &req->elem, req->qsgl.size + req->resp_iov.size);
     if (s->dataplane_started && !s->dataplane_fenced) {
         virtio_notify_irqfd(vdev, vq);
@@ -116,6 +122,10 @@ static void virtio_scsi_complete_req(VirtIOSCSIReq *req)
         virtio_notify(vdev, vq);
     }
 
+    if (vq_lock) {
+        qemu_mutex_unlock(vq_lock);
+    }
+
     if (req->sreq) {
         req->sreq->hba_private = NULL;
         scsi_req_unref(req->sreq);
@@ -123,34 +133,20 @@ static void virtio_scsi_complete_req(VirtIOSCSIReq *req)
     virtio_scsi_free_req(req);
 }
 
-static void virtio_scsi_complete_req_bh(void *opaque)
+static void virtio_scsi_bad_req(VirtIOSCSIReq *req, QemuMutex *vq_lock)
 {
-    VirtIOSCSIReq *req = opaque;
+    virtio_error(VIRTIO_DEVICE(req->dev), "wrong size for virtio-scsi headers");
 
-    virtio_scsi_complete_req(req);
-}
+    if (vq_lock) {
+        qemu_mutex_lock(vq_lock);
+    }
 
-/*
- * Called from virtio_scsi_do_one_tmf_bh() in main loop thread. The main loop
- * thread cannot touch the virtqueue since that could race with an IOThread.
- */
-static void virtio_scsi_complete_req_from_main_loop(VirtIOSCSIReq *req)
-{
-    VirtIOSCSI *s = req->dev;
+    virtqueue_detach_element(req->vq, &req->elem, 0);
 
-    if (!s->ctx || s->ctx == qemu_get_aio_context()) {
-        /* No need to schedule a BH when there is no IOThread */
-        virtio_scsi_complete_req(req);
-    } else {
-        /* Run request completion in the IOThread */
-        aio_wait_bh_oneshot(s->ctx, virtio_scsi_complete_req_bh, req);
+    if (vq_lock) {
+        qemu_mutex_unlock(vq_lock);
     }
-}
 
-static void virtio_scsi_bad_req(VirtIOSCSIReq *req)
-{
-    virtio_error(VIRTIO_DEVICE(req->dev), "wrong size for virtio-scsi headers");
-    virtqueue_detach_element(req->vq, &req->elem, 0);
     virtio_scsi_free_req(req);
 }
 
@@ -235,12 +231,21 @@ static int virtio_scsi_parse_req(VirtIOSCSIReq *req,
     return 0;
 }
 
-static VirtIOSCSIReq *virtio_scsi_pop_req(VirtIOSCSI *s, VirtQueue *vq)
+static VirtIOSCSIReq *virtio_scsi_pop_req(VirtIOSCSI *s, VirtQueue *vq, QemuMutex *vq_lock)
 {
     VirtIOSCSICommon *vs = (VirtIOSCSICommon *)s;
     VirtIOSCSIReq *req;
 
+    if (vq_lock) {
+        qemu_mutex_lock(vq_lock);
+    }
+
     req = virtqueue_pop(vq, sizeof(VirtIOSCSIReq) + vs->cdb_size);
+
+    if (vq_lock) {
+        qemu_mutex_unlock(vq_lock);
+    }
+
     if (!req) {
         return NULL;
     }
@@ -294,136 +299,157 @@ typedef struct {
     VirtIOSCSIReq  *tmf_req;
 } VirtIOSCSICancelNotifier;
 
+static void virtio_scsi_tmf_dec_remaining(VirtIOSCSIReq *tmf)
+{
+    if (qatomic_fetch_dec(&tmf->remaining) == 1) {
+        trace_virtio_scsi_tmf_resp(virtio_scsi_get_lun(tmf->req.tmf.lun),
+                                   tmf->req.tmf.tag, tmf->resp.tmf.response);
+
+        virtio_scsi_complete_req(tmf, &tmf->dev->ctrl_lock);
+    }
+}
+
 static void virtio_scsi_cancel_notify(Notifier *notifier, void *data)
 {
     VirtIOSCSICancelNotifier *n = container_of(notifier,
                                                VirtIOSCSICancelNotifier,
                                                notifier);
 
-    if (--n->tmf_req->remaining == 0) {
-        VirtIOSCSIReq *req = n->tmf_req;
-
-        trace_virtio_scsi_tmf_resp(virtio_scsi_get_lun(req->req.tmf.lun),
-                                   req->req.tmf.tag, req->resp.tmf.response);
-        virtio_scsi_complete_req(req);
-    }
+    virtio_scsi_tmf_dec_remaining(n->tmf_req);
     g_free(n);
 }
 
-static inline void virtio_scsi_ctx_check(VirtIOSCSI *s, SCSIDevice *d)
+static void virtio_scsi_tmf_cancel_req(VirtIOSCSIReq *tmf, SCSIRequest *r)
 {
-    if (s->dataplane_started && d && blk_is_available(d->conf.blk)) {
-        assert(blk_get_aio_context(d->conf.blk) == s->ctx);
-    }
+    VirtIOSCSICancelNotifier *notifier;
+
+    assert(r->ctx == qemu_get_current_aio_context());
+
+    /* Decremented in virtio_scsi_cancel_notify() */
+    qatomic_inc(&tmf->remaining);
+
+    notifier = g_new(VirtIOSCSICancelNotifier, 1);
+    notifier->notifier.notify = virtio_scsi_cancel_notify;
+    notifier->tmf_req = tmf;
+    scsi_req_cancel_async(r, &notifier->notifier);
 }
 
-static void virtio_scsi_do_one_tmf_bh(VirtIOSCSIReq *req)
+/* Execute a TMF on the requests in the current AioContext */
+static void virtio_scsi_do_tmf_aio_context(void *opaque)
 {
-    VirtIOSCSI *s = req->dev;
-    SCSIDevice *d = virtio_scsi_device_get(s, req->req.tmf.lun);
-    BusChild *kid;
-    int target;
+    AioContext *ctx = qemu_get_current_aio_context();
+    VirtIOSCSIReq *tmf = opaque;
+    VirtIOSCSI *s = tmf->dev;
+    SCSIDevice *d = virtio_scsi_device_get(s, tmf->req.tmf.lun);
+    SCSIRequest *r;
+    bool match_tag;
 
-    switch (req->req.tmf.subtype) {
-    case VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET:
-        if (!d) {
-            req->resp.tmf.response = VIRTIO_SCSI_S_BAD_TARGET;
-            goto out;
-        }
-        if (d->lun != virtio_scsi_get_lun(req->req.tmf.lun)) {
-            req->resp.tmf.response = VIRTIO_SCSI_S_INCORRECT_LUN;
-            goto out;
-        }
-        qatomic_inc(&s->resetting);
-        device_cold_reset(&d->qdev);
-        qatomic_dec(&s->resetting);
+    if (!d) {
+        tmf->resp.tmf.response = VIRTIO_SCSI_S_BAD_TARGET;
+        virtio_scsi_tmf_dec_remaining(tmf);
+        return;
+    }
+
+    /*
+     * This function could handle other subtypes that need to be processed in
+     * the request's AioContext in the future, but for now only request
+     * cancelation subtypes are performed here.
+     */
+    switch (tmf->req.tmf.subtype) {
+    case VIRTIO_SCSI_T_TMF_ABORT_TASK:
+        match_tag = true;
         break;
+    case VIRTIO_SCSI_T_TMF_ABORT_TASK_SET:
+    case VIRTIO_SCSI_T_TMF_CLEAR_TASK_SET:
+        match_tag = false;
+        break;
+    default:
+        g_assert_not_reached();
+    }
 
-    case VIRTIO_SCSI_T_TMF_I_T_NEXUS_RESET:
-        target = req->req.tmf.lun[1];
-        qatomic_inc(&s->resetting);
+    WITH_QEMU_LOCK_GUARD(&d->requests_lock) {
+        QTAILQ_FOREACH(r, &d->requests, next) {
+            VirtIOSCSIReq *cmd_req = r->hba_private;
+            assert(cmd_req); /* request has hba_private while enqueued */
 
-        rcu_read_lock();
-        QTAILQ_FOREACH_RCU(kid, &s->bus.qbus.children, sibling) {
-            SCSIDevice *d1 = SCSI_DEVICE(kid->child);
-            if (d1->channel == 0 && d1->id == target) {
-                device_cold_reset(&d1->qdev);
+            if (r->ctx != ctx) {
+                continue;
+            }
+            if (match_tag && cmd_req->req.cmd.tag != tmf->req.tmf.tag) {
+                continue;
             }
+            virtio_scsi_tmf_cancel_req(tmf, r);
         }
-        rcu_read_unlock();
-
-        qatomic_dec(&s->resetting);
-        break;
-
-    default:
-        g_assert_not_reached();
     }
 
-out:
-    object_unref(OBJECT(d));
-    virtio_scsi_complete_req_from_main_loop(req);
+    /* Incremented by virtio_scsi_do_tmf() */
+    virtio_scsi_tmf_dec_remaining(tmf);
+
+    object_unref(d);
 }
 
-/* Some TMFs must be processed from the main loop thread */
-static void virtio_scsi_do_tmf_bh(void *opaque)
+static void dummy_bh(void *opaque)
 {
-    VirtIOSCSI *s = opaque;
-    QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
-    VirtIOSCSIReq *req;
-    VirtIOSCSIReq *tmp;
+    /* Do nothing */
+}
 
+/*
+ * Wait for pending virtio_scsi_defer_tmf_to_aio_context() BHs.
+ */
+static void virtio_scsi_flush_defer_tmf_to_aio_context(VirtIOSCSI *s)
+{
     GLOBAL_STATE_CODE();
 
-    WITH_QEMU_LOCK_GUARD(&s->tmf_bh_lock) {
-        QTAILQ_FOREACH_SAFE(req, &s->tmf_bh_list, next, tmp) {
-            QTAILQ_REMOVE(&s->tmf_bh_list, req, next);
-            QTAILQ_INSERT_TAIL(&reqs, req, next);
-        }
+    assert(!s->dataplane_started);
 
-        qemu_bh_delete(s->tmf_bh);
-        s->tmf_bh = NULL;
-    }
+    for (uint32_t i = 0; i < s->parent_obj.conf.num_queues; i++) {
+        AioContext *ctx = s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED + i];
 
-    QTAILQ_FOREACH_SAFE(req, &reqs, next, tmp) {
-        QTAILQ_REMOVE(&reqs, req, next);
-        virtio_scsi_do_one_tmf_bh(req);
+        /* Our BH only runs after previously scheduled BHs */
+        aio_wait_bh_oneshot(ctx, dummy_bh, NULL);
     }
 }
 
-static void virtio_scsi_reset_tmf_bh(VirtIOSCSI *s)
+/*
+ * Run the TMF in a specific AioContext, handling only requests in that
+ * AioContext. This is necessary because requests can run in different
+ * AioContext and it is only possible to cancel them from the AioContext where
+ * they are running.
+ */
+static void virtio_scsi_defer_tmf_to_aio_context(VirtIOSCSIReq *tmf,
+                                                 AioContext *ctx)
 {
-    VirtIOSCSIReq *req;
-    VirtIOSCSIReq *tmp;
+    /* Decremented in virtio_scsi_do_tmf_aio_context() */
+    qatomic_inc(&tmf->remaining);
 
-    GLOBAL_STATE_CODE();
-
-    /* Called after ioeventfd has been stopped, so tmf_bh_lock is not needed */
-    if (s->tmf_bh) {
-        qemu_bh_delete(s->tmf_bh);
-        s->tmf_bh = NULL;
-    }
-
-    QTAILQ_FOREACH_SAFE(req, &s->tmf_bh_list, next, tmp) {
-        QTAILQ_REMOVE(&s->tmf_bh_list, req, next);
-
-        /* SAM-6 6.3.2 Hard reset */
-        req->resp.tmf.response = VIRTIO_SCSI_S_TARGET_FAILURE;
-        virtio_scsi_complete_req(req);
-    }
+    /* See virtio_scsi_flush_defer_tmf_to_aio_context() cleanup during reset */
+    aio_bh_schedule_oneshot(ctx, virtio_scsi_do_tmf_aio_context, tmf);
 }
 
-static void virtio_scsi_defer_tmf_to_bh(VirtIOSCSIReq *req)
+/*
+ * Returns the AioContext for a given TMF's tag field or NULL. Note that the
+ * request identified by the tag may have completed by the time you can execute
+ * a BH in the AioContext, so don't assume the request still exists in your BH.
+ */
+static AioContext *find_aio_context_for_tmf_tag(SCSIDevice *d,
+                                                VirtIOSCSIReq *tmf)
 {
-    VirtIOSCSI *s = req->dev;
+    WITH_QEMU_LOCK_GUARD(&d->requests_lock) {
+        SCSIRequest *r;
+        SCSIRequest *next;
+
+        QTAILQ_FOREACH_SAFE(r, &d->requests, next, next) {
+            VirtIOSCSIReq *cmd_req = r->hba_private;
 
-    WITH_QEMU_LOCK_GUARD(&s->tmf_bh_lock) {
-        QTAILQ_INSERT_TAIL(&s->tmf_bh_list, req, next);
+            /* hba_private is non-NULL while the request is enqueued */
+            assert(cmd_req);
 
-        if (!s->tmf_bh) {
-            s->tmf_bh = qemu_bh_new(virtio_scsi_do_tmf_bh, s);
-            qemu_bh_schedule(s->tmf_bh);
+            if (cmd_req->req.cmd.tag == tmf->req.tmf.tag) {
+                return r->ctx;
+            }
         }
     }
+    return NULL;
 }
 
 /* Return 0 if the request is ready to be completed and return to guest;
@@ -433,9 +459,9 @@ static int virtio_scsi_do_tmf(VirtIOSCSI *s, VirtIOSCSIReq *req)
 {
     SCSIDevice *d = virtio_scsi_device_get(s, req->req.tmf.lun);
     SCSIRequest *r, *next;
+    AioContext *ctx;
     int ret = 0;
 
-    virtio_scsi_ctx_check(s, d);
     /* Here VIRTIO_SCSI_S_OK means "FUNCTION COMPLETE".  */
     req->resp.tmf.response = VIRTIO_SCSI_S_OK;
 
@@ -450,7 +476,22 @@ static int virtio_scsi_do_tmf(VirtIOSCSI *s, VirtIOSCSIReq *req)
                               req->req.tmf.tag, req->req.tmf.subtype);
 
     switch (req->req.tmf.subtype) {
-    case VIRTIO_SCSI_T_TMF_ABORT_TASK:
+    case VIRTIO_SCSI_T_TMF_ABORT_TASK: {
+        if (!d) {
+            goto fail;
+        }
+        if (d->lun != virtio_scsi_get_lun(req->req.tmf.lun)) {
+            goto incorrect_lun;
+        }
+
+        ctx = find_aio_context_for_tmf_tag(d, req);
+        if (ctx) {
+            virtio_scsi_defer_tmf_to_aio_context(req, ctx);
+            ret = -EINPROGRESS;
+        }
+        break;
+    }
+
     case VIRTIO_SCSI_T_TMF_QUERY_TASK:
         if (!d) {
             goto fail;
@@ -458,44 +499,82 @@ static int virtio_scsi_do_tmf(VirtIOSCSI *s, VirtIOSCSIReq *req)
         if (d->lun != virtio_scsi_get_lun(req->req.tmf.lun)) {
             goto incorrect_lun;
         }
-        QTAILQ_FOREACH_SAFE(r, &d->requests, next, next) {
-            VirtIOSCSIReq *cmd_req = r->hba_private;
-            if (cmd_req && cmd_req->req.cmd.tag == req->req.tmf.tag) {
-                break;
+
+        WITH_QEMU_LOCK_GUARD(&d->requests_lock) {
+            QTAILQ_FOREACH(r, &d->requests, next) {
+                VirtIOSCSIReq *cmd_req = r->hba_private;
+                assert(cmd_req); /* request has hba_private while enqueued */
+
+                if (cmd_req->req.cmd.tag == req->req.tmf.tag) {
+                    /*
+                     * "If the specified command is present in the task set,
+                     * then return a service response set to FUNCTION
+                     * SUCCEEDED".
+                     */
+                    req->resp.tmf.response = VIRTIO_SCSI_S_FUNCTION_SUCCEEDED;
+                }
             }
         }
-        if (r) {
-            /*
-             * Assert that the request has not been completed yet, we
-             * check for it in the loop above.
-             */
-            assert(r->hba_private);
-            if (req->req.tmf.subtype == VIRTIO_SCSI_T_TMF_QUERY_TASK) {
-                /* "If the specified command is present in the task set, then
-                 * return a service response set to FUNCTION SUCCEEDED".
-                 */
-                req->resp.tmf.response = VIRTIO_SCSI_S_FUNCTION_SUCCEEDED;
-            } else {
-                VirtIOSCSICancelNotifier *notifier;
-
-                req->remaining = 1;
-                notifier = g_new(VirtIOSCSICancelNotifier, 1);
-                notifier->tmf_req = req;
-                notifier->notifier.notify = virtio_scsi_cancel_notify;
-                scsi_req_cancel_async(r, &notifier->notifier);
-                ret = -EINPROGRESS;
+        break;
+
+    case VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET:
+        if (!d) {
+            goto fail;
+        }
+        if (d->lun != virtio_scsi_get_lun(req->req.tmf.lun)) {
+            goto incorrect_lun;
+        }
+        qatomic_inc(&s->resetting);
+        device_cold_reset(&d->qdev);
+        qatomic_dec(&s->resetting);
+        break;
+
+    case VIRTIO_SCSI_T_TMF_I_T_NEXUS_RESET: {
+        BusChild *kid;
+        int target = req->req.tmf.lun[1];
+        qatomic_inc(&s->resetting);
+
+        rcu_read_lock();
+        QTAILQ_FOREACH_RCU(kid, &s->bus.qbus.children, sibling) {
+            SCSIDevice *d1 = SCSI_DEVICE(kid->child);
+            if (d1->channel == 0 && d1->id == target) {
+                device_cold_reset(&d1->qdev);
             }
         }
+        rcu_read_unlock();
+
+        qatomic_dec(&s->resetting);
         break;
+    }
 
-    case VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET:
-    case VIRTIO_SCSI_T_TMF_I_T_NEXUS_RESET:
-        virtio_scsi_defer_tmf_to_bh(req);
+    case VIRTIO_SCSI_T_TMF_ABORT_TASK_SET:
+    case VIRTIO_SCSI_T_TMF_CLEAR_TASK_SET: {
+        g_autoptr(GHashTable) aio_contexts = g_hash_table_new(NULL, NULL);
+
+        if (!d) {
+            goto fail;
+        }
+        if (d->lun != virtio_scsi_get_lun(req->req.tmf.lun)) {
+            goto incorrect_lun;
+        }
+
+        qatomic_inc(&req->remaining);
+
+        for (uint32_t i = 0; i < s->parent_obj.conf.num_queues; i++) {
+            ctx = s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED + i];
+
+            if (!g_hash_table_add(aio_contexts, ctx)) {
+                continue; /* skip previously added AioContext */
+            }
+
+            virtio_scsi_defer_tmf_to_aio_context(req, ctx);
+        }
+
+        virtio_scsi_tmf_dec_remaining(req);
         ret = -EINPROGRESS;
         break;
+    }
 
-    case VIRTIO_SCSI_T_TMF_ABORT_TASK_SET:
-    case VIRTIO_SCSI_T_TMF_CLEAR_TASK_SET:
     case VIRTIO_SCSI_T_TMF_QUERY_TASK_SET:
         if (!d) {
             goto fail;
@@ -504,34 +583,19 @@ static int virtio_scsi_do_tmf(VirtIOSCSI *s, VirtIOSCSIReq *req)
             goto incorrect_lun;
         }
 
-        /* Add 1 to "remaining" until virtio_scsi_do_tmf returns.
-         * This way, if the bus starts calling back to the notifiers
-         * even before we finish the loop, virtio_scsi_cancel_notify
-         * will not complete the TMF too early.
-         */
-        req->remaining = 1;
-        QTAILQ_FOREACH_SAFE(r, &d->requests, next, next) {
-            if (r->hba_private) {
-                if (req->req.tmf.subtype == VIRTIO_SCSI_T_TMF_QUERY_TASK_SET) {
-                    /* "If there is any command present in the task set, then
-                     * return a service response set to FUNCTION SUCCEEDED".
-                     */
-                    req->resp.tmf.response = VIRTIO_SCSI_S_FUNCTION_SUCCEEDED;
-                    break;
-                } else {
-                    VirtIOSCSICancelNotifier *notifier;
-
-                    req->remaining++;
-                    notifier = g_new(VirtIOSCSICancelNotifier, 1);
-                    notifier->notifier.notify = virtio_scsi_cancel_notify;
-                    notifier->tmf_req = req;
-                    scsi_req_cancel_async(r, &notifier->notifier);
-                }
+        WITH_QEMU_LOCK_GUARD(&d->requests_lock) {
+            QTAILQ_FOREACH_SAFE(r, &d->requests, next, next) {
+                /* Request has hba_private while enqueued */
+                assert(r->hba_private);
+
+                /*
+                 * "If there is any command present in the task set, then
+                 * return a service response set to FUNCTION SUCCEEDED".
+                 */
+                req->resp.tmf.response = VIRTIO_SCSI_S_FUNCTION_SUCCEEDED;
+                break;
             }
         }
-        if (--req->remaining > 0) {
-            ret = -EINPROGRESS;
-        }
         break;
 
     case VIRTIO_SCSI_T_TMF_CLEAR_ACA:
@@ -562,7 +626,7 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req)
 
     if (iov_to_buf(req->elem.out_sg, req->elem.out_num, 0,
                 &type, sizeof(type)) < sizeof(type)) {
-        virtio_scsi_bad_req(req);
+        virtio_scsi_bad_req(req, &s->ctrl_lock);
         return;
     }
 
@@ -570,7 +634,7 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req)
     if (type == VIRTIO_SCSI_T_TMF) {
         if (virtio_scsi_parse_req(req, sizeof(VirtIOSCSICtrlTMFReq),
                     sizeof(VirtIOSCSICtrlTMFResp)) < 0) {
-            virtio_scsi_bad_req(req);
+            virtio_scsi_bad_req(req, &s->ctrl_lock);
             return;
         } else {
             r = virtio_scsi_do_tmf(s, req);
@@ -580,7 +644,7 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req)
                type == VIRTIO_SCSI_T_AN_SUBSCRIBE) {
         if (virtio_scsi_parse_req(req, sizeof(VirtIOSCSICtrlANReq),
                     sizeof(VirtIOSCSICtrlANResp)) < 0) {
-            virtio_scsi_bad_req(req);
+            virtio_scsi_bad_req(req, &s->ctrl_lock);
             return;
         } else {
             req->req.an.event_requested =
@@ -600,7 +664,7 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req)
                  type == VIRTIO_SCSI_T_AN_SUBSCRIBE)
             trace_virtio_scsi_an_resp(virtio_scsi_get_lun(req->req.an.lun),
                                       req->resp.an.response);
-        virtio_scsi_complete_req(req);
+        virtio_scsi_complete_req(req, &s->ctrl_lock);
     } else {
         assert(r == -EINPROGRESS);
     }
@@ -610,7 +674,7 @@ static void virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
     VirtIOSCSIReq *req;
 
-    while ((req = virtio_scsi_pop_req(s, vq))) {
+    while ((req = virtio_scsi_pop_req(s, vq, &s->ctrl_lock))) {
         virtio_scsi_handle_ctrl_req(s, req);
     }
 }
@@ -625,9 +689,12 @@ static void virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
  */
 static bool virtio_scsi_defer_to_dataplane(VirtIOSCSI *s)
 {
-    if (!s->ctx || s->dataplane_started) {
+    if (s->dataplane_started) {
         return false;
     }
+    if (s->vq_aio_context[0] == qemu_get_aio_context()) {
+        return false; /* not using IOThreads */
+    }
 
     virtio_device_start_ioeventfd(&s->parent_obj.parent_obj);
     return !s->dataplane_fenced;
@@ -654,7 +721,7 @@ static void virtio_scsi_complete_cmd_req(VirtIOSCSIReq *req)
      * in virtio_scsi_command_complete.
      */
     req->resp_size = sizeof(VirtIOSCSICmdResp);
-    virtio_scsi_complete_req(req);
+    virtio_scsi_complete_req(req, NULL);
 }
 
 static void virtio_scsi_command_failed(SCSIRequest *r)
@@ -788,7 +855,7 @@ static int virtio_scsi_handle_cmd_req_prepare(VirtIOSCSI *s, VirtIOSCSIReq *req)
             virtio_scsi_fail_cmd_req(req);
             return -ENOTSUP;
         } else {
-            virtio_scsi_bad_req(req);
+            virtio_scsi_bad_req(req, NULL);
             return -EINVAL;
         }
     }
@@ -801,7 +868,6 @@ static int virtio_scsi_handle_cmd_req_prepare(VirtIOSCSI *s, VirtIOSCSIReq *req)
         virtio_scsi_complete_cmd_req(req);
         return -ENOENT;
     }
-    virtio_scsi_ctx_check(s, d);
     req->sreq = scsi_req_new(d, req->req.cmd.tag,
                              virtio_scsi_get_lun(req->req.cmd.lun),
                              req->req.cmd.cdb, vs->cdb_size, req);
@@ -843,7 +909,7 @@ static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
             virtio_queue_set_notification(vq, 0);
         }
 
-        while ((req = virtio_scsi_pop_req(s, vq))) {
+        while ((req = virtio_scsi_pop_req(s, vq, NULL))) {
             ret = virtio_scsi_handle_cmd_req_prepare(s, req);
             if (!ret) {
                 QTAILQ_INSERT_TAIL(&reqs, req, next);
@@ -936,7 +1002,7 @@ static void virtio_scsi_reset(VirtIODevice *vdev)
 
     assert(!s->dataplane_started);
 
-    virtio_scsi_reset_tmf_bh(s);
+    virtio_scsi_flush_defer_tmf_to_aio_context(s);
 
     qatomic_inc(&s->resetting);
     bus_cold_reset(BUS(&s->bus));
@@ -944,7 +1010,10 @@ static void virtio_scsi_reset(VirtIODevice *vdev)
 
     vs->sense_size = VIRTIO_SCSI_SENSE_DEFAULT_SIZE;
     vs->cdb_size = VIRTIO_SCSI_CDB_DEFAULT_SIZE;
-    s->events_dropped = false;
+
+    WITH_QEMU_LOCK_GUARD(&s->event_lock) {
+        s->events_dropped = false;
+    }
 }
 
 typedef struct {
@@ -973,19 +1042,21 @@ static void virtio_scsi_push_event(VirtIOSCSI *s,
         return;
     }
 
-    req = virtio_scsi_pop_req(s, vs->event_vq);
-    if (!req) {
-        s->events_dropped = true;
-        return;
-    }
+    req = virtio_scsi_pop_req(s, vs->event_vq, &s->event_lock);
+    WITH_QEMU_LOCK_GUARD(&s->event_lock) {
+        if (!req) {
+            s->events_dropped = true;
+            return;
+        }
 
-    if (s->events_dropped) {
-        event |= VIRTIO_SCSI_T_EVENTS_MISSED;
-        s->events_dropped = false;
+        if (s->events_dropped) {
+            event |= VIRTIO_SCSI_T_EVENTS_MISSED;
+            s->events_dropped = false;
+        }
     }
 
     if (virtio_scsi_parse_req(req, 0, sizeof(VirtIOSCSIEvent))) {
-        virtio_scsi_bad_req(req);
+        virtio_scsi_bad_req(req, &s->event_lock);
         return;
     }
 
@@ -1005,12 +1076,18 @@ static void virtio_scsi_push_event(VirtIOSCSI *s,
     }
     trace_virtio_scsi_event(virtio_scsi_get_lun(evt->lun), event, reason);
 
-    virtio_scsi_complete_req(req);
+    virtio_scsi_complete_req(req, &s->event_lock);
 }
 
 static void virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
-    if (s->events_dropped) {
+    bool events_dropped;
+
+    WITH_QEMU_LOCK_GUARD(&s->event_lock) {
+        events_dropped = s->events_dropped;
+    }
+
+    if (events_dropped) {
         VirtIOSCSIEventInfo info = {
             .event = VIRTIO_SCSI_T_NO_EVENT,
         };
@@ -1061,14 +1138,16 @@ static void virtio_scsi_hotplug(HotplugHandler *hotplug_dev, DeviceState *dev,
 {
     VirtIODevice *vdev = VIRTIO_DEVICE(hotplug_dev);
     VirtIOSCSI *s = VIRTIO_SCSI(vdev);
+    AioContext *ctx = s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED];
     SCSIDevice *sd = SCSI_DEVICE(dev);
-    int ret;
 
-    if (s->ctx && !s->dataplane_fenced) {
-        ret = blk_set_aio_context(sd->conf.blk, s->ctx, errp);
-        if (ret < 0) {
-            return;
-        }
+    if (ctx != qemu_get_aio_context() && !s->dataplane_fenced) {
+        /*
+         * Try to make the BlockBackend's AioContext match ours. Ignore failure
+         * because I/O will still work although block jobs and other users
+         * might be slower when multiple AioContexts use a BlockBackend.
+         */
+        blk_set_aio_context(sd->conf.blk, ctx, NULL);
     }
 
     if (virtio_vdev_has_feature(vdev, VIRTIO_SCSI_F_HOTPLUG)) {
@@ -1103,7 +1182,7 @@ static void virtio_scsi_hotunplug(HotplugHandler *hotplug_dev, DeviceState *dev,
 
     qdev_simple_device_unplug_cb(hotplug_dev, dev, errp);
 
-    if (s->ctx) {
+    if (s->vq_aio_context[VIRTIO_SCSI_VQ_NUM_FIXED] != qemu_get_aio_context()) {
         /* If other users keep the BlockBackend in the iothread, that's ok */
         blk_set_aio_context(sd->conf.blk, qemu_get_aio_context(), NULL);
     }
@@ -1137,7 +1216,7 @@ static void virtio_scsi_drained_begin(SCSIBus *bus)
 
     for (uint32_t i = 0; i < total_queues; i++) {
         VirtQueue *vq = virtio_get_queue(vdev, i);
-        virtio_queue_aio_detach_host_notifier(vq, s->ctx);
+        virtio_queue_aio_detach_host_notifier(vq, s->vq_aio_context[i]);
     }
 }
 
@@ -1163,10 +1242,12 @@ static void virtio_scsi_drained_end(SCSIBus *bus)
 
     for (uint32_t i = 0; i < total_queues; i++) {
         VirtQueue *vq = virtio_get_queue(vdev, i);
+        AioContext *ctx = s->vq_aio_context[i];
+
         if (vq == vs->event_vq) {
-            virtio_queue_aio_attach_host_notifier_no_poll(vq, s->ctx);
+            virtio_queue_aio_attach_host_notifier_no_poll(vq, ctx);
         } else {
-            virtio_queue_aio_attach_host_notifier(vq, s->ctx);
+            virtio_queue_aio_attach_host_notifier(vq, ctx);
         }
     }
 }
@@ -1235,8 +1316,8 @@ static void virtio_scsi_device_realize(DeviceState *dev, Error **errp)
     VirtIOSCSI *s = VIRTIO_SCSI(dev);
     Error *err = NULL;
 
-    QTAILQ_INIT(&s->tmf_bh_list);
-    qemu_mutex_init(&s->tmf_bh_lock);
+    qemu_mutex_init(&s->ctrl_lock);
+    qemu_mutex_init(&s->event_lock);
 
     virtio_scsi_common_realize(dev,
                                virtio_scsi_handle_ctrl,
@@ -1271,15 +1352,16 @@ void virtio_scsi_common_unrealize(DeviceState *dev)
     virtio_cleanup(vdev);
 }
 
+/* main loop */
 static void virtio_scsi_device_unrealize(DeviceState *dev)
 {
     VirtIOSCSI *s = VIRTIO_SCSI(dev);
 
-    virtio_scsi_reset_tmf_bh(s);
-
+    virtio_scsi_dataplane_cleanup(s);
     qbus_set_hotplug_handler(BUS(&s->bus), NULL);
     virtio_scsi_common_unrealize(dev);
-    qemu_mutex_destroy(&s->tmf_bh_lock);
+    qemu_mutex_destroy(&s->event_lock);
+    qemu_mutex_destroy(&s->ctrl_lock);
 }
 
 static const Property virtio_scsi_properties[] = {
@@ -1299,6 +1381,8 @@ static const Property virtio_scsi_properties[] = {
                                                 VIRTIO_SCSI_F_CHANGE, true),
     DEFINE_PROP_LINK("iothread", VirtIOSCSI, parent_obj.conf.iothread,
                      TYPE_IOTHREAD, IOThread *),
+    DEFINE_PROP_IOTHREAD_VQ_MAPPING_LIST("iothread-vq-mapping", VirtIOSCSI,
+            parent_obj.conf.iothread_vq_mapping_list),
 };
 
 static const VMStateDescription vmstate_virtio_scsi = {
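
The DEFINE_PROP_IOTHREAD_VQ_MAPPING_LIST hunk above reuses the IOThreadVirtQueueMappingList QAPI type that virtio-blk already accepts. A hedged usage sketch over QMP (all ids and paths are invented; assumes the python "qemu.qmp" package); per this series, only the command virtqueues are mapped, while the fixed ctrl/event queues stay in the main loop:

    # Hedged sketch: hot-add a virtio-scsi controller whose command vqs
    # are spread round-robin across two pre-created IOThreads.
    import asyncio
    from qemu.qmp import QMPClient

    async def main():
        qmp = QMPClient('vm')
        await qmp.connect('/tmp/qmp.sock')      # hypothetical QMP socket
        await qmp.execute('device_add', {
            'driver': 'virtio-scsi-pci',
            'id': 'scsi0',
            'iothread-vq-mapping': [
                {'iothread': 'iot0'},   # no "vqs" key: round-robin
                {'iothread': 'iot1'},
            ],
        })
        await qmp.disconnect()

    asyncio.run(main())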

+ 131 - 0
hw/virtio/iothread-vq-mapping.c

@@ -0,0 +1,131 @@
+/*
+ * IOThread Virtqueue Mapping
+ *
+ * Copyright Red Hat, Inc
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ */
+
+#include "qemu/osdep.h"
+#include "system/iothread.h"
+#include "hw/virtio/iothread-vq-mapping.h"
+
+static bool
+iothread_vq_mapping_validate(IOThreadVirtQueueMappingList *list, uint16_t
+        num_queues, Error **errp)
+{
+    g_autofree unsigned long *vqs = bitmap_new(num_queues);
+    g_autoptr(GHashTable) iothreads =
+        g_hash_table_new(g_str_hash, g_str_equal);
+
+    for (IOThreadVirtQueueMappingList *node = list; node; node = node->next) {
+        const char *name = node->value->iothread;
+        uint16List *vq;
+
+        if (!iothread_by_id(name)) {
+            error_setg(errp, "IOThread \"%s\" object does not exist", name);
+            return false;
+        }
+
+        if (!g_hash_table_add(iothreads, (gpointer)name)) {
+            error_setg(errp,
+                    "duplicate IOThread name \"%s\" in iothread-vq-mapping",
+                    name);
+            return false;
+        }
+
+        if (node != list) {
+            if (!!node->value->vqs != !!list->value->vqs) {
+                error_setg(errp, "either all items in iothread-vq-mapping "
+                                 "must have vqs or none of them must have it");
+                return false;
+            }
+        }
+
+        for (vq = node->value->vqs; vq; vq = vq->next) {
+            if (vq->value >= num_queues) {
+                error_setg(errp, "vq index %u for IOThread \"%s\" must be "
+                        "less than num_queues %u in iothread-vq-mapping",
+                        vq->value, name, num_queues);
+                return false;
+            }
+
+            if (test_and_set_bit(vq->value, vqs)) {
+                error_setg(errp, "cannot assign vq %u to IOThread \"%s\" "
+                        "because it is already assigned", vq->value, name);
+                return false;
+            }
+        }
+    }
+
+    if (list->value->vqs) {
+        for (uint16_t i = 0; i < num_queues; i++) {
+            if (!test_bit(i, vqs)) {
+                error_setg(errp,
+                        "missing vq %u IOThread assignment in iothread-vq-mapping",
+                        i);
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
+bool iothread_vq_mapping_apply(
+        IOThreadVirtQueueMappingList *list,
+        AioContext **vq_aio_context,
+        uint16_t num_queues,
+        Error **errp)
+{
+    IOThreadVirtQueueMappingList *node;
+    size_t num_iothreads = 0;
+    size_t cur_iothread = 0;
+
+    if (!iothread_vq_mapping_validate(list, num_queues, errp)) {
+        return false;
+    }
+
+    for (node = list; node; node = node->next) {
+        num_iothreads++;
+    }
+
+    for (node = list; node; node = node->next) {
+        IOThread *iothread = iothread_by_id(node->value->iothread);
+        AioContext *ctx = iothread_get_aio_context(iothread);
+
+        /* Released in virtio_blk_vq_aio_context_cleanup() */
+        object_ref(OBJECT(iothread));
+
+        if (node->value->vqs) {
+            uint16List *vq;
+
+            /* Explicit vq:IOThread assignment */
+            for (vq = node->value->vqs; vq; vq = vq->next) {
+                assert(vq->value < num_queues);
+                vq_aio_context[vq->value] = ctx;
+            }
+        } else {
+            /* Round-robin vq:IOThread assignment */
+            for (unsigned i = cur_iothread; i < num_queues;
+                 i += num_iothreads) {
+                vq_aio_context[i] = ctx;
+            }
+        }
+
+        cur_iothread++;
+    }
+
+    return true;
+}
+
+void iothread_vq_mapping_cleanup(IOThreadVirtQueueMappingList *list)
+{
+    IOThreadVirtQueueMappingList *node;
+
+    for (node = list; node; node = node->next) {
+        IOThread *iothread = iothread_by_id(node->value->iothread);
+        object_unref(OBJECT(iothread));
+    }
+}
+
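
Since the round-robin arithmetic in iothread_vq_mapping_apply() is the part reviewers usually squint at, here is a minimal Python model of its two assignment modes (IOThread names are plain strings here, purely for illustration):

    # Minimal model of iothread_vq_mapping_apply()'s assignment policy.
    def apply_mapping(mapping, num_queues):
        vq_ctx = [None] * num_queues
        num_iothreads = len(mapping)
        for cur, entry in enumerate(mapping):
            if entry.get('vqs'):
                # Explicit vq:IOThread assignment
                for vq in entry['vqs']:
                    assert vq < num_queues
                    vq_ctx[vq] = entry['iothread']
            else:
                # Round-robin: every num_iothreads-th queue, offset by
                # this entry's position in the list
                for i in range(cur, num_queues, num_iothreads):
                    vq_ctx[i] = entry['iothread']
        return vq_ctx

    print(apply_mapping([{'iothread': 'a'}, {'iothread': 'b'}], 5))
    # ['a', 'b', 'a', 'b', 'a']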

+ 1 - 0
hw/virtio/meson.build

@@ -1,5 +1,6 @@
 system_virtio_ss = ss.source_set()
 system_virtio_ss.add(files('virtio-bus.c'))
+system_virtio_ss.add(files('iothread-vq-mapping.c'))
 system_virtio_ss.add(when: 'CONFIG_VIRTIO_PCI', if_true: files('virtio-pci.c'))
 system_virtio_ss.add(when: 'CONFIG_VIRTIO_MMIO', if_true: files('virtio-mmio.c'))
 system_virtio_ss.add(when: 'CONFIG_VIRTIO_CRYPTO', if_true: files('virtio-crypto.c'))

+ 4 - 1
include/block/aio.h

@@ -123,6 +123,10 @@ struct BHListSlice {
 
 typedef QSLIST_HEAD(, AioHandler) AioHandlerSList;
 
+typedef struct AioPolledEvent {
+    int64_t ns;        /* current polling time in nanoseconds */
+} AioPolledEvent;
+
 struct AioContext {
     GSource source;
 
@@ -229,7 +233,6 @@ struct AioContext {
     int poll_disable_cnt;
 
     /* Polling mode parameters */
-    int64_t poll_ns;        /* current polling time in nanoseconds */
     int64_t poll_max_ns;    /* maximum polling time in nanoseconds */
     int64_t poll_grow;      /* polling time growth factor */
     int64_t poll_shrink;    /* polling time shrink factor */

+ 17 - 2
include/block/raw-aio.h

@@ -17,6 +17,7 @@
 #define QEMU_RAW_AIO_H
 
 #include "block/aio.h"
+#include "block/block-common.h"
 #include "qemu/iov.h"
 
 /* AIO request types */
@@ -58,11 +59,18 @@ void laio_cleanup(LinuxAioState *s);
 
 /* laio_co_submit: submit I/O requests in the thread's current AioContext. */
 int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov,
-                                int type, uint64_t dev_max_batch);
+                                int type, BdrvRequestFlags flags,
+                                uint64_t dev_max_batch);
 
 bool laio_has_fdsync(int);
+bool laio_has_fua(void);
 void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context);
 void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
+#else
+static inline bool laio_has_fua(void)
+{
+    return false;
+}
 #endif
 /* io_uring.c - Linux io_uring implementation */
 #ifdef CONFIG_LINUX_IO_URING
@@ -71,9 +79,16 @@ void luring_cleanup(LuringState *s);
 
 /* luring_co_submit: submit I/O requests in the thread's current AioContext. */
 int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
-                                  QEMUIOVector *qiov, int type);
+                                  QEMUIOVector *qiov, int type,
+                                  BdrvRequestFlags flags);
 void luring_detach_aio_context(LuringState *s, AioContext *old_context);
 void luring_attach_aio_context(LuringState *s, AioContext *new_context);
+bool luring_has_fua(void);
+#else
+static inline bool luring_has_fua(void)
+{
+    return false;
+}
 #endif
 
 #ifdef _WIN32
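
These *_has_fua() probes (backed by the HAVE_IO_PREP_PWRITEV2 and HAVE_IO_URING_PREP_WRITEV2 meson checks further down) let the block layer issue true per-request FUA writes and fall back to write-plus-flush otherwise. A minimal sketch of the fallback shape, assuming a Linux host; Python exposes no RWF_FUA constant, so RWF_DSYNC stands in for the per-request flag here:

    import os

    def pwrite_fua(fd, buf, offset, has_fua):
        """Write buf at offset so the data is durable on return."""
        if has_fua:
            # Per-request write-through, analogous to passing the flag
            # via io_prep_pwritev2()/io_uring_prep_writev2() in the real
            # code (RWF_DSYNC is the closest flag Python exposes).
            return os.pwritev(fd, [buf], offset, os.RWF_DSYNC)
        # Fallback when the host AIO interface reports no FUA support:
        # a plain write followed by a full flush.
        n = os.pwritev(fd, [buf], offset)
        os.fsync(fd)
        return n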

+ 4 - 4
include/hw/scsi/scsi.h

@@ -24,6 +24,7 @@ struct SCSIRequest {
     SCSIBus           *bus;
     SCSIDevice        *dev;
     const SCSIReqOps  *ops;
+    AioContext        *ctx;
     uint32_t          refcount;
     uint32_t          tag;
     uint32_t          lun;
@@ -48,6 +49,8 @@ struct SCSIRequest {
     bool              dma_started;
     BlockAIOCB        *aiocb;
     QEMUSGList        *sg;
+
+    /* Protected by SCSIDevice->requests_lock */
     QTAILQ_ENTRY(SCSIRequest) next;
 };
 
@@ -76,10 +79,7 @@ struct SCSIDevice
     uint8_t sense[SCSI_SENSE_BUF_SIZE];
     uint32_t sense_len;
 
-    /*
-     * The requests list is only accessed from the AioContext that executes
-     * requests or from the main loop when IOThread processing is stopped.
-     */
+    QemuMutex requests_lock; /* protects the requests list */
     QTAILQ_HEAD(, SCSIRequest) requests;
 
     uint32_t channel;
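
The comment swap above is the whole point of the "scsi: introduce requests_lock" patch: the list is no longer tied to one AioContext, and any thread may touch it while holding the mutex. A toy Python model of the rule (class and method names invented):

    import threading
    from collections import deque

    class ScsiDeviceModel:
        def __init__(self):
            self.requests_lock = threading.Lock()  # protects self.requests
            self.requests = deque()

        def enqueue(self, req):
            with self.requests_lock:               # any thread may enqueue
                self.requests.append(req)

        def query_task_set(self):
            # Mirrors the QUERY TASK SET loop earlier in this diff: any
            # queued request means FUNCTION SUCCEEDED.
            with self.requests_lock:
                return len(self.requests) > 0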

+ 45 - 0
include/hw/virtio/iothread-vq-mapping.h

@@ -0,0 +1,45 @@
+/*
+ * IOThread Virtqueue Mapping
+ *
+ * Copyright Red Hat, Inc
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ */
+
+#ifndef HW_VIRTIO_IOTHREAD_VQ_MAPPING_H
+#define HW_VIRTIO_IOTHREAD_VQ_MAPPING_H
+
+#include "qapi/error.h"
+#include "qapi/qapi-types-virtio.h"
+
+/**
+ * iothread_vq_mapping_apply:
+ * @list: The mapping of virtqueues to IOThreads.
+ * @vq_aio_context: The array of AioContext pointers to fill in.
+ * @num_queues: The length of @vq_aio_context.
+ * @errp: If an error occurs, a pointer to the area to store the error.
+ *
+ * Fill in the AioContext for each virtqueue in the @vq_aio_context array given
+ * the iothread-vq-mapping parameter in @list.
+ *
+ * iothread_vq_mapping_cleanup() must be called to free IOThread object
+ * references after this function returns success.
+ *
+ * Returns: %true on success, %false on failure.
+ **/
+bool iothread_vq_mapping_apply(
+        IOThreadVirtQueueMappingList *list,
+        AioContext **vq_aio_context,
+        uint16_t num_queues,
+        Error **errp);
+
+/**
+ * iothread_vq_mapping_cleanup:
+ * @list: The mapping of virtqueues to IOThreads.
+ *
+ * Release IOThread object references that were acquired by
+ * iothread_vq_mapping_apply().
+ */
+void iothread_vq_mapping_cleanup(IOThreadVirtQueueMappingList *list);
+
+#endif /* HW_VIRTIO_IOTHREAD_VQ_MAPPING_H */

+ 7 - 8
include/hw/virtio/virtio-scsi.h

@@ -22,6 +22,7 @@
 #include "hw/virtio/virtio.h"
 #include "hw/scsi/scsi.h"
 #include "chardev/char-fe.h"
+#include "qapi/qapi-types-virtio.h"
 #include "system/iothread.h"
 
 #define TYPE_VIRTIO_SCSI_COMMON "virtio-scsi-common"
@@ -60,6 +61,7 @@ struct VirtIOSCSIConf {
     CharBackend chardev;
     uint32_t boot_tpgt;
     IOThread *iothread;
+    IOThreadVirtQueueMappingList *iothread_vq_mapping_list;
 };
 
 struct VirtIOSCSI;
@@ -82,18 +84,14 @@ struct VirtIOSCSI {
 
     SCSIBus bus;
     int resetting; /* written from main loop thread, read from any thread */
+
+    QemuMutex event_lock; /* protects event_vq and events_dropped */
     bool events_dropped;
 
-    /*
-     * TMFs deferred to main loop BH. These fields are protected by
-     * tmf_bh_lock.
-     */
-    QemuMutex tmf_bh_lock;
-    QEMUBH *tmf_bh;
-    QTAILQ_HEAD(, VirtIOSCSIReq) tmf_bh_list;
+    QemuMutex ctrl_lock; /* protects ctrl_vq */
 
     /* Fields for dataplane below */
-    AioContext *ctx; /* one iothread per virtio-scsi-pci for now */
+    AioContext **vq_aio_context; /* per-virtqueue AioContext pointer */
 
     bool dataplane_started;
     bool dataplane_starting;
@@ -111,6 +109,7 @@ void virtio_scsi_common_realize(DeviceState *dev,
 void virtio_scsi_common_unrealize(DeviceState *dev);
 
 void virtio_scsi_dataplane_setup(VirtIOSCSI *s, Error **errp);
+void virtio_scsi_dataplane_cleanup(VirtIOSCSI *s);
 int virtio_scsi_dataplane_start(VirtIODevice *s);
 void virtio_scsi_dataplane_stop(VirtIODevice *s);
 

+ 0 - 1
include/system/block-backend-global-state.h

@@ -86,7 +86,6 @@ bool blk_supports_write_perm(BlockBackend *blk);
 bool blk_is_sg(BlockBackend *blk);
 void blk_set_enable_write_cache(BlockBackend *blk, bool wce);
 int blk_get_flags(BlockBackend *blk);
-bool blk_op_is_blocked(BlockBackend *blk, BlockOpType op, Error **errp);
 int blk_set_aio_context(BlockBackend *blk, AioContext *new_context,
                         Error **errp);
 void blk_add_aio_context_notifier(BlockBackend *blk,

+ 1 - 2
include/system/dma.h

@@ -290,8 +290,7 @@ typedef BlockAIOCB *DMAIOFunc(int64_t offset, QEMUIOVector *iov,
                               BlockCompletionFunc *cb, void *cb_opaque,
                               void *opaque);
 
-BlockAIOCB *dma_blk_io(AioContext *ctx,
-                       QEMUSGList *sg, uint64_t offset, uint32_t align,
+BlockAIOCB *dma_blk_io(QEMUSGList *sg, uint64_t offset, uint32_t align,
                        DMAIOFunc *io_func, void *io_func_opaque,
                        BlockCompletionFunc *cb, void *opaque, DMADirection dir);
 BlockAIOCB *dma_blk_read(BlockBackend *blk,

+ 8 - 0
meson.build

@@ -2727,6 +2727,14 @@ config_host_data.set('HAVE_OPTRESET',
                      cc.has_header_symbol('getopt.h', 'optreset'))
 config_host_data.set('HAVE_IPPROTO_MPTCP',
                      cc.has_header_symbol('netinet/in.h', 'IPPROTO_MPTCP'))
+if libaio.found()
+  config_host_data.set('HAVE_IO_PREP_PWRITEV2',
+                       cc.has_header_symbol('libaio.h', 'io_prep_pwritev2'))
+endif
+if linux_io_uring.found()
+  config_host_data.set('HAVE_IO_URING_PREP_WRITEV2',
+                       cc.has_header_symbol('liburing.h', 'io_uring_prep_writev2'))
+endif
 
 # has_member
 config_host_data.set('HAVE_SIGEV_NOTIFY_THREAD_ID',

+ 449 - 0
scripts/qcow2-to-stdout.py

@@ -0,0 +1,449 @@
+#!/usr/bin/env python3
+
+# This tool reads a disk image in any format and converts it to qcow2,
+# writing the result directly to stdout.
+#
+# Copyright (C) 2024 Igalia, S.L.
+#
+# Authors: Alberto Garcia <berto@igalia.com>
+#          Madeeha Javed <javed@igalia.com>
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# qcow2 files produced by this script are always arranged like this:
+#
+# - qcow2 header
+# - refcount table
+# - refcount blocks
+# - L1 table
+# - L2 tables
+# - Data clusters
+#
+# A note about variable names: in qcow2 there is one refcount table
+# and one (active) L1 table, although each can occupy several
+# clusters. For the sake of simplicity the code sometimes talks about
+# refcount tables and L1 tables when referring to those clusters.
+
+import argparse
+import errno
+import math
+import os
+import signal
+import struct
+import subprocess
+import sys
+import tempfile
+import time
+from contextlib import contextmanager
+
+QCOW2_DEFAULT_CLUSTER_SIZE = 65536
+QCOW2_DEFAULT_REFCOUNT_BITS = 16
+QCOW2_FEATURE_NAME_TABLE = 0x6803F857
+QCOW2_DATA_FILE_NAME_STRING = 0x44415441
+QCOW2_V3_HEADER_LENGTH = 112  # Header length in QEMU 9.0. Must be a multiple of 8
+QCOW2_INCOMPAT_DATA_FILE_BIT = 2
+QCOW2_AUTOCLEAR_DATA_FILE_RAW_BIT = 1
+QCOW_OFLAG_COPIED = 1 << 63
+QEMU_STORAGE_DAEMON = "qemu-storage-daemon"
+
+
+def bitmap_set(bitmap, idx):
+    bitmap[idx // 8] |= 1 << (idx % 8)
+
+
+def bitmap_is_set(bitmap, idx):
+    return (bitmap[idx // 8] & (1 << (idx % 8))) != 0
+
+
+def bitmap_iterator(bitmap, length):
+    for idx in range(length):
+        if bitmap_is_set(bitmap, idx):
+            yield idx
+
+
+def align_up(num, d):
+    return d * math.ceil(num / d)
+
+
+# Holes in the input file contain only zeroes so we can skip them and
+# save time. This function returns the indexes of the clusters that
+# are known to contain data. Those are the ones that we need to read.
+def clusters_with_data(fd, cluster_size):
+    data_to = 0
+    while True:
+        try:
+            data_from = os.lseek(fd, data_to, os.SEEK_DATA)
+            data_to = align_up(os.lseek(fd, data_from, os.SEEK_HOLE), cluster_size)
+            for idx in range(data_from // cluster_size, data_to // cluster_size):
+                yield idx
+        except OSError as err:
+            if err.errno == errno.ENXIO:  # End of file reached
+                break
+            raise err
+
+
+# write_qcow2_content() expects a raw input file. If we have a different
+# format we can use qemu-storage-daemon to make it appear as raw.
+@contextmanager
+def get_input_as_raw_file(input_file, input_format):
+    if input_format == "raw":
+        yield input_file
+        return
+    try:
+        temp_dir = tempfile.mkdtemp()
+        pid_file = os.path.join(temp_dir, "pid")
+        raw_file = os.path.join(temp_dir, "raw")
+        open(raw_file, "wb").close()
+        ret = subprocess.run(
+            [
+                QEMU_STORAGE_DAEMON,
+                "--daemonize",
+                "--pidfile", pid_file,
+                "--blockdev", f"driver=file,node-name=file0,driver=file,filename={input_file},read-only=on",
+                "--blockdev", f"driver={input_format},node-name=disk0,file=file0,read-only=on",
+                "--export", f"type=fuse,id=export0,node-name=disk0,mountpoint={raw_file},writable=off",
+            ],
+            capture_output=True,
+        )
+        if ret.returncode != 0:
+            sys.exit("[Error] Could not start the qemu-storage-daemon:\n" +
+                     ret.stderr.decode().rstrip('\n'))
+        yield raw_file
+    finally:
+        # Kill the storage daemon on exit
+        # and remove all temporary files
+        if os.path.exists(pid_file):
+            with open(pid_file, "r") as f:
+                pid = int(f.readline())
+            os.kill(pid, signal.SIGTERM)
+            while os.path.exists(pid_file):
+                time.sleep(0.1)
+        os.unlink(raw_file)
+        os.rmdir(temp_dir)
+
+
+def write_features(cluster, offset, data_file_name):
+    if data_file_name is not None:
+        encoded_name = data_file_name.encode("utf-8")
+        padded_name_len = align_up(len(encoded_name), 8)
+        struct.pack_into(f">II{padded_name_len}s", cluster, offset,
+                         QCOW2_DATA_FILE_NAME_STRING,
+                         len(encoded_name),
+                         encoded_name)
+        offset += 8 + padded_name_len
+
+    qcow2_features = [
+        # Incompatible
+        (0, 0, "dirty bit"),
+        (0, 1, "corrupt bit"),
+        (0, 2, "external data file"),
+        (0, 3, "compression type"),
+        (0, 4, "extended L2 entries"),
+        # Compatible
+        (1, 0, "lazy refcounts"),
+        # Autoclear
+        (2, 0, "bitmaps"),
+        (2, 1, "raw external data"),
+    ]
+    struct.pack_into(">I", cluster, offset, QCOW2_FEATURE_NAME_TABLE)
+    struct.pack_into(">I", cluster, offset + 4, len(qcow2_features) * 48)
+    offset += 8
+    for feature_type, feature_bit, feature_name in qcow2_features:
+        struct.pack_into(">BB46s", cluster, offset,
+                         feature_type, feature_bit, feature_name.encode("ascii"))
+        offset += 48
+
+
+def write_qcow2_content(input_file, cluster_size, refcount_bits, data_file_name, data_file_raw):
+    # Some basic values
+    l1_entries_per_table = cluster_size // 8
+    l2_entries_per_table = cluster_size // 8
+    refcounts_per_table  = cluster_size // 8
+    refcounts_per_block  = cluster_size * 8 // refcount_bits
+
+    # Virtual disk size, number of data clusters and L1 entries
+    disk_size = align_up(os.path.getsize(input_file), 512)
+    total_data_clusters = math.ceil(disk_size / cluster_size)
+    l1_entries = math.ceil(total_data_clusters / l2_entries_per_table)
+    allocated_l1_tables = math.ceil(l1_entries / l1_entries_per_table)
+
+    # Max L1 table size is 32 MB (QCOW_MAX_L1_SIZE in block/qcow2.h)
+    if (l1_entries * 8) > (32 * 1024 * 1024):
+        sys.exit("[Error] The image size is too large. Try using a larger cluster size.")
+
+    # Two bitmaps indicating which L1 and L2 entries are set
+    l1_bitmap = bytearray(allocated_l1_tables * l1_entries_per_table // 8)
+    l2_bitmap = bytearray(l1_entries * l2_entries_per_table // 8)
+    allocated_l2_tables = 0
+    allocated_data_clusters = 0
+
+    if data_file_raw:
+        # If data_file_raw is set then all clusters are allocated and
+        # we don't need to read the input file at all.
+        allocated_l2_tables = l1_entries
+        for idx in range(l1_entries):
+            bitmap_set(l1_bitmap, idx)
+        for idx in range(total_data_clusters):
+            bitmap_set(l2_bitmap, idx)
+    else:
+        # Open the input file for reading
+        fd = os.open(input_file, os.O_RDONLY)
+        zero_cluster = bytes(cluster_size)
+        # Read all the clusters that contain data
+        for idx in clusters_with_data(fd, cluster_size):
+            cluster = os.pread(fd, cluster_size, cluster_size * idx)
+            # If the last cluster is smaller than cluster_size pad it with zeroes
+            if len(cluster) < cluster_size:
+                cluster += bytes(cluster_size - len(cluster))
+            # If a cluster has non-zero data then it must be allocated
+            # in the output file and its L2 entry must be set
+            if cluster != zero_cluster:
+                bitmap_set(l2_bitmap, idx)
+                allocated_data_clusters += 1
+                # Allocated data clusters also need their corresponding L1 entry and L2 table
+                l1_idx = math.floor(idx / l2_entries_per_table)
+                if not bitmap_is_set(l1_bitmap, l1_idx):
+                    bitmap_set(l1_bitmap, l1_idx)
+                    allocated_l2_tables += 1
+
+    # Total amount of allocated clusters excluding the refcount blocks and table
+    total_allocated_clusters = 1 + allocated_l1_tables + allocated_l2_tables
+    if data_file_name is None:
+        total_allocated_clusters += allocated_data_clusters
+
+    # Clusters allocated for the refcount blocks and table
+    allocated_refcount_blocks = math.ceil(total_allocated_clusters  / refcounts_per_block)
+    allocated_refcount_tables = math.ceil(allocated_refcount_blocks / refcounts_per_table)
+
+    # Now we have a problem because allocated_refcount_blocks and allocated_refcount_tables...
+    # (a) increase total_allocated_clusters, and
+    # (b) need to be recalculated when total_allocated_clusters is increased
+    # So we need to repeat the calculation as long as the numbers change
+    while True:
+        new_total_allocated_clusters = total_allocated_clusters + allocated_refcount_tables + allocated_refcount_blocks
+        new_allocated_refcount_blocks = math.ceil(new_total_allocated_clusters / refcounts_per_block)
+        if new_allocated_refcount_blocks > allocated_refcount_blocks:
+            allocated_refcount_blocks = new_allocated_refcount_blocks
+            allocated_refcount_tables = math.ceil(allocated_refcount_blocks / refcounts_per_table)
+        else:
+            break
+
+    # Now that we have the final numbers we can update total_allocated_clusters
+    total_allocated_clusters += allocated_refcount_tables + allocated_refcount_blocks
+
+    # At this point we have the exact number of clusters that the output
+    # image is going to use so we can calculate all the offsets.
+    current_cluster_idx = 1
+
+    refcount_table_offset = current_cluster_idx * cluster_size
+    current_cluster_idx += allocated_refcount_tables
+
+    refcount_block_offset = current_cluster_idx * cluster_size
+    current_cluster_idx += allocated_refcount_blocks
+
+    l1_table_offset = current_cluster_idx * cluster_size
+    current_cluster_idx += allocated_l1_tables
+
+    l2_table_offset = current_cluster_idx * cluster_size
+    current_cluster_idx += allocated_l2_tables
+
+    data_clusters_offset = current_cluster_idx * cluster_size
+
+    # Calculate some values used in the qcow2 header
+    if allocated_l1_tables == 0:
+        l1_table_offset = 0
+
+    hdr_cluster_bits = int(math.log2(cluster_size))
+    hdr_refcount_bits = int(math.log2(refcount_bits))
+    hdr_length = QCOW2_V3_HEADER_LENGTH
+    hdr_incompat_features = 0
+    if data_file_name is not None:
+        hdr_incompat_features |= 1 << QCOW2_INCOMPAT_DATA_FILE_BIT
+    hdr_autoclear_features = 0
+    if data_file_raw:
+        hdr_autoclear_features |= 1 << QCOW2_AUTOCLEAR_DATA_FILE_RAW_BIT
+
+    ### Write qcow2 header
+    cluster = bytearray(cluster_size)
+    struct.pack_into(">4sIQIIQIIQQIIQQQQII", cluster, 0,
+        b"QFI\xfb",            # QCOW magic string
+        3,                     # version
+        0,                     # backing file offset
+        0,                     # backing file size
+        hdr_cluster_bits,
+        disk_size,
+        0,                     # encryption method
+        l1_entries,
+        l1_table_offset,
+        refcount_table_offset,
+        allocated_refcount_tables,
+        0,                     # number of snapshots
+        0,                     # snapshot table offset
+        hdr_incompat_features,
+        0,                     # compatible features
+        hdr_autoclear_features,
+        hdr_refcount_bits,
+        hdr_length,
+    )
+
+    write_features(cluster, hdr_length, data_file_name)
+
+    sys.stdout.buffer.write(cluster)
+
+    ### Write refcount table
+    cur_offset = refcount_block_offset
+    remaining_refcount_table_entries = allocated_refcount_blocks # Each entry is a pointer to a refcount block
+    while remaining_refcount_table_entries > 0:
+        cluster = bytearray(cluster_size)
+        to_write = min(remaining_refcount_table_entries, refcounts_per_table)
+        remaining_refcount_table_entries -= to_write
+        for idx in range(to_write):
+            struct.pack_into(">Q", cluster, idx * 8, cur_offset)
+            cur_offset += cluster_size
+        sys.stdout.buffer.write(cluster)
+
+    ### Write refcount blocks
+    remaining_refcount_block_entries = total_allocated_clusters # One entry for each allocated cluster
+    for tbl in range(allocated_refcount_blocks):
+        cluster = bytearray(cluster_size)
+        to_write = min(remaining_refcount_block_entries, refcounts_per_block)
+        remaining_refcount_block_entries -= to_write
+        # All refcount entries contain the number 1. The only difference
+        # is their bit width, defined when the image is created.
+        for idx in range(to_write):
+            if refcount_bits == 64:
+                struct.pack_into(">Q", cluster, idx * 8, 1)
+            elif refcount_bits == 32:
+                struct.pack_into(">L", cluster, idx * 4, 1)
+            elif refcount_bits == 16:
+                struct.pack_into(">H", cluster, idx * 2, 1)
+            elif refcount_bits == 8:
+                cluster[idx] = 1
+            elif refcount_bits == 4:
+                cluster[idx // 2] |= 1 << ((idx % 2) * 4)
+            elif refcount_bits == 2:
+                cluster[idx // 4] |= 1 << ((idx % 4) * 2)
+            elif refcount_bits == 1:
+                cluster[idx // 8] |= 1 << (idx % 8)
+        sys.stdout.buffer.write(cluster)
+
+    ### Write L1 table
+    cur_offset = l2_table_offset
+    for tbl in range(allocated_l1_tables):
+        cluster = bytearray(cluster_size)
+        for idx in range(l1_entries_per_table):
+            l1_idx = tbl * l1_entries_per_table + idx
+            if bitmap_is_set(l1_bitmap, l1_idx):
+                struct.pack_into(">Q", cluster, idx * 8, cur_offset | QCOW_OFLAG_COPIED)
+                cur_offset += cluster_size
+        sys.stdout.buffer.write(cluster)
+
+    ### Write L2 tables
+    cur_offset = data_clusters_offset
+    for tbl in range(l1_entries):
+        # Skip the empty L2 tables. We can identify them because
+        # there is no L1 entry pointing at them.
+        if bitmap_is_set(l1_bitmap, tbl):
+            cluster = bytearray(cluster_size)
+            for idx in range(l2_entries_per_table):
+                l2_idx = tbl * l2_entries_per_table + idx
+                if bitmap_is_set(l2_bitmap, l2_idx):
+                    if data_file_name is None:
+                        struct.pack_into(">Q", cluster, idx * 8, cur_offset | QCOW_OFLAG_COPIED)
+                        cur_offset += cluster_size
+                    else:
+                        struct.pack_into(">Q", cluster, idx * 8, (l2_idx * cluster_size) | QCOW_OFLAG_COPIED)
+            sys.stdout.buffer.write(cluster)
+
+    ### Write data clusters
+    if data_file_name is None:
+        for idx in bitmap_iterator(l2_bitmap, total_data_clusters):
+            cluster = os.pread(fd, cluster_size, cluster_size * idx)
+            # If the last cluster is smaller than cluster_size pad it with zeroes
+            if len(cluster) < cluster_size:
+                cluster += bytes(cluster_size - len(cluster))
+            sys.stdout.buffer.write(cluster)
+
+    if not data_file_raw:
+        os.close(fd)
+
+
+def main():
+    # Command-line arguments
+    parser = argparse.ArgumentParser(
+        description="This program converts a QEMU disk image to qcow2 "
+        "and writes it to the standard output"
+    )
+    parser.add_argument("input_file", help="name of the input file")
+    parser.add_argument(
+        "-f",
+        dest="input_format",
+        metavar="input_format",
+        help="format of the input file (default: raw)",
+        default="raw",
+    )
+    parser.add_argument(
+        "-c",
+        dest="cluster_size",
+        metavar="cluster_size",
+        help=f"qcow2 cluster size (default: {QCOW2_DEFAULT_CLUSTER_SIZE})",
+        default=QCOW2_DEFAULT_CLUSTER_SIZE,
+        type=int,
+        choices=[1 << x for x in range(9, 22)],
+    )
+    parser.add_argument(
+        "-r",
+        dest="refcount_bits",
+        metavar="refcount_bits",
+        help=f"width of the reference count entries (default: {QCOW2_DEFAULT_REFCOUNT_BITS})",
+        default=QCOW2_DEFAULT_REFCOUNT_BITS,
+        type=int,
+        choices=[1 << x for x in range(7)],
+    )
+    parser.add_argument(
+        "-d",
+        dest="data_file",
+        help="create an image with input_file as an external data file",
+        action="store_true",
+    )
+    parser.add_argument(
+        "-R",
+        dest="data_file_raw",
+        help="enable data_file_raw on the generated image (implies -d)",
+        action="store_true",
+    )
+    args = parser.parse_args()
+
+    if args.data_file_raw:
+        args.data_file = True
+
+    if not os.path.isfile(args.input_file):
+        sys.exit(f"[Error] {args.input_file} does not exist or is not a regular file.")
+
+    if args.data_file and args.input_format != "raw":
+        sys.exit("[Error] External data files can only be used with raw input images")
+
+    # A 512 byte header is too small for the data file name extension
+    if args.data_file and args.cluster_size == 512:
+        sys.exit("[Error] External data files require a larger cluster size")
+
+    if sys.stdout.isatty():
+        sys.exit("[Error] Refusing to write to a tty. Try redirecting stdout.")
+
+    if args.data_file:
+        data_file_name = args.input_file
+    else:
+        data_file_name = None
+
+    with get_input_as_raw_file(args.input_file, args.input_format) as raw_file:
+        write_qcow2_content(
+            raw_file,
+            args.cluster_size,
+            args.refcount_bits,
+            data_file_name,
+            args.data_file_raw,
+        )
+
+
+if __name__ == "__main__":
+    main()
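
The comment above the refcount loop in write_qcow2_content() describes a small fixed-point problem, and a worked example makes the convergence concrete. A standalone sketch with invented numbers (64 KiB clusters and 16-bit refcounts, so 32768 refcounts per block and 8192 block pointers per table cluster):

    import math

    refcounts_per_block = 65536 * 8 // 16  # 32768
    refcounts_per_table = 65536 // 8       # 8192

    # Header + L1 + L2 tables + data clusters, before refcount metadata
    total = 1 + 1 + 5 + 40000              # 40007

    blocks = math.ceil(total / refcounts_per_block)   # 2
    tables = math.ceil(blocks / refcounts_per_table)  # 1
    while True:
        new_total = total + tables + blocks           # 40010 on pass one
        new_blocks = math.ceil(new_total / refcounts_per_block)
        if new_blocks > blocks:
            blocks = new_blocks
            tables = math.ceil(blocks / refcounts_per_table)
        else:
            break  # 40010 still fits in 2 blocks: converged

    print(blocks, tables)  # 2 1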

+ 4 - 4
system/dma-helpers.c

@@ -211,7 +211,7 @@ static const AIOCBInfo dma_aiocb_info = {
     .cancel_async       = dma_aio_cancel,
 };
 
-BlockAIOCB *dma_blk_io(AioContext *ctx,
+BlockAIOCB *dma_blk_io(
     QEMUSGList *sg, uint64_t offset, uint32_t align,
     DMAIOFunc *io_func, void *io_func_opaque,
     BlockCompletionFunc *cb,
@@ -223,7 +223,7 @@ BlockAIOCB *dma_blk_io(AioContext *ctx,
 
     dbs->acb = NULL;
     dbs->sg = sg;
-    dbs->ctx = ctx;
+    dbs->ctx = qemu_get_current_aio_context();
     dbs->offset = offset;
     dbs->align = align;
     dbs->sg_cur_index = 0;
@@ -251,7 +251,7 @@ BlockAIOCB *dma_blk_read(BlockBackend *blk,
                          QEMUSGList *sg, uint64_t offset, uint32_t align,
                          void (*cb)(void *opaque, int ret), void *opaque)
 {
-    return dma_blk_io(blk_get_aio_context(blk), sg, offset, align,
+    return dma_blk_io(sg, offset, align,
                       dma_blk_read_io_func, blk, cb, opaque,
                       DMA_DIRECTION_FROM_DEVICE);
 }
@@ -269,7 +269,7 @@ BlockAIOCB *dma_blk_write(BlockBackend *blk,
                           QEMUSGList *sg, uint64_t offset, uint32_t align,
                           void (*cb)(void *opaque, int ret), void *opaque)
 {
-    return dma_blk_io(blk_get_aio_context(blk), sg, offset, align,
+    return dma_blk_io(sg, offset, align,
                       dma_blk_write_io_func, blk, cb, opaque,
                       DMA_DIRECTION_TO_DEVICE);
 }
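
Dropping the AioContext parameter works because dma_blk_io() can simply ask which context it is running in. A loose Python analogy, with asyncio standing in for AioContext and all names invented:

    import asyncio

    async def dma_blk_io(io_func, *args):
        # Like qemu_get_current_aio_context(): remember the submitting
        # context instead of having every caller pass it in.
        loop = asyncio.get_running_loop()
        result = await asyncio.to_thread(io_func, *args)  # offload the I/O
        # Back in the captured context, where the completion would run.
        assert asyncio.get_running_loop() is loop
        return result

    def read_block(offset):
        return f"data@{offset}"

    print(asyncio.run(dma_blk_io(read_block, 4096)))  # data@4096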

+ 1 - 1
tests/qemu-iotests/051.pc.out

@@ -181,7 +181,7 @@ QEMU X.Y.Z monitor - type 'help' for more information
 
 Testing: -drive file=TEST_DIR/t.qcow2,if=none,node-name=disk -object iothread,id=thread0 -device virtio-scsi,iothread=thread0,id=virtio-scsi0 -device scsi-hd,bus=virtio-scsi0.0,drive=disk,share-rw=on -device virtio-scsi,id=virtio-scsi1 -device scsi-hd,bus=virtio-scsi1.0,drive=disk,share-rw=on
 QEMU X.Y.Z monitor - type 'help' for more information
-(qemu) QEMU_PROG: -device scsi-hd,bus=virtio-scsi1.0,drive=disk,share-rw=on: Cannot change iothread of active block backend
+(qemu) quit
 
 Testing: -drive file=TEST_DIR/t.qcow2,if=none,node-name=disk -object iothread,id=thread0 -device virtio-scsi,iothread=thread0,id=virtio-scsi0 -device scsi-hd,bus=virtio-scsi0.0,drive=disk,share-rw=on -device virtio-blk-pci,drive=disk,iothread=thread0,share-rw=on
 QEMU X.Y.Z monitor - type 'help' for more information

+ 1 - 1
tests/qemu-iotests/tests/qsd-migrate

@@ -22,7 +22,7 @@ import iotests
 
 from iotests import filter_qemu_io, filter_qtest
 
-iotests.script_initialize(supported_fmts=['generic'],
+iotests.script_initialize(supported_fmts=['qcow2', 'qed', 'raw'],
                           supported_protocols=['file'],
                           supported_platforms=['linux'])
 

+ 72 - 42
util/aio-posix.c

@@ -28,6 +28,9 @@
 /* Stop userspace polling on a handler if it isn't active for some time */
 #define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
 
+static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
+                                int64_t block_ns);
+
 bool aio_poll_disabled(AioContext *ctx)
 {
     return qatomic_read(&ctx->poll_disable_cnt);
@@ -392,7 +395,8 @@ static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
  * scanning all handlers with aio_dispatch_handlers().
  */
 static bool aio_dispatch_ready_handlers(AioContext *ctx,
-                                        AioHandlerList *ready_list)
+                                        AioHandlerList *ready_list,
+                                        int64_t block_ns)
 {
     bool progress = false;
     AioHandler *node;
@@ -400,6 +404,14 @@ static bool aio_dispatch_ready_handlers(AioContext *ctx,
     while ((node = QLIST_FIRST(ready_list))) {
         QLIST_REMOVE(node, node_ready);
         progress = aio_dispatch_handler(ctx, node) || progress;
+
+        /*
+         * Adjust polling time only after aio_dispatch_handler(), which can
+         * add the handler to ctx->poll_aio_handlers.
+         */
+        if (ctx->poll_max_ns && QLIST_IS_INSERTED(node, node_poll)) {
+            adjust_polling_time(ctx, &node->poll, block_ns);
+        }
     }
 
     return progress;
@@ -579,13 +591,19 @@ static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
 static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
                           int64_t *timeout)
 {
+    AioHandler *node;
     int64_t max_ns;
 
     if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
         return false;
     }
 
-    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
+    max_ns = 0;
+    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
+        max_ns = MAX(max_ns, node->poll.ns);
+    }
+    max_ns = qemu_soonest_timeout(*timeout, max_ns);
+
     if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
         /*
          * Enable poll mode. It pairs with the poll_set_started() in
@@ -600,6 +618,46 @@ static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
     return false;
 }
 
+static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
+                                int64_t block_ns)
+{
+    if (block_ns <= poll->ns) {
+        /* This is the sweet spot, no adjustment needed */
+    } else if (block_ns > ctx->poll_max_ns) {
+        /* We'd have to poll for too long, poll less */
+        int64_t old = poll->ns;
+
+        if (ctx->poll_shrink) {
+            poll->ns /= ctx->poll_shrink;
+        } else {
+            poll->ns = 0;
+        }
+
+        trace_poll_shrink(ctx, old, poll->ns);
+    } else if (poll->ns < ctx->poll_max_ns &&
+               block_ns < ctx->poll_max_ns) {
+        /* There is room to grow, poll longer */
+        int64_t old = poll->ns;
+        int64_t grow = ctx->poll_grow;
+
+        if (grow == 0) {
+            grow = 2;
+        }
+
+        if (poll->ns) {
+            poll->ns *= grow;
+        } else {
+            poll->ns = 4000; /* start polling at 4 microseconds */
+        }
+
+        if (poll->ns > ctx->poll_max_ns) {
+            poll->ns = ctx->poll_max_ns;
+        }
+
+        trace_poll_grow(ctx, old, poll->ns);
+    }
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
@@ -607,6 +665,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
     bool use_notify_me;
     int64_t timeout;
     int64_t start = 0;
+    int64_t block_ns = 0;
 
     /*
      * There cannot be two concurrent aio_poll calls for the same AioContext (or
@@ -679,49 +738,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
     aio_notify_accept(ctx);
 
-    /* Adjust polling time */
+    /* Calculate blocked time for adaptive polling */
     if (ctx->poll_max_ns) {
-        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
-
-        if (block_ns <= ctx->poll_ns) {
-            /* This is the sweet spot, no adjustment needed */
-        } else if (block_ns > ctx->poll_max_ns) {
-            /* We'd have to poll for too long, poll less */
-            int64_t old = ctx->poll_ns;
-
-            if (ctx->poll_shrink) {
-                ctx->poll_ns /= ctx->poll_shrink;
-            } else {
-                ctx->poll_ns = 0;
-            }
-
-            trace_poll_shrink(ctx, old, ctx->poll_ns);
-        } else if (ctx->poll_ns < ctx->poll_max_ns &&
-                   block_ns < ctx->poll_max_ns) {
-            /* There is room to grow, poll longer */
-            int64_t old = ctx->poll_ns;
-            int64_t grow = ctx->poll_grow;
-
-            if (grow == 0) {
-                grow = 2;
-            }
-
-            if (ctx->poll_ns) {
-                ctx->poll_ns *= grow;
-            } else {
-                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
-            }
-
-            if (ctx->poll_ns > ctx->poll_max_ns) {
-                ctx->poll_ns = ctx->poll_max_ns;
-            }
-
-            trace_poll_grow(ctx, old, ctx->poll_ns);
-        }
+        block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
     }
 
     progress |= aio_bh_poll(ctx);
-    progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
+    progress |= aio_dispatch_ready_handlers(ctx, &ready_list, block_ns);
 
     aio_free_deleted_handlers(ctx);
 
@@ -767,11 +790,18 @@ void aio_context_use_g_source(AioContext *ctx)
 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                  int64_t grow, int64_t shrink, Error **errp)
 {
+    AioHandler *node;
+
+    qemu_lockcnt_inc(&ctx->list_lock);
+    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        node->poll.ns = 0;
+    }
+    qemu_lockcnt_dec(&ctx->list_lock);
+
     /* No thread synchronization here, it doesn't matter if an incorrect value
      * is used once.
      */
     ctx->poll_max_ns = max_ns;
-    ctx->poll_ns = 0;
     ctx->poll_grow = grow;
     ctx->poll_shrink = shrink;
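
With AioPolledEvent split out per handler, adjust_polling_time() above is pure arithmetic and easy to simulate. A small sketch using the default tunables (poll-max-ns 32768, grow and shrink both 0, matching the `grow == 0` and `!ctx->poll_shrink` branches in the C code); the block_ns trace is invented:

    POLL_MAX_NS = 32768  # default iothread poll-max-ns
    POLL_GROW = 0        # 0 behaves as a factor of 2, as in the C code
    POLL_SHRINK = 0      # 0 means "drop straight back to no polling"

    def adjust(poll_ns, block_ns):
        if block_ns <= poll_ns:
            return poll_ns                        # sweet spot, no change
        if block_ns > POLL_MAX_NS:
            # We'd have to poll for too long, poll less
            return poll_ns // POLL_SHRINK if POLL_SHRINK else 0
        if poll_ns < POLL_MAX_NS and block_ns < POLL_MAX_NS:
            # Room to grow: double (or start at 4 us), capped at the max
            poll_ns = poll_ns * (POLL_GROW or 2) if poll_ns else 4000
            return min(poll_ns, POLL_MAX_NS)
        return poll_ns

    ns = 0
    for block_ns in (8000, 8000, 8000, 1000, 1_000_000):
        ns = adjust(ns, block_ns)
        print(f"blocked {block_ns:>9} ns -> poll {ns} ns")
    # polling grows 0 -> 4000 -> 8000, holds, then resets on the long block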
 

+ 1 - 0
util/aio-posix.h

@@ -38,6 +38,7 @@ struct AioHandler {
 #endif
     int64_t poll_idle_timeout; /* when to stop userspace polling */
     bool poll_ready; /* has polling detected an event? */
+    AioPolledEvent poll;
 };
 
 /* Add a handler to a ready list */

+ 0 - 1
util/async.c

@@ -609,7 +609,6 @@ AioContext *aio_context_new(Error **errp)
     qemu_rec_mutex_init(&ctx->lock);
     timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
 
-    ctx->poll_ns = 0;
     ctx->poll_max_ns = 0;
     ctx->poll_grow = 0;
     ctx->poll_shrink = 0;