
Merge tag 'block-pull-request' of https://gitlab.com/stefanha/qemu into staging

Pull request

- Add new thread-pool-min/thread-pool-max parameters to control the thread pool
  used for async I/O.

- Fix a QEMU 7.0 regression where the virtio-scsi IOThread consumed 100% CPU.
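
As an illustration of the new parameters (an editorial sketch, not part of the pull request text): an iothread can be created with, e.g., -object iothread,id=io0,thread-pool-min=1,thread-pool-max=8, and the same properties apply to the main-loop object introduced in this series.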

# -----BEGIN PGP SIGNATURE-----
#
# iQEzBAABCAAdFiEEhpWov9P5fNqsNXdanKSrs4Grc8gFAmJ5DqgACgkQnKSrs4Gr
# c8iAqAf/WEJzEso0Hu3UUYJi2lAXpLxWPjoNBlPdQlKIJ/I0zQIF0P7GeCifF+0l
# iMjgBv0ofyAuV47gaTJlVrAR75+hJ/IXNDhnu3UuvNWfVOqvksgw6kuHkMo9A2hC
# 4tIHEU9J8jbQSSdQTaZR8Zj4FX1/zcxMBAXT3YO3De6zo78RatBTuNP4dsZzt8bI
# Qs1a4A0p2ScNXK8EcF4QwAWfoxu9OPPzN52DBCNxcIcnn0SUab4NbDxzpRV4ZhDP
# 08WoafI5O+2Kb36QysJN01LqajHrClG/fozrPzBLq5aZUK3xewJGB1hEdGTLkkmz
# NJNBg5Ldszwj4PDZ1dFU3/03aigb3g==
# =t5eR
# -----END PGP SIGNATURE-----
# gpg: Signature made Mon 09 May 2022 05:52:56 AM PDT
# gpg:                using RSA key 8695A8BFD3F97CDAAC35775A9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" [full]
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>" [full]

* tag 'block-pull-request' of https://gitlab.com/stefanha/qemu:
  virtio-scsi: move request-related items from .h to .c
  virtio-scsi: clean up virtio_scsi_handle_cmd_vq()
  virtio-scsi: clean up virtio_scsi_handle_ctrl_vq()
  virtio-scsi: clean up virtio_scsi_handle_event_vq()
  virtio-scsi: don't waste CPU polling the event virtqueue
  virtio-scsi: fix ctrl and event handler functions in dataplane mode
  util/event-loop-base: Introduce options to set the thread pool size
  util/main-loop: Introduce the main loop into QOM
  Introduce event-loop-base abstract class

Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
Richard Henderson 3 years ago
Parent
Commit 178bacb66d

+ 140 - 0
event-loop-base.c

@@ -0,0 +1,140 @@
+/*
+ * QEMU event-loop base
+ *
+ * Copyright (C) 2022 Red Hat Inc
+ *
+ * Authors:
+ *  Stefan Hajnoczi <stefanha@redhat.com>
+ *  Nicolas Saenz Julienne <nsaenzju@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qom/object_interfaces.h"
+#include "qapi/error.h"
+#include "block/thread-pool.h"
+#include "sysemu/event-loop-base.h"
+
+typedef struct {
+    const char *name;
+    ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
+} EventLoopBaseParamInfo;
+
+static void event_loop_base_instance_init(Object *obj)
+{
+    EventLoopBase *base = EVENT_LOOP_BASE(obj);
+
+    base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+}
+
+static EventLoopBaseParamInfo aio_max_batch_info = {
+    "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
+};
+static EventLoopBaseParamInfo thread_pool_min_info = {
+    "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
+};
+static EventLoopBaseParamInfo thread_pool_max_info = {
+    "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
+};
+
+static void event_loop_base_get_param(Object *obj, Visitor *v,
+        const char *name, void *opaque, Error **errp)
+{
+    EventLoopBase *event_loop_base = EVENT_LOOP_BASE(obj);
+    EventLoopBaseParamInfo *info = opaque;
+    int64_t *field = (void *)event_loop_base + info->offset;
+
+    visit_type_int64(v, name, field, errp);
+}
+
+static void event_loop_base_set_param(Object *obj, Visitor *v,
+        const char *name, void *opaque, Error **errp)
+{
+    EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(obj);
+    EventLoopBase *base = EVENT_LOOP_BASE(obj);
+    EventLoopBaseParamInfo *info = opaque;
+    int64_t *field = (void *)base + info->offset;
+    int64_t value;
+
+    if (!visit_type_int64(v, name, &value, errp)) {
+        return;
+    }
+
+    if (value < 0) {
+        error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
+                   info->name, INT64_MAX);
+        return;
+    }
+
+    *field = value;
+
+    if (bc->update_params) {
+        bc->update_params(base, errp);
+    }
+
+    return;
+}
+
+static void event_loop_base_complete(UserCreatable *uc, Error **errp)
+{
+    EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc);
+    EventLoopBase *base = EVENT_LOOP_BASE(uc);
+
+    if (bc->init) {
+        bc->init(base, errp);
+    }
+}
+
+static bool event_loop_base_can_be_deleted(UserCreatable *uc)
+{
+    EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc);
+    EventLoopBase *backend = EVENT_LOOP_BASE(uc);
+
+    if (bc->can_be_deleted) {
+        return bc->can_be_deleted(backend);
+    }
+
+    return true;
+}
+
+static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
+{
+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
+    ucc->complete = event_loop_base_complete;
+    ucc->can_be_deleted = event_loop_base_can_be_deleted;
+
+    object_class_property_add(klass, "aio-max-batch", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &aio_max_batch_info);
+    object_class_property_add(klass, "thread-pool-min", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_min_info);
+    object_class_property_add(klass, "thread-pool-max", "int",
+                              event_loop_base_get_param,
+                              event_loop_base_set_param,
+                              NULL, &thread_pool_max_info);
+}
+
+static const TypeInfo event_loop_base_info = {
+    .name = TYPE_EVENT_LOOP_BASE,
+    .parent = TYPE_OBJECT,
+    .instance_size = sizeof(EventLoopBase),
+    .instance_init = event_loop_base_instance_init,
+    .class_size = sizeof(EventLoopBaseClass),
+    .class_init = event_loop_base_class_init,
+    .abstract = true,
+    .interfaces = (InterfaceInfo[]) {
+        { TYPE_USER_CREATABLE },
+        { }
+    }
+};
+
+static void register_types(void)
+{
+    type_register_static(&event_loop_base_info);
+}
+type_init(register_types);
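
To make the abstract class concrete, here is a minimal editorial sketch of a hypothetical subclass (the type name demo-loop and all demo_* identifiers are invented for illustration; the real subclasses in this series are iothread and main-loop, shown below):

#include "qemu/osdep.h"
#include "sysemu/event-loop-base.h"

#define TYPE_DEMO_LOOP "demo-loop"
OBJECT_DECLARE_SIMPLE_TYPE(DemoLoop, DEMO_LOOP)

struct DemoLoop {
    EventLoopBase parent_obj;
};

static void demo_loop_init(EventLoopBase *base, Error **errp)
{
    /* Called from event_loop_base_complete() when object creation finishes. */
}

static void demo_loop_update_params(EventLoopBase *base, Error **errp)
{
    /* Called from event_loop_base_set_param() after each property write. */
}

static void demo_loop_class_init(ObjectClass *oc, void *class_data)
{
    EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(oc);

    bc->init = demo_loop_init;
    bc->update_params = demo_loop_update_params;
}

static const TypeInfo demo_loop_info = {
    .name = TYPE_DEMO_LOOP,
    .parent = TYPE_EVENT_LOOP_BASE,
    .instance_size = sizeof(DemoLoop),
    .class_init = demo_loop_class_init,
};

static void demo_loop_register(void)
{
    type_register_static(&demo_loop_info);
}
type_init(demo_loop_register);

A subclass therefore only reacts to the merged parameter state: the base class validates and stores aio-max-batch, thread-pool-min, and thread-pool-max, then hands control to the hooks.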

+ 1 - 1
hw/scsi/virtio-scsi-dataplane.c

@@ -138,7 +138,7 @@ int virtio_scsi_dataplane_start(VirtIODevice *vdev)
 
     aio_context_acquire(s->ctx);
     virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx);
-    virtio_queue_aio_attach_host_notifier(vs->event_vq, s->ctx);
+    virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx);
 
     for (i = 0; i < vs->conf.num_queues; i++) {
         virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], s->ctx);

+ 71 - 30
hw/scsi/virtio-scsi.c

@@ -29,6 +29,43 @@
 #include "hw/virtio/virtio-access.h"
 #include "trace.h"
 
+typedef struct VirtIOSCSIReq {
+    /*
+     * Note:
+     * - fields up to resp_iov are initialized by virtio_scsi_init_req;
+     * - fields starting at vring are zeroed by virtio_scsi_init_req.
+     */
+    VirtQueueElement elem;
+
+    VirtIOSCSI *dev;
+    VirtQueue *vq;
+    QEMUSGList qsgl;
+    QEMUIOVector resp_iov;
+
+    union {
+        /* Used for two-stage request submission */
+        QTAILQ_ENTRY(VirtIOSCSIReq) next;
+
+        /* Used for cancellation of request during TMFs */
+        int remaining;
+    };
+
+    SCSIRequest *sreq;
+    size_t resp_size;
+    enum SCSIXferMode mode;
+    union {
+        VirtIOSCSICmdResp     cmd;
+        VirtIOSCSICtrlTMFResp tmf;
+        VirtIOSCSICtrlANResp  an;
+        VirtIOSCSIEvent       event;
+    } resp;
+    union {
+        VirtIOSCSICmdReq      cmd;
+        VirtIOSCSICtrlTMFReq  tmf;
+        VirtIOSCSICtrlANReq   an;
+    } req;
+} VirtIOSCSIReq;
+
 static inline int virtio_scsi_get_lun(uint8_t *lun)
 {
     return ((lun[2] << 8) | lun[3]) & 0x3FFF;
@@ -45,7 +82,7 @@ static inline SCSIDevice *virtio_scsi_device_get(VirtIOSCSI *s, uint8_t *lun)
     return scsi_device_get(&s->bus, 0, lun[1], virtio_scsi_get_lun(lun));
 }
 
-void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req)
+static void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req)
 {
     VirtIODevice *vdev = VIRTIO_DEVICE(s);
     const size_t zero_skip =
@@ -58,7 +95,7 @@ void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req)
     memset((uint8_t *)req + zero_skip, 0, sizeof(*req) - zero_skip);
 }
 
-void virtio_scsi_free_req(VirtIOSCSIReq *req)
+static void virtio_scsi_free_req(VirtIOSCSIReq *req)
 {
     qemu_iovec_destroy(&req->resp_iov);
     qemu_sglist_destroy(&req->qsgl);
@@ -460,28 +497,41 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req)
     }
 }
 
-bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
+static void virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
     VirtIOSCSIReq *req;
-    bool progress = false;
 
     while ((req = virtio_scsi_pop_req(s, vq))) {
-        progress = true;
         virtio_scsi_handle_ctrl_req(s, req);
     }
-    return progress;
+}
+
+/*
+ * If dataplane is configured but not yet started, do so now and return true on
+ * success.
+ *
+ * Dataplane is started by the core virtio code but virtqueue handler functions
+ * can also be invoked when a guest kicks before DRIVER_OK, so this helper
+ * deals with manually starting ioeventfd in that case.
+ */
+static bool virtio_scsi_defer_to_dataplane(VirtIOSCSI *s)
+{
+    if (!s->ctx || s->dataplane_started) {
+        return false;
+    }
+
+    virtio_device_start_ioeventfd(&s->parent_obj.parent_obj);
+    return !s->dataplane_fenced;
 }
 
 static void virtio_scsi_handle_ctrl(VirtIODevice *vdev, VirtQueue *vq)
 {
     VirtIOSCSI *s = (VirtIOSCSI *)vdev;
 
-    if (s->ctx) {
-        virtio_device_start_ioeventfd(vdev);
-        if (!s->dataplane_fenced) {
-            return;
-        }
+    if (virtio_scsi_defer_to_dataplane(s)) {
+        return;
     }
+
     virtio_scsi_acquire(s);
     virtio_scsi_handle_ctrl_vq(s, vq);
     virtio_scsi_release(s);
@@ -672,12 +722,11 @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req)
     scsi_req_unref(sreq);
 }
 
-bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
+static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
     VirtIOSCSIReq *req, *next;
     int ret = 0;
     bool suppress_notifications = virtio_queue_get_notification(vq);
-    bool progress = false;
 
     QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
 
@@ -687,7 +736,6 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
         }
 
         while ((req = virtio_scsi_pop_req(s, vq))) {
-            progress = true;
             ret = virtio_scsi_handle_cmd_req_prepare(s, req);
             if (!ret) {
                 QTAILQ_INSERT_TAIL(&reqs, req, next);
@@ -712,7 +760,6 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
     QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
         virtio_scsi_handle_cmd_req_submit(s, req);
     }
-    return progress;
 }
 
 static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq)
@@ -720,12 +767,10 @@ static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq)
     /* use non-QOM casts in the data path */
     VirtIOSCSI *s = (VirtIOSCSI *)vdev;
 
-    if (s->ctx && !s->dataplane_started) {
-        virtio_device_start_ioeventfd(vdev);
-        if (!s->dataplane_fenced) {
-            return;
-        }
+    if (virtio_scsi_defer_to_dataplane(s)) {
+        return;
     }
+
     virtio_scsi_acquire(s);
     virtio_scsi_handle_cmd_vq(s, vq);
     virtio_scsi_release(s);
@@ -793,8 +838,8 @@ static void virtio_scsi_reset(VirtIODevice *vdev)
     s->events_dropped = false;
 }
 
-void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
-                            uint32_t event, uint32_t reason)
+static void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
+                                   uint32_t event, uint32_t reason)
 {
     VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s);
     VirtIOSCSIReq *req;
@@ -842,25 +887,21 @@ void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
     virtio_scsi_complete_req(req);
 }
 
-bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
+static void virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
 {
     if (s->events_dropped) {
         virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
-        return true;
     }
-    return false;
 }
 
 static void virtio_scsi_handle_event(VirtIODevice *vdev, VirtQueue *vq)
 {
     VirtIOSCSI *s = VIRTIO_SCSI(vdev);
 
-    if (s->ctx) {
-        virtio_device_start_ioeventfd(vdev);
-        if (!s->dataplane_fenced) {
-            return;
-        }
+    if (virtio_scsi_defer_to_dataplane(s)) {
+        return;
     }
+
     virtio_scsi_acquire(s);
     virtio_scsi_handle_event_vq(s, vq);
     virtio_scsi_release(s);
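
Across all three handlers the resulting pattern is identical: virtio_scsi_defer_to_dataplane() starts ioeventfd when a guest kicks before DRIVER_OK, so the request is picked up in the IOThread; only when dataplane setup failed (dataplane_fenced) does the handler fall through and process the virtqueue here, under virtio_scsi_acquire()/virtio_scsi_release().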

+ 13 - 0
hw/virtio/virtio.c

@@ -3534,6 +3534,19 @@ void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx)
                                 virtio_queue_host_notifier_aio_poll_end);
 }
 
+/*
+ * Same as virtio_queue_aio_attach_host_notifier() but without polling. Use
+ * this for rx virtqueues and similar cases where the virtqueue handler
+ * function does not pop all elements. When the virtqueue is left non-empty
+ * polling consumes CPU cycles and should not be used.
+ */
+void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx)
+{
+    aio_set_event_notifier(ctx, &vq->host_notifier, true,
+                           virtio_queue_host_notifier_read,
+                           NULL, NULL);
+}
+
 void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx)
 {
     aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL, NULL, NULL);
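
A condensed pairing of the two attach variants, mirroring the hw/scsi/virtio-scsi-dataplane.c hunk above (editorial illustration, not an additional change in this commit):

/* cmd and ctrl handlers pop every element, so polling can find the queue empty */
virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx);
/* the event queue is kept populated on purpose; polling it would spin the CPU */
virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx);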

+ 10 - 0
include/block/aio.h

@@ -192,6 +192,8 @@ struct AioContext {
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
     QEMUBH *co_schedule_bh;
 
+    int thread_pool_min;
+    int thread_pool_max;
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                 Error **errp);
 
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: min number of threads to have readily available in the thread pool
+ * @max: max number of threads the thread pool can contain
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp);
 #endif
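
A hedged usage sketch of the new setter (the context variable and bounds are invented for illustration):

    Error *local_err = NULL;

    /* keep between 4 and 16 worker threads; min <= max and max > 0 are enforced */
    aio_context_set_thread_pool_params(ctx, 4, 16, &local_err);
    if (local_err) {
        error_report_err(local_err);
    }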

+ 3 - 0
include/block/thread-pool.h

@@ -20,6 +20,8 @@
 
 #include "block/block.h"
 
+#define THREAD_POOL_MAX_THREADS_DEFAULT         64
+
 typedef int ThreadPoolFunc(void *opaque);
 
 typedef struct ThreadPool ThreadPool;
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
 
 #endif

+ 0 - 43
include/hw/virtio/virtio-scsi.h

@@ -92,42 +92,6 @@ struct VirtIOSCSI {
     uint32_t host_features;
 };
 
-typedef struct VirtIOSCSIReq {
-    /* Note:
-     * - fields up to resp_iov are initialized by virtio_scsi_init_req;
-     * - fields starting at vring are zeroed by virtio_scsi_init_req.
-     * */
-    VirtQueueElement elem;
-
-    VirtIOSCSI *dev;
-    VirtQueue *vq;
-    QEMUSGList qsgl;
-    QEMUIOVector resp_iov;
-
-    union {
-        /* Used for two-stage request submission */
-        QTAILQ_ENTRY(VirtIOSCSIReq) next;
-
-        /* Used for cancellation of request during TMFs */
-        int remaining;
-    };
-
-    SCSIRequest *sreq;
-    size_t resp_size;
-    enum SCSIXferMode mode;
-    union {
-        VirtIOSCSICmdResp     cmd;
-        VirtIOSCSICtrlTMFResp tmf;
-        VirtIOSCSICtrlANResp  an;
-        VirtIOSCSIEvent       event;
-    } resp;
-    union {
-        VirtIOSCSICmdReq      cmd;
-        VirtIOSCSICtrlTMFReq  tmf;
-        VirtIOSCSICtrlANReq   an;
-    } req;
-} VirtIOSCSIReq;
-
 static inline void virtio_scsi_acquire(VirtIOSCSI *s)
 {
     if (s->ctx) {
@@ -149,13 +113,6 @@ void virtio_scsi_common_realize(DeviceState *dev,
                                 Error **errp);
 
 void virtio_scsi_common_unrealize(DeviceState *dev);
-bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq);
-bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq);
-bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq);
-void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req);
-void virtio_scsi_free_req(VirtIOSCSIReq *req);
-void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
-                            uint32_t event, uint32_t reason);
 
 void virtio_scsi_dataplane_setup(VirtIOSCSI *s, Error **errp);
 int virtio_scsi_dataplane_start(VirtIODevice *s);

+ 1 - 0
include/hw/virtio/virtio.h

@@ -317,6 +317,7 @@ EventNotifier *virtio_queue_get_host_notifier(VirtQueue *vq);
 void virtio_queue_set_host_notifier_enabled(VirtQueue *vq, bool enabled);
 void virtio_queue_host_notifier_read(EventNotifier *n);
 void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx);
+void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx);
 void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx);
 VirtQueue *virtio_vector_first_queue(VirtIODevice *vdev, uint16_t vector);
 VirtQueue *virtio_vector_next_queue(VirtQueue *vq);

+ 10 - 0
include/qemu/main-loop.h

@@ -26,9 +26,19 @@
 #define QEMU_MAIN_LOOP_H
 
 #include "block/aio.h"
+#include "qom/object.h"
+#include "sysemu/event-loop-base.h"
 
 #define SIG_IPI SIGUSR1
 
+#define TYPE_MAIN_LOOP  "main-loop"
+OBJECT_DECLARE_TYPE(MainLoop, MainLoopClass, MAIN_LOOP)
+
+struct MainLoop {
+    EventLoopBase parent_obj;
+};
+typedef struct MainLoop MainLoop;
+
 /**
  * qemu_init_main_loop: Set up the process so that it can run the main loop.
  *

+ 41 - 0
include/sysemu/event-loop-base.h

@@ -0,0 +1,41 @@
+/*
+ * QEMU event-loop backend
+ *
+ * Copyright (C) 2022 Red Hat Inc
+ *
+ * Authors:
+ *  Nicolas Saenz Julienne <nsaenzju@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef QEMU_EVENT_LOOP_BASE_H
+#define QEMU_EVENT_LOOP_BASE_H
+
+#include "qom/object.h"
+#include "block/aio.h"
+#include "qemu/typedefs.h"
+
+#define TYPE_EVENT_LOOP_BASE         "event-loop-base"
+OBJECT_DECLARE_TYPE(EventLoopBase, EventLoopBaseClass,
+                    EVENT_LOOP_BASE)
+
+struct EventLoopBaseClass {
+    ObjectClass parent_class;
+
+    void (*init)(EventLoopBase *base, Error **errp);
+    void (*update_params)(EventLoopBase *base, Error **errp);
+    bool (*can_be_deleted)(EventLoopBase *base);
+};
+
+struct EventLoopBase {
+    Object parent;
+
+    /* AioContext AIO engine parameters */
+    int64_t aio_max_batch;
+
+    /* AioContext thread pool parameters */
+    int64_t thread_pool_min;
+    int64_t thread_pool_max;
+};
+#endif

+ 2 - 4
include/sysemu/iothread.h

@@ -17,11 +17,12 @@
 #include "block/aio.h"
 #include "qemu/thread.h"
 #include "qom/object.h"
+#include "sysemu/event-loop-base.h"
 
 #define TYPE_IOTHREAD "iothread"
 
 struct IOThread {
-    Object parent_obj;
+    EventLoopBase parent_obj;
 
     QemuThread thread;
     AioContext *ctx;
@@ -37,9 +38,6 @@ struct IOThread {
     int64_t poll_max_ns;
     int64_t poll_grow;
     int64_t poll_shrink;
-
-    /* AioContext AIO engine parameters */
-    int64_t aio_max_batch;
 };
 typedef struct IOThread IOThread;
 

+ 22 - 46
iothread.c

@@ -17,6 +17,7 @@
 #include "qemu/module.h"
 #include "block/aio.h"
 #include "block/block.h"
+#include "sysemu/event-loop-base.h"
 #include "sysemu/iothread.h"
 #include "qapi/error.h"
 #include "qapi/qapi-commands-misc.h"
@@ -152,10 +153,15 @@ static void iothread_init_gcontext(IOThread *iothread)
     iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
 }
 
-static void iothread_set_aio_context_params(IOThread *iothread, Error **errp)
+static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
 {
+    IOThread *iothread = IOTHREAD(base);
     ERRP_GUARD();
 
+    if (!iothread->ctx) {
+        return;
+    }
+
     aio_context_set_poll_params(iothread->ctx,
                                 iothread->poll_max_ns,
                                 iothread->poll_grow,
@@ -166,14 +172,18 @@ static void iothread_set_aio_context_params(IOThread *iothread, Error **errp)
     }
 
     aio_context_set_aio_params(iothread->ctx,
-                               iothread->aio_max_batch,
+                               iothread->parent_obj.aio_max_batch,
                                errp);
+
+    aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
 }
 
-static void iothread_complete(UserCreatable *obj, Error **errp)
+
+static void iothread_init(EventLoopBase *base, Error **errp)
 {
     Error *local_error = NULL;
-    IOThread *iothread = IOTHREAD(obj);
+    IOThread *iothread = IOTHREAD(base);
     char *thread_name;
 
     iothread->stopping = false;
@@ -189,7 +199,7 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
      */
     iothread_init_gcontext(iothread);
 
-    iothread_set_aio_context_params(iothread, &local_error);
+    iothread_set_aio_context_params(base, &local_error);
     if (local_error) {
         error_propagate(errp, local_error);
         aio_context_unref(iothread->ctx);
@@ -201,7 +211,7 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
      * to inherit.
      */
     thread_name = g_strdup_printf("IO %s",
-                        object_get_canonical_path_component(OBJECT(obj)));
+                        object_get_canonical_path_component(OBJECT(base)));
     qemu_thread_create(&iothread->thread, thread_name, iothread_run,
                        iothread, QEMU_THREAD_JOINABLE);
     g_free(thread_name);
@@ -226,9 +236,6 @@ static IOThreadParamInfo poll_grow_info = {
 static IOThreadParamInfo poll_shrink_info = {
     "poll-shrink", offsetof(IOThread, poll_shrink),
 };
-static IOThreadParamInfo aio_max_batch_info = {
-    "aio-max-batch", offsetof(IOThread, aio_max_batch),
-};
 
 static void iothread_get_param(Object *obj, Visitor *v,
         const char *name, IOThreadParamInfo *info, Error **errp)
@@ -288,35 +295,12 @@ static void iothread_set_poll_param(Object *obj, Visitor *v,
     }
 }
 
-static void iothread_get_aio_param(Object *obj, Visitor *v,
-        const char *name, void *opaque, Error **errp)
-{
-    IOThreadParamInfo *info = opaque;
-
-    iothread_get_param(obj, v, name, info, errp);
-}
-
-static void iothread_set_aio_param(Object *obj, Visitor *v,
-        const char *name, void *opaque, Error **errp)
-{
-    IOThread *iothread = IOTHREAD(obj);
-    IOThreadParamInfo *info = opaque;
-
-    if (!iothread_set_param(obj, v, name, info, errp)) {
-        return;
-    }
-
-    if (iothread->ctx) {
-        aio_context_set_aio_params(iothread->ctx,
-                                   iothread->aio_max_batch,
-                                   errp);
-    }
-}
-
 static void iothread_class_init(ObjectClass *klass, void *class_data)
 {
-    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
-    ucc->complete = iothread_complete;
+    EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(klass);
+
+    bc->init = iothread_init;
+    bc->update_params = iothread_set_aio_context_params;
 
     object_class_property_add(klass, "poll-max-ns", "int",
                               iothread_get_poll_param,
@@ -330,23 +314,15 @@ static void iothread_class_init(ObjectClass *klass, void *class_data)
                               iothread_get_poll_param,
                               iothread_set_poll_param,
                               NULL, &poll_shrink_info);
-    object_class_property_add(klass, "aio-max-batch", "int",
-                              iothread_get_aio_param,
-                              iothread_set_aio_param,
-                              NULL, &aio_max_batch_info);
 }
 
 static const TypeInfo iothread_info = {
     .name = TYPE_IOTHREAD,
-    .parent = TYPE_OBJECT,
+    .parent = TYPE_EVENT_LOOP_BASE,
     .class_init = iothread_class_init,
     .instance_size = sizeof(IOThread),
     .instance_init = iothread_instance_init,
     .instance_finalize = iothread_instance_finalize,
-    .interfaces = (InterfaceInfo[]) {
-        {TYPE_USER_CREATABLE},
-        {}
-    },
 };
 
 static void iothread_register_types(void)
@@ -383,7 +359,7 @@ static int query_one_iothread(Object *object, void *opaque)
     info->poll_max_ns = iothread->poll_max_ns;
     info->poll_grow = iothread->poll_grow;
     info->poll_shrink = iothread->poll_shrink;
-    info->aio_max_batch = iothread->aio_max_batch;
+    info->aio_max_batch = iothread->parent_obj.aio_max_batch;
 
     QAPI_LIST_APPEND(*tail, info);
     return 0;

+ 16 - 10
meson.build

@@ -3025,6 +3025,7 @@ subdir('qom')
 subdir('authz')
 subdir('crypto')
 subdir('ui')
+subdir('hw')
 
 
 if enable_modules
@@ -3032,6 +3033,18 @@ if enable_modules
   modulecommon = declare_dependency(link_whole: libmodulecommon, compile_args: '-DBUILD_DSO')
 endif
 
+qom_ss = qom_ss.apply(config_host, strict: false)
+libqom = static_library('qom', qom_ss.sources() + genh,
+                        dependencies: [qom_ss.dependencies()],
+                        name_suffix: 'fa')
+qom = declare_dependency(link_whole: libqom)
+
+event_loop_base = files('event-loop-base.c')
+event_loop_base = static_library('event-loop-base', sources: event_loop_base + genh,
+                                 build_by_default: true)
+event_loop_base = declare_dependency(link_whole: event_loop_base,
+                                     dependencies: [qom])
+
 stub_ss = stub_ss.apply(config_all, strict: false)
 
 util_ss.add_all(trace_ss)
@@ -3040,7 +3053,8 @@ libqemuutil = static_library('qemuutil',
                              sources: util_ss.sources() + stub_ss.sources() + genh,
                              dependencies: [util_ss.dependencies(), libm, threads, glib, socket, malloc, pixman])
 qemuutil = declare_dependency(link_with: libqemuutil,
-                              sources: genh + version_res)
+                              sources: genh + version_res,
+                              dependencies: [event_loop_base])
 
 if have_system or have_user
   decodetree = generator(find_program('scripts/decodetree.py'),
@@ -3118,7 +3132,6 @@ subdir('monitor')
 subdir('net')
 subdir('replay')
 subdir('semihosting')
-subdir('hw')
 subdir('tcg')
 subdir('fpu')
 subdir('accel')
@@ -3243,13 +3256,6 @@ qemu_syms = custom_target('qemu.syms', output: 'qemu.syms',
                              capture: true,
                              command: [undefsym, nm, '@INPUT@'])
 
-qom_ss = qom_ss.apply(config_host, strict: false)
-libqom = static_library('qom', qom_ss.sources() + genh,
-                        dependencies: [qom_ss.dependencies()],
-                        name_suffix: 'fa')
-
-qom = declare_dependency(link_whole: libqom)
-
 authz_ss = authz_ss.apply(config_host, strict: false)
 libauthz = static_library('authz', authz_ss.sources() + genh,
                           dependencies: [authz_ss.dependencies()],
@@ -3302,7 +3308,7 @@ libblockdev = static_library('blockdev', blockdev_ss.sources() + genh,
                              build_by_default: false)
 
 blockdev = declare_dependency(link_whole: [libblockdev],
-                              dependencies: [block])
+                              dependencies: [block, event_loop_base])
 
 qmp_ss = qmp_ss.apply(config_host, strict: false)
 libqmp = static_library('qmp', qmp_ss.sources() + genh,

+ 38 - 5
qapi/qom.json

@@ -499,6 +499,28 @@
             '*repeat': 'bool',
             '*grab-toggle': 'GrabToggleKeys' } }
 
+##
+# @EventLoopBaseProperties:
+#
+# Common properties for event loops
+#
+# @aio-max-batch: maximum number of requests in a batch for the AIO engine,
+#                 0 means that the engine will use its default.
+#                 (default: 0)
+#
+# @thread-pool-min: minimum number of threads reserved in the thread pool
+#                   (default: 0)
+#
+# @thread-pool-max: maximum number of threads the thread pool can contain
+#                   (default: 64)
+#
+# Since: 7.1
+##
+{ 'struct': 'EventLoopBaseProperties',
+  'data': { '*aio-max-batch': 'int',
+            '*thread-pool-min': 'int',
+            '*thread-pool-max': 'int' } }
+
 ##
 # @IothreadProperties:
 #
@@ -516,17 +538,26 @@
 #               algorithm detects it is spending too long polling without
 #               encountering events. 0 selects a default behaviour (default: 0)
 #
-# @aio-max-batch: maximum number of requests in a batch for the AIO engine,
-#                 0 means that the engine will use its default
-#                 (default:0, since 6.1)
+# The @aio-max-batch option is available since 6.1.
 #
 # Since: 2.0
 ##
 { 'struct': 'IothreadProperties',
+  'base': 'EventLoopBaseProperties',
   'data': { '*poll-max-ns': 'int',
             '*poll-grow': 'int',
-            '*poll-shrink': 'int',
-            '*aio-max-batch': 'int' } }
+            '*poll-shrink': 'int' } }
+
+##
+# @MainLoopProperties:
+#
+# Properties for the main-loop object.
+#
+# Since: 7.1
+##
+{ 'struct': 'MainLoopProperties',
+  'base': 'EventLoopBaseProperties',
+  'data': {} }
 
 ##
 # @MemoryBackendProperties:
@@ -818,6 +849,7 @@
     { 'name': 'input-linux',
       'if': 'CONFIG_LINUX' },
     'iothread',
+    'main-loop',
     { 'name': 'memory-backend-epc',
       'if': 'CONFIG_LINUX' },
     'memory-backend-file',
@@ -883,6 +915,7 @@
       'input-linux':                { 'type': 'InputLinuxProperties',
                                       'if': 'CONFIG_LINUX' },
       'iothread':                   'IothreadProperties',
+      'main-loop':                  'MainLoopProperties',
       'memory-backend-epc':         { 'type': 'MemoryBackendEpcProperties',
                                       'if': 'CONFIG_LINUX' },
       'memory-backend-file':        'MemoryBackendFileProperties',
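
A hedged example of the resulting schema in use (identifiers and values invented): object-add with "qom-type": "iothread" may now pass "thread-pool-min": 1 and "thread-pool-max": 8; per the util/async.c hunk below, values must satisfy min <= max and max != 0, with the defaults 0 and 64 documented above.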

+ 1 - 0
util/aio-posix.c

@@ -15,6 +15,7 @@
 
 #include "qemu/osdep.h"
 #include "block/block.h"
+#include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 #include "qemu/rcu.h"
 #include "qemu/rcu_queue.h"

+ 20 - 0
util/async.c

@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
 
     ctx->aio_max_batch = 0;
 
+    ctx->thread_pool_min = 0;
+    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+
     return ctx;
 fail:
     g_source_destroy(&ctx->source);
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
     assert(!get_my_aiocontext());
     set_my_aiocontext(ctx);
 }
+
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+                                        int64_t max, Error **errp)
+{
+
+    if (min > max || !max || min > INT_MAX || max > INT_MAX) {
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+        return;
+    }
+
+    ctx->thread_pool_min = min;
+    ctx->thread_pool_max = max;
+
+    if (ctx->thread_pool) {
+        thread_pool_update_params(ctx->thread_pool, ctx);
+    }
+}

+ 65 - 0
util/main-loop.c

@@ -30,9 +30,11 @@
 #include "sysemu/replay.h"
 #include "qemu/main-loop.h"
 #include "block/aio.h"
+#include "block/thread-pool.h"
 #include "qemu/error-report.h"
 #include "qemu/queue.h"
 #include "qemu/compiler.h"
+#include "qom/object.h"
 
 #ifndef _WIN32
 #include <sys/wait.h>
@@ -184,6 +186,69 @@ int qemu_init_main_loop(Error **errp)
     return 0;
 }
 
+static void main_loop_update_params(EventLoopBase *base, Error **errp)
+{
+    ERRP_GUARD();
+
+    if (!qemu_aio_context) {
+        error_setg(errp, "qemu aio context not ready");
+        return;
+    }
+
+    aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
+    if (*errp) {
+        return;
+    }
+
+    aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
+                                       base->thread_pool_max, errp);
+}
+
+MainLoop *mloop;
+
+static void main_loop_init(EventLoopBase *base, Error **errp)
+{
+    MainLoop *m = MAIN_LOOP(base);
+
+    if (mloop) {
+        error_setg(errp, "only one main-loop instance allowed");
+        return;
+    }
+
+    main_loop_update_params(base, errp);
+
+    mloop = m;
+    return;
+}
+
+static bool main_loop_can_be_deleted(EventLoopBase *base)
+{
+    return false;
+}
+
+static void main_loop_class_init(ObjectClass *oc, void *class_data)
+{
+    EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(oc);
+
+    bc->init = main_loop_init;
+    bc->update_params = main_loop_update_params;
+    bc->can_be_deleted = main_loop_can_be_deleted;
+}
+
+static const TypeInfo main_loop_info = {
+    .name = TYPE_MAIN_LOOP,
+    .parent = TYPE_EVENT_LOOP_BASE,
+    .class_init = main_loop_class_init,
+    .instance_size = sizeof(MainLoop),
+};
+
+static void main_loop_register_types(void)
+{
+    type_register_static(&main_loop_info);
+}
+
+type_init(main_loop_register_types)
+
 static int max_priority;
 
 #ifndef _WIN32

+ 51 - 4
util/thread-pool.c

@@ -58,7 +58,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    int min_threads;
+    int max_threads;
 };
 
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+    /*
+     * The semaphore timed out, we should exit the loop except when:
+     *  - There is work to do, we raced with the signal.
+     *  - The max threads threshold just changed, we raced with the signal.
+     *  - The thread pool forces a minimum number of readily available threads.
+     */
+    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+            pool->cur_threads > pool->max_threads ||
+            pool->cur_threads <= pool->min_threads)) {
+        return true;
+    }
+
+    return false;
+}
+
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1 || pool->stopping) {
+        } while (back_to_sleep(pool, ret));
+        if (ret == -1 || pool->stopping ||
+            pool->cur_threads > pool->max_threads) {
             break;
         }
 
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
     thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+    qemu_mutex_lock(&pool->lock);
+
+    pool->min_threads = ctx->thread_pool_min;
+    pool->max_threads = ctx->thread_pool_max;
+
+    /*
+     * We either have to:
+     *  - Increase the number of available threads until over the min_threads
+     *    threshold.
+     *  - Decrease the number of available threads until under the max_threads
+     *    threshold.
+     *  - Do nothing. The current number of threads falls between the min and
+     *    max thresholds. We'll let the pool manage itself.
+     */
+    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+        spawn_thread(pool);
+    }
+
+    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+        qemu_sem_post(&pool->sem);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+}
+
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    thread_pool_update_params(pool, ctx);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)
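
A worked example of the update path (numbers invented): if cur_threads is 2 and thread-pool-min is raised to 4, the first loop in thread_pool_update_params() spawns two workers; if cur_threads is 8 and thread-pool-max drops to 4, the second loop posts the semaphore four times and each woken idle worker exits through the cur_threads > max_threads check in worker_thread().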