Browse Source

Merge remote-tracking branch 'remotes/armbru/tags/pull-monitor-2020-10-09' into staging

Monitor patches for 2020-10-09

# gpg: Signature made Fri 09 Oct 2020 06:16:51 BST
# gpg:                using RSA key 354BC8B3D7EB2A6B68674E5F3870B400EB918653
# gpg:                issuer "armbru@redhat.com"
# gpg: Good signature from "Markus Armbruster <armbru@redhat.com>" [full]
# gpg:                 aka "Markus Armbruster <armbru@pond.sub.org>" [full]
# Primary key fingerprint: 354B C8B3 D7EB 2A6B 6867  4E5F 3870 B400 EB91 8653

* remotes/armbru/tags/pull-monitor-2020-10-09:
  block: Convert 'block_resize' to coroutine
  block: Add bdrv_lock()/unlock()
  block: Add bdrv_co_enter()/leave()
  util/async: Add aio_co_reschedule_self()
  hmp: Add support for coroutine command handlers
  qmp: Move dispatcher to a coroutine
  qapi: Add a 'coroutine' flag for commands
  monitor: Make current monitor a per-coroutine property
  qmp: Call monitor_set_cur() only in qmp_dispatch()
  qmp: Assert that no other monitor is active
  hmp: Update current monitor only in handle_hmp_command()
  monitor: Use getter/setter functions for cur_mon
  monitor: Add Monitor parameter to monitor_get_cpu_index()
  monitor: Add Monitor parameter to monitor_set_cpu()

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Peter Maydell 4 years ago
parent
commit
b7092cda1b
50 changed files with 573 additions and 143 deletions
  1. 4 4
      audio/wavcapture.c
  2. 50 0
      block.c
  3. 8 8
      blockdev.c
  4. 29 0
      docs/devel/qapi-code-gen.txt
  5. 1 1
      docs/sphinx/qapidoc.py
  6. 1 1
      dump/dump.c
  7. 1 0
      hmp-commands.hx
  8. 1 1
      hw/core/machine-hmp-cmds.c
  9. 1 1
      hw/scsi/vhost-scsi.c
  10. 1 1
      hw/virtio/vhost-vsock.c
  11. 10 0
      include/block/aio.h
  12. 31 0
      include/block/block.h
  13. 4 3
      include/monitor/monitor.h
  14. 4 1
      include/qapi/qmp/dispatch.h
  15. 2 2
      migration/fd.c
  16. 2 2
      monitor/hmp-cmds.c
  17. 35 9
      monitor/hmp.c
  18. 17 21
      monitor/misc.c
  19. 5 2
      monitor/monitor-internal.h
  20. 91 10
      monitor/monitor.c
  21. 2 0
      monitor/qmp-cmds-control.c
  22. 1 1
      monitor/qmp-cmds.c
  23. 94 37
      monitor/qmp.c
  24. 1 1
      net/socket.c
  25. 3 3
      net/tap.c
  26. 2 1
      qapi/block-core.json
  27. 63 2
      qapi/qmp-dispatch.c
  28. 3 0
      qapi/qmp-registry.c
  29. 1 1
      qga/main.c
  30. 7 3
      scripts/qapi/commands.py
  31. 9 2
      scripts/qapi/expr.py
  32. 1 1
      scripts/qapi/introspect.py
  33. 9 4
      scripts/qapi/schema.py
  34. 1 1
      softmmu/cpus.c
  35. 9 1
      stubs/monitor-core.c
  36. 1 0
      tests/qapi-schema/meson.build
  37. 2 0
      tests/qapi-schema/oob-coroutine.err
  38. 2 0
      tests/qapi-schema/oob-coroutine.json
  39. 0 0
      tests/qapi-schema/oob-coroutine.out
  40. 1 0
      tests/qapi-schema/qapi-schema-test.json
  41. 2 0
      tests/qapi-schema/qapi-schema-test.out
  42. 4 3
      tests/qapi-schema/test-qapi.py
  43. 7 3
      tests/test-qmp-cmds.c
  44. 6 6
      tests/test-util-sockets.c
  45. 1 1
      trace/control.c
  46. 7 1
      util/aio-posix.c
  47. 30 0
      util/async.c
  48. 3 3
      util/qemu-error.c
  49. 2 1
      util/qemu-print.c
  50. 1 0
      util/qemu-sockets.c

+ 4 - 4
audio/wavcapture.c

@@ -1,5 +1,5 @@
 #include "qemu/osdep.h"
-#include "monitor/monitor.h"
+#include "qemu/qemu-print.h"
 #include "qapi/error.h"
 #include "qemu/error-report.h"
 #include "audio.h"
@@ -94,9 +94,9 @@ static void wav_capture_info (void *opaque)
     WAVState *wav = opaque;
     char *path = wav->path;
 
-    monitor_printf (cur_mon, "Capturing audio(%d,%d,%d) to %s: %d bytes\n",
-                    wav->freq, wav->bits, wav->nchannels,
-                    path ? path : "<not available>", wav->bytes);
+    qemu_printf("Capturing audio(%d,%d,%d) to %s: %d bytes\n",
+                wav->freq, wav->bits, wav->nchannels,
+                path ? path : "<not available>", wav->bytes);
 }
 
 static struct capture_ops wav_capture_ops = {

+ 50 - 0
block.c

@@ -6303,6 +6303,56 @@ AioContext *bdrv_get_aio_context(BlockDriverState *bs)
     return bs ? bs->aio_context : qemu_get_aio_context();
 }
 
+AioContext *coroutine_fn bdrv_co_enter(BlockDriverState *bs)
+{
+    Coroutine *self = qemu_coroutine_self();
+    AioContext *old_ctx = qemu_coroutine_get_aio_context(self);
+    AioContext *new_ctx;
+
+    /*
+     * Increase bs->in_flight to ensure that this operation is completed before
+     * moving the node to a different AioContext. Read new_ctx only afterwards.
+     */
+    bdrv_inc_in_flight(bs);
+
+    new_ctx = bdrv_get_aio_context(bs);
+    aio_co_reschedule_self(new_ctx);
+    return old_ctx;
+}
+
+void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx)
+{
+    aio_co_reschedule_self(old_ctx);
+    bdrv_dec_in_flight(bs);
+}
+
+void coroutine_fn bdrv_co_lock(BlockDriverState *bs)
+{
+    AioContext *ctx = bdrv_get_aio_context(bs);
+
+    /* In the main thread, bs->aio_context won't change concurrently */
+    assert(qemu_get_current_aio_context() == qemu_get_aio_context());
+
+    /*
+     * We're in coroutine context, so we already hold the lock of the main
+     * loop AioContext. Don't lock it twice to avoid deadlocks.
+     */
+    assert(qemu_in_coroutine());
+    if (ctx != qemu_get_aio_context()) {
+        aio_context_acquire(ctx);
+    }
+}
+
+void coroutine_fn bdrv_co_unlock(BlockDriverState *bs)
+{
+    AioContext *ctx = bdrv_get_aio_context(bs);
+
+    assert(qemu_in_coroutine());
+    if (ctx != qemu_get_aio_context()) {
+        aio_context_release(ctx);
+    }
+}
+
 void bdrv_coroutine_enter(BlockDriverState *bs, Coroutine *co)
 {
     aio_co_enter(bdrv_get_aio_context(bs), co);

+ 8 - 8
blockdev.c

@@ -2449,14 +2449,14 @@ BlockDirtyBitmapSha256 *qmp_x_debug_block_dirty_bitmap_sha256(const char *node,
     return ret;
 }
 
-void qmp_block_resize(bool has_device, const char *device,
-                      bool has_node_name, const char *node_name,
-                      int64_t size, Error **errp)
+void coroutine_fn qmp_block_resize(bool has_device, const char *device,
+                                   bool has_node_name, const char *node_name,
+                                   int64_t size, Error **errp)
 {
     Error *local_err = NULL;
     BlockBackend *blk = NULL;
     BlockDriverState *bs;
-    AioContext *aio_context;
+    AioContext *old_ctx;
 
     bs = bdrv_lookup_bs(has_device ? device : NULL,
                         has_node_name ? node_name : NULL,
@@ -2466,9 +2466,6 @@ void qmp_block_resize(bool has_device, const char *device,
         return;
     }
 
-    aio_context = bdrv_get_aio_context(bs);
-    aio_context_acquire(aio_context);
-
     if (size < 0) {
         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "size", "a >0 size");
         goto out;
@@ -2485,12 +2482,15 @@ void qmp_block_resize(bool has_device, const char *device,
     }
 
     bdrv_drained_begin(bs);
+    old_ctx = bdrv_co_enter(bs);
     blk_truncate(blk, size, false, PREALLOC_MODE_OFF, 0, errp);
+    bdrv_co_leave(bs, old_ctx);
     bdrv_drained_end(bs);
 
 out:
+    bdrv_co_lock(bs);
     blk_unref(blk);
-    aio_context_release(aio_context);
+    bdrv_co_unlock(bs);
 }
 
 void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,

+ 29 - 0
docs/devel/qapi-code-gen.txt

@@ -472,6 +472,7 @@ Syntax:
                 '*gen': false,
                 '*allow-oob': true,
                 '*allow-preconfig': true,
+                '*coroutine': true,
                 '*if': COND,
                 '*features': FEATURES }
 
@@ -596,6 +597,34 @@ before the machine is built.  It defaults to false.  For example:
 QMP is available before the machine is built only when QEMU was
 started with --preconfig.
 
+Member 'coroutine' tells the QMP dispatcher whether the command handler
+is safe to be run in a coroutine.  It defaults to false.  If it is true,
+the command handler is called from coroutine context and may yield while
+waiting for an external event (such as I/O completion) in order to avoid
+blocking the guest and other background operations.
+
+Coroutine safety can be hard to prove, similar to thread safety.  Common
+pitfalls are:
+
+- The global mutex isn't held across qemu_coroutine_yield(), so
+  operations that used to assume that they execute atomically may have
+  to be more careful to protect against changes in the global state.
+
+- Nested event loops (AIO_WAIT_WHILE() etc.) are problematic in
+  coroutine context and can easily lead to deadlocks.  They should be
+  replaced by yielding and reentering the coroutine when the condition
+  becomes false.
+
+Since the command handler may assume coroutine context, any callers
+other than the QMP dispatcher must also call it in coroutine context.
+In particular, HMP commands calling such a QMP command handler must be
+marked .coroutine = true in hmp-commands.hx.
+
+It is an error to specify both 'coroutine': true and 'allow-oob': true
+for a command.  We don't currently have a use case for both together and
+without a use case, it's not entirely clear what the semantics should
+be.
+
 The optional 'if' member specifies a conditional.  See "Configuring
 the schema" below for more on this.
 

+ 1 - 1
docs/sphinx/qapidoc.py

@@ -330,7 +330,7 @@ def visit_alternate_type(self, name, info, ifcond, features, variants):
 
     def visit_command(self, name, info, ifcond, features, arg_type,
                       ret_type, gen, success_response, boxed, allow_oob,
-                      allow_preconfig):
+                      allow_preconfig, coroutine):
         doc = self._cur_doc
         self._add_doc('Command',
                       self._nodes_for_arguments(doc,

+ 1 - 1
dump/dump.c

@@ -1986,7 +1986,7 @@ void qmp_dump_guest_memory(bool paging, const char *file,
 
 #if !defined(WIN32)
     if (strstart(file, "fd:", &p)) {
-        fd = monitor_get_fd(cur_mon, p, errp);
+        fd = monitor_get_fd(monitor_cur(), p, errp);
         if (fd == -1) {
             return;
         }

+ 1 - 0
hmp-commands.hx

@@ -76,6 +76,7 @@ ERST
         .params     = "device size",
         .help       = "resize a block image",
         .cmd        = hmp_block_resize,
+        .coroutine  = true,
     },
 
 SRST

+ 1 - 1
hw/core/machine-hmp-cmds.c

@@ -34,7 +34,7 @@ void hmp_info_cpus(Monitor *mon, const QDict *qdict)
     for (cpu = cpu_list; cpu; cpu = cpu->next) {
         int active = ' ';
 
-        if (cpu->value->cpu_index == monitor_get_cpu_index()) {
+        if (cpu->value->cpu_index == monitor_get_cpu_index(mon)) {
             active = '*';
         }
 

+ 1 - 1
hw/scsi/vhost-scsi.c

@@ -177,7 +177,7 @@ static void vhost_scsi_realize(DeviceState *dev, Error **errp)
     }
 
     if (vs->conf.vhostfd) {
-        vhostfd = monitor_fd_param(cur_mon, vs->conf.vhostfd, errp);
+        vhostfd = monitor_fd_param(monitor_cur(), vs->conf.vhostfd, errp);
         if (vhostfd == -1) {
             error_prepend(errp, "vhost-scsi: unable to parse vhostfd: ");
             return;

+ 1 - 1
hw/virtio/vhost-vsock.c

@@ -143,7 +143,7 @@ static void vhost_vsock_device_realize(DeviceState *dev, Error **errp)
     }
 
     if (vsock->conf.vhostfd) {
-        vhostfd = monitor_fd_param(cur_mon, vsock->conf.vhostfd, errp);
+        vhostfd = monitor_fd_param(monitor_cur(), vsock->conf.vhostfd, errp);
         if (vhostfd == -1) {
             error_prepend(errp, "vhost-vsock: unable to parse vhostfd: ");
             return;

+ 10 - 0
include/block/aio.h

@@ -17,6 +17,7 @@
 #ifdef CONFIG_LINUX_IO_URING
 #include <liburing.h>
 #endif
+#include "qemu/coroutine.h"
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
 #include "qemu/thread.h"
@@ -654,6 +655,15 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
  */
 void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
 
+/**
+ * aio_co_reschedule_self:
+ * @new_ctx: the new context
+ *
+ * Move the currently running coroutine to new_ctx. If the coroutine is already
+ * running in new_ctx, do nothing.
+ */
+void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx);
+
 /**
  * aio_co_wake:
  * @co: the coroutine

+ 31 - 0
include/block/block.h

@@ -640,6 +640,37 @@ bool bdrv_debug_is_suspended(BlockDriverState *bs, const char *tag);
  */
 AioContext *bdrv_get_aio_context(BlockDriverState *bs);
 
+/**
+ * Move the current coroutine to the AioContext of @bs and return the old
+ * AioContext of the coroutine. Increase bs->in_flight so that draining @bs
+ * will wait for the operation to proceed until the corresponding
+ * bdrv_co_leave().
+ *
+ * Consequently, you can't call drain inside a bdrv_co_enter/leave() section as
+ * this will deadlock.
+ */
+AioContext *coroutine_fn bdrv_co_enter(BlockDriverState *bs);
+
+/**
+ * Ends a section started by bdrv_co_enter(). Move the current coroutine back
+ * to old_ctx and decrease bs->in_flight again.
+ */
+void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx);
+
+/**
+ * Locks the AioContext of @bs if it's not the current AioContext. This avoids
+ * double locking which could lead to deadlocks: This is a coroutine_fn, so we
+ * know we already own the lock of the current AioContext.
+ *
+ * May only be called in the main thread.
+ */
+void coroutine_fn bdrv_co_lock(BlockDriverState *bs);
+
+/**
+ * Unlocks the AioContext of @bs if it's not the current AioContext.
+ */
+void coroutine_fn bdrv_co_unlock(BlockDriverState *bs);
+
 /**
  * Transfer control to @co in the aio context of @bs
  */

+ 4 - 3
include/monitor/monitor.h

@@ -5,7 +5,6 @@
 #include "qapi/qapi-types-misc.h"
 #include "qemu/readline.h"
 
-extern __thread Monitor *cur_mon;
 typedef struct MonitorHMP MonitorHMP;
 typedef struct MonitorOptions MonitorOptions;
 
@@ -13,6 +12,8 @@ typedef struct MonitorOptions MonitorOptions;
 
 extern QemuOptsList qemu_mon_opts;
 
+Monitor *monitor_cur(void);
+Monitor *monitor_set_cur(Coroutine *co, Monitor *mon);
 bool monitor_cur_is_qmp(void);
 
 void monitor_init_globals(void);
@@ -33,8 +34,8 @@ int monitor_vprintf(Monitor *mon, const char *fmt, va_list ap)
     GCC_FMT_ATTR(2, 0);
 int monitor_printf(Monitor *mon, const char *fmt, ...) GCC_FMT_ATTR(2, 3);
 void monitor_flush(Monitor *mon);
-int monitor_set_cpu(int cpu_index);
-int monitor_get_cpu_index(void);
+int monitor_set_cpu(Monitor *mon, int cpu_index);
+int monitor_get_cpu_index(Monitor *mon);
 
 void monitor_read_command(MonitorHMP *mon, int show_prompt);
 int monitor_read_password(MonitorHMP *mon, ReadLineFunc *readline_func,

+ 4 - 1
include/qapi/qmp/dispatch.h

@@ -14,6 +14,7 @@
 #ifndef QAPI_QMP_DISPATCH_H
 #define QAPI_QMP_DISPATCH_H
 
+#include "monitor/monitor.h"
 #include "qemu/queue.h"
 
 typedef void (QmpCommandFunc)(QDict *, QObject **, Error **);
@@ -24,11 +25,13 @@ typedef enum QmpCommandOptions
     QCO_NO_SUCCESS_RESP       =  (1U << 0),
     QCO_ALLOW_OOB             =  (1U << 1),
     QCO_ALLOW_PRECONFIG       =  (1U << 2),
+    QCO_COROUTINE             =  (1U << 3),
 } QmpCommandOptions;
 
 typedef struct QmpCommand
 {
     const char *name;
+    /* Runs in coroutine context if QCO_COROUTINE is set */
     QmpCommandFunc *fn;
     QmpCommandOptions options;
     QTAILQ_ENTRY(QmpCommand) node;
@@ -49,7 +52,7 @@ const char *qmp_command_name(const QmpCommand *cmd);
 bool qmp_has_success_response(const QmpCommand *cmd);
 QDict *qmp_error_response(Error *err);
 QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
-                    bool allow_oob);
+                    bool allow_oob, Monitor *cur_mon);
 bool qmp_is_oob(const QDict *dict);
 
 typedef void (*qmp_cmd_callback_fn)(const QmpCommand *cmd, void *opaque);

+ 2 - 2
migration/fd.c

@@ -26,7 +26,7 @@
 void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp)
 {
     QIOChannel *ioc;
-    int fd = monitor_get_fd(cur_mon, fdname, errp);
+    int fd = monitor_get_fd(monitor_cur(), fdname, errp);
     if (fd == -1) {
         return;
     }
@@ -55,7 +55,7 @@ static gboolean fd_accept_incoming_migration(QIOChannel *ioc,
 void fd_start_incoming_migration(const char *fdname, Error **errp)
 {
     QIOChannel *ioc;
-    int fd = monitor_fd_param(cur_mon, fdname, errp);
+    int fd = monitor_fd_param(monitor_cur(), fdname, errp);
     if (fd == -1) {
         return;
     }

+ 2 - 2
monitor/hmp-cmds.c

@@ -998,7 +998,7 @@ void hmp_cpu(Monitor *mon, const QDict *qdict)
     /* XXX: drop the monitor_set_cpu() usage when all HMP commands that
             use it are converted to the QAPI */
     cpu_index = qdict_get_int(qdict, "index");
-    if (monitor_set_cpu(cpu_index) < 0) {
+    if (monitor_set_cpu(mon, cpu_index) < 0) {
         monitor_printf(mon, "invalid CPU index\n");
     }
 }
@@ -1009,7 +1009,7 @@ void hmp_memsave(Monitor *mon, const QDict *qdict)
     const char *filename = qdict_get_str(qdict, "filename");
     uint64_t addr = qdict_get_int(qdict, "val");
     Error *err = NULL;
-    int cpu_index = monitor_get_cpu_index();
+    int cpu_index = monitor_get_cpu_index(mon);
 
     if (cpu_index < 0) {
         monitor_printf(mon, "No CPU available\n");

+ 35 - 9
monitor/hmp.c

@@ -1056,6 +1056,21 @@ fail:
     return NULL;
 }
 
+typedef struct HandleHmpCommandCo {
+    Monitor *mon;
+    const HMPCommand *cmd;
+    QDict *qdict;
+    bool done;
+} HandleHmpCommandCo;
+
+static void handle_hmp_command_co(void *opaque)
+{
+    HandleHmpCommandCo *data = opaque;
+    data->cmd->cmd(data->mon, data->qdict);
+    monitor_set_cur(qemu_coroutine_self(), NULL);
+    data->done = true;
+}
+
 void handle_hmp_command(MonitorHMP *mon, const char *cmdline)
 {
     QDict *qdict;
@@ -1079,7 +1094,24 @@ void handle_hmp_command(MonitorHMP *mon, const char *cmdline)
         return;
     }
 
-    cmd->cmd(&mon->common, qdict);
+    if (!cmd->coroutine) {
+        /* old_mon is non-NULL when called from qmp_human_monitor_command() */
+        Monitor *old_mon = monitor_set_cur(qemu_coroutine_self(), &mon->common);
+        cmd->cmd(&mon->common, qdict);
+        monitor_set_cur(qemu_coroutine_self(), old_mon);
+    } else {
+        HandleHmpCommandCo data = {
+            .mon = &mon->common,
+            .cmd = cmd,
+            .qdict = qdict,
+            .done = false,
+        };
+        Coroutine *co = qemu_coroutine_create(handle_hmp_command_co, &data);
+        monitor_set_cur(co, &mon->common);
+        aio_co_enter(qemu_get_aio_context(), co);
+        AIO_WAIT_WHILE(qemu_get_aio_context(), !data.done);
+    }
+
     qobject_unref(qdict);
 }
 
@@ -1300,26 +1332,20 @@ cleanup:
 
 static void monitor_read(void *opaque, const uint8_t *buf, int size)
 {
-    MonitorHMP *mon;
-    Monitor *old_mon = cur_mon;
+    MonitorHMP *mon = container_of(opaque, MonitorHMP, common);
     int i;
 
-    cur_mon = opaque;
-    mon = container_of(cur_mon, MonitorHMP, common);
-
     if (mon->rs) {
         for (i = 0; i < size; i++) {
             readline_handle_byte(mon->rs, buf[i]);
         }
     } else {
         if (size == 0 || buf[size - 1] != 0) {
-            monitor_printf(cur_mon, "corrupted command\n");
+            monitor_printf(&mon->common, "corrupted command\n");
         } else {
             handle_hmp_command(mon, (char *)buf);
         }
     }
-
-    cur_mon = old_mon;
 }
 
 static void monitor_event(void *opaque, QEMUChrEvent event)

+ 17 - 21
monitor/misc.c

@@ -120,18 +120,13 @@ char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
                                 int64_t cpu_index, Error **errp)
 {
     char *output = NULL;
-    Monitor *old_mon;
     MonitorHMP hmp = {};
 
     monitor_data_init(&hmp.common, false, true, false);
 
-    old_mon = cur_mon;
-    cur_mon = &hmp.common;
-
     if (has_cpu_index) {
-        int ret = monitor_set_cpu(cpu_index);
+        int ret = monitor_set_cpu(&hmp.common, cpu_index);
         if (ret < 0) {
-            cur_mon = old_mon;
             error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cpu-index",
                        "a CPU number");
             goto out;
@@ -139,7 +134,6 @@ char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
     }
 
     handle_hmp_command(&hmp, command_line);
-    cur_mon = old_mon;
 
     WITH_QEMU_LOCK_GUARD(&hmp.common.mon_lock) {
         if (qstring_get_length(hmp.common.outbuf) > 0) {
@@ -255,7 +249,7 @@ static void monitor_init_qmp_commands(void)
 }
 
 /* Set the current CPU defined by the user. Callers must hold BQL. */
-int monitor_set_cpu(int cpu_index)
+int monitor_set_cpu(Monitor *mon, int cpu_index)
 {
     CPUState *cpu;
 
@@ -263,29 +257,29 @@ int monitor_set_cpu(int cpu_index)
     if (cpu == NULL) {
         return -1;
     }
-    g_free(cur_mon->mon_cpu_path);
-    cur_mon->mon_cpu_path = object_get_canonical_path(OBJECT(cpu));
+    g_free(mon->mon_cpu_path);
+    mon->mon_cpu_path = object_get_canonical_path(OBJECT(cpu));
     return 0;
 }
 
 /* Callers must hold BQL. */
-static CPUState *mon_get_cpu_sync(bool synchronize)
+static CPUState *mon_get_cpu_sync(Monitor *mon, bool synchronize)
 {
     CPUState *cpu = NULL;
 
-    if (cur_mon->mon_cpu_path) {
-        cpu = (CPUState *) object_resolve_path_type(cur_mon->mon_cpu_path,
+    if (mon->mon_cpu_path) {
+        cpu = (CPUState *) object_resolve_path_type(mon->mon_cpu_path,
                                                     TYPE_CPU, NULL);
         if (!cpu) {
-            g_free(cur_mon->mon_cpu_path);
-            cur_mon->mon_cpu_path = NULL;
+            g_free(mon->mon_cpu_path);
+            mon->mon_cpu_path = NULL;
         }
     }
-    if (!cur_mon->mon_cpu_path) {
+    if (!mon->mon_cpu_path) {
         if (!first_cpu) {
             return NULL;
         }
-        monitor_set_cpu(first_cpu->cpu_index);
+        monitor_set_cpu(mon, first_cpu->cpu_index);
         cpu = first_cpu;
     }
     assert(cpu != NULL);
@@ -297,7 +291,7 @@ static CPUState *mon_get_cpu_sync(bool synchronize)
 
 CPUState *mon_get_cpu(void)
 {
-    return mon_get_cpu_sync(true);
+    return mon_get_cpu_sync(monitor_cur(), true);
 }
 
 CPUArchState *mon_get_cpu_env(void)
@@ -307,9 +301,9 @@ CPUArchState *mon_get_cpu_env(void)
     return cs ? cs->env_ptr : NULL;
 }
 
-int monitor_get_cpu_index(void)
+int monitor_get_cpu_index(Monitor *mon)
 {
-    CPUState *cs = mon_get_cpu_sync(false);
+    CPUState *cs = mon_get_cpu_sync(mon, false);
 
     return cs ? cs->cpu_index : UNASSIGNED_CPU_INDEX;
 }
@@ -1232,6 +1226,7 @@ static void hmp_acl_remove(Monitor *mon, const QDict *qdict)
 
 void qmp_getfd(const char *fdname, Error **errp)
 {
+    Monitor *cur_mon = monitor_cur();
     mon_fd_t *monfd;
     int fd, tmp_fd;
 
@@ -1270,6 +1265,7 @@ void qmp_getfd(const char *fdname, Error **errp)
 
 void qmp_closefd(const char *fdname, Error **errp)
 {
+    Monitor *cur_mon = monitor_cur();
     mon_fd_t *monfd;
     int tmp_fd;
 
@@ -1356,7 +1352,7 @@ AddfdInfo *qmp_add_fd(bool has_fdset_id, int64_t fdset_id, bool has_opaque,
                       const char *opaque, Error **errp)
 {
     int fd;
-    Monitor *mon = cur_mon;
+    Monitor *mon = monitor_cur();
     AddfdInfo *fdinfo;
 
     fd = qemu_chr_fe_get_msgfd(&mon->chr);

+ 5 - 2
monitor/monitor-internal.h

@@ -74,6 +74,7 @@ typedef struct HMPCommand {
     const char *help;
     const char *flags; /* p=preconfig */
     void (*cmd)(Monitor *mon, const QDict *qdict);
+    bool coroutine;
     /*
      * @sub_table is a list of 2nd level of commands. If it does not exist,
      * cmd should be used. If it exists, sub_table[?].cmd should be
@@ -155,7 +156,9 @@ static inline bool monitor_is_qmp(const Monitor *mon)
 
 typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
 extern IOThread *mon_iothread;
-extern QEMUBH *qmp_dispatcher_bh;
+extern Coroutine *qmp_dispatcher_co;
+extern bool qmp_dispatcher_co_shutdown;
+extern bool qmp_dispatcher_co_busy;
 extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
 extern QemuMutex monitor_lock;
 extern MonitorList mon_list;
@@ -173,7 +176,7 @@ void monitor_fdsets_cleanup(void);
 
 void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
 void monitor_data_destroy_qmp(MonitorQMP *mon);
-void monitor_qmp_bh_dispatcher(void *data);
+void coroutine_fn monitor_qmp_dispatcher_co(void *data);
 
 int get_monitor_def(int64_t *pval, const char *name);
 void help_cmd(Monitor *mon, const char *name);

+ 91 - 10
monitor/monitor.c

@@ -55,24 +55,85 @@ typedef struct {
 /* Shared monitor I/O thread */
 IOThread *mon_iothread;
 
-/* Bottom half to dispatch the requests received from I/O thread */
-QEMUBH *qmp_dispatcher_bh;
+/* Coroutine to dispatch the requests received from I/O thread */
+Coroutine *qmp_dispatcher_co;
 
-/* Protects mon_list, monitor_qapi_event_state, monitor_destroyed.  */
+/* Set to true when the dispatcher coroutine should terminate */
+bool qmp_dispatcher_co_shutdown;
+
+/*
+ * qmp_dispatcher_co_busy is used for synchronisation between the
+ * monitor thread and the main thread to ensure that the dispatcher
+ * coroutine never gets scheduled a second time when it's already
+ * scheduled (scheduling the same coroutine twice is forbidden).
+ *
+ * It is true if the coroutine is active and processing requests.
+ * Additional requests may then be pushed onto mon->qmp_requests,
+ * and @qmp_dispatcher_co_shutdown may be set without further ado.
+ * @qmp_dispatcher_co_busy must not be woken up in this case.
+ *
+ * If false, you also have to set @qmp_dispatcher_co_busy to true and
+ * wake up @qmp_dispatcher_co after pushing the new requests.
+ *
+ * The coroutine will automatically change this variable back to false
+ * before it yields.  Nobody else may set the variable to false.
+ *
+ * Access must be atomic for thread safety.
+ */
+bool qmp_dispatcher_co_busy;
+
+/*
+ * Protects mon_list, monitor_qapi_event_state, coroutine_mon,
+ * monitor_destroyed.
+ */
 QemuMutex monitor_lock;
 static GHashTable *monitor_qapi_event_state;
+static GHashTable *coroutine_mon; /* Maps Coroutine* to Monitor* */
 
 MonitorList mon_list;
 int mon_refcount;
 static bool monitor_destroyed;
 
-__thread Monitor *cur_mon;
+Monitor *monitor_cur(void)
+{
+    Monitor *mon;
+
+    qemu_mutex_lock(&monitor_lock);
+    mon = g_hash_table_lookup(coroutine_mon, qemu_coroutine_self());
+    qemu_mutex_unlock(&monitor_lock);
+
+    return mon;
+}
+
+/**
+ * Sets a new current monitor and returns the old one.
+ *
+ * If a non-NULL monitor is set for a coroutine, another call
+ * resetting it to NULL is required before the coroutine terminates,
+ * otherwise a stale entry would remain in the hash table.
+ */
+Monitor *monitor_set_cur(Coroutine *co, Monitor *mon)
+{
+    Monitor *old_monitor = monitor_cur();
+
+    qemu_mutex_lock(&monitor_lock);
+    if (mon) {
+        g_hash_table_replace(coroutine_mon, co, mon);
+    } else {
+        g_hash_table_remove(coroutine_mon, co);
+    }
+    qemu_mutex_unlock(&monitor_lock);
+
+    return old_monitor;
+}
 
 /**
  * Is the current monitor, if any, a QMP monitor?
  */
 bool monitor_cur_is_qmp(void)
 {
+    Monitor *cur_mon = monitor_cur();
+
     return cur_mon && monitor_is_qmp(cur_mon);
 }
 
@@ -209,6 +270,8 @@ int monitor_printf(Monitor *mon, const char *fmt, ...)
  */
 int error_vprintf(const char *fmt, va_list ap)
 {
+    Monitor *cur_mon = monitor_cur();
+
     if (cur_mon && !monitor_cur_is_qmp()) {
         return monitor_vprintf(cur_mon, fmt, ap);
     }
@@ -217,6 +280,8 @@ int error_vprintf(const char *fmt, va_list ap)
 
 int error_vprintf_unless_qmp(const char *fmt, va_list ap)
 {
+    Monitor *cur_mon = monitor_cur();
+
     if (!cur_mon) {
         return vfprintf(stderr, fmt, ap);
     }
@@ -582,9 +647,24 @@ void monitor_cleanup(void)
     }
     qemu_mutex_unlock(&monitor_lock);
 
-    /* QEMUBHs needs to be deleted before destroying the I/O thread */
-    qemu_bh_delete(qmp_dispatcher_bh);
-    qmp_dispatcher_bh = NULL;
+    /*
+     * The dispatcher needs to stop before destroying the I/O thread.
+     *
+     * We need to poll both qemu_aio_context and iohandler_ctx to make
+     * sure that the dispatcher coroutine keeps making progress and
+     * eventually terminates.  qemu_aio_context is automatically
+     * polled by calling AIO_WAIT_WHILE on it, but we must poll
+     * iohandler_ctx manually.
+     */
+    qmp_dispatcher_co_shutdown = true;
+    if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
+        aio_co_wake(qmp_dispatcher_co);
+    }
+
+    AIO_WAIT_WHILE(qemu_get_aio_context(),
+                   (aio_poll(iohandler_get_aio_context(), false),
+                    qatomic_mb_read(&qmp_dispatcher_co_busy)));
+
     if (mon_iothread) {
         iothread_destroy(mon_iothread);
         mon_iothread = NULL;
@@ -601,15 +681,16 @@ void monitor_init_globals_core(void)
 {
     monitor_qapi_event_init();
     qemu_mutex_init(&monitor_lock);
+    coroutine_mon = g_hash_table_new(NULL, NULL);
 
     /*
      * The dispatcher BH must run in the main loop thread, since we
      * have commands assuming that context.  It would be nice to get
      * rid of those assumptions.
      */
-    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
-                                   monitor_qmp_bh_dispatcher,
-                                   NULL);
+    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
+    qatomic_mb_set(&qmp_dispatcher_co_busy, true);
+    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
 }
 
 int monitor_init(MonitorOptions *opts, bool allow_hmp, Error **errp)

+ 2 - 0
monitor/qmp-cmds-control.c

@@ -69,6 +69,7 @@ static bool qmp_caps_accept(MonitorQMP *mon, QMPCapabilityList *list,
 void qmp_qmp_capabilities(bool has_enable, QMPCapabilityList *enable,
                           Error **errp)
 {
+    Monitor *cur_mon = monitor_cur();
     MonitorQMP *mon;
 
     assert(monitor_is_qmp(cur_mon));
@@ -119,6 +120,7 @@ static void query_commands_cb(const QmpCommand *cmd, void *opaque)
 CommandInfoList *qmp_query_commands(Error **errp)
 {
     CommandInfoList *list = NULL;
+    Monitor *cur_mon = monitor_cur();
     MonitorQMP *mon;
 
     assert(monitor_is_qmp(cur_mon));

+ 1 - 1
monitor/qmp-cmds.c

@@ -328,7 +328,7 @@ void qmp_add_client(const char *protocol, const char *fdname,
     Chardev *s;
     int fd;
 
-    fd = monitor_get_fd(cur_mon, fdname, errp);
+    fd = monitor_get_fd(monitor_cur(), fdname, errp);
     if (fd < 0) {
         return;
     }

+ 94 - 37
monitor/qmp.c

@@ -133,18 +133,17 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
     }
 }
 
+/*
+ * Runs outside of coroutine context for OOB commands, but in
+ * coroutine context for everything else.
+ */
 static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
 {
-    Monitor *old_mon;
     QDict *rsp;
     QDict *error;
 
-    old_mon = cur_mon;
-    cur_mon = &mon->common;
-
-    rsp = qmp_dispatch(mon->commands, req, qmp_oob_enabled(mon));
-
-    cur_mon = old_mon;
+    rsp = qmp_dispatch(mon->commands, req, qmp_oob_enabled(mon),
+                       &mon->common);
 
     if (mon->commands == &qmp_cap_negotiation_commands) {
         error = qdict_get_qdict(rsp, "error");
@@ -211,43 +210,99 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
     return req_obj;
 }
 
-void monitor_qmp_bh_dispatcher(void *data)
+void coroutine_fn monitor_qmp_dispatcher_co(void *data)
 {
-    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
+    QMPRequest *req_obj = NULL;
     QDict *rsp;
     bool need_resume;
     MonitorQMP *mon;
 
-    if (!req_obj) {
-        return;
-    }
+    while (true) {
+        assert(qatomic_mb_read(&qmp_dispatcher_co_busy) == true);
 
-    mon = req_obj->mon;
-    /*  qmp_oob_enabled() might change after "qmp_capabilities" */
-    need_resume = !qmp_oob_enabled(mon) ||
-        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
-    qemu_mutex_unlock(&mon->qmp_queue_lock);
-    if (req_obj->req) {
-        QDict *qdict = qobject_to(QDict, req_obj->req);
-        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
-        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
-        monitor_qmp_dispatch(mon, req_obj->req);
-    } else {
-        assert(req_obj->err);
-        rsp = qmp_error_response(req_obj->err);
-        req_obj->err = NULL;
-        monitor_qmp_respond(mon, rsp);
-        qobject_unref(rsp);
-    }
+        /*
+         * Mark the dispatcher as not busy already here so that we
+         * don't miss any new requests coming in the middle of our
+         * processing.
+         */
+        qatomic_mb_set(&qmp_dispatcher_co_busy, false);
+
+        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
+            /*
+             * No more requests to process.  Wait to be reentered from
+             * handle_qmp_command() when it pushes more requests, or
+             * from monitor_cleanup() when it requests shutdown.
+             */
+            if (!qmp_dispatcher_co_shutdown) {
+                qemu_coroutine_yield();
+
+                /*
+                 * busy must be set to true again by whoever
+                 * rescheduled us to avoid double scheduling
+                 */
+                assert(qatomic_xchg(&qmp_dispatcher_co_busy, false) == true);
+            }
+
+            /*
+             * qmp_dispatcher_co_shutdown may have changed if we
+             * yielded and were reentered from monitor_cleanup()
+             */
+            if (qmp_dispatcher_co_shutdown) {
+                return;
+            }
+        }
 
-    if (need_resume) {
-        /* Pairs with the monitor_suspend() in handle_qmp_command() */
-        monitor_resume(&mon->common);
-    }
-    qmp_request_free(req_obj);
+        if (qatomic_xchg(&qmp_dispatcher_co_busy, true) == true) {
+            /*
+             * Someone rescheduled us (probably because a new requests
+             * came in), but we didn't actually yield. Do that now,
+             * only to be immediately reentered and removed from the
+             * list of scheduled coroutines.
+             */
+            qemu_coroutine_yield();
+        }
+
+        /*
+         * Move the coroutine from iohandler_ctx to qemu_aio_context for
+         * executing the command handler so that it can make progress if it
+         * involves an AIO_WAIT_WHILE().
+         */
+        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
+        qemu_coroutine_yield();
+
+        mon = req_obj->mon;
+        /* qmp_oob_enabled() might change after "qmp_capabilities" */
+        need_resume = !qmp_oob_enabled(mon) ||
+            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
+        qemu_mutex_unlock(&mon->qmp_queue_lock);
+        if (req_obj->req) {
+            QDict *qdict = qobject_to(QDict, req_obj->req);
+            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
+            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
+            monitor_qmp_dispatch(mon, req_obj->req);
+        } else {
+            assert(req_obj->err);
+            rsp = qmp_error_response(req_obj->err);
+            req_obj->err = NULL;
+            monitor_qmp_respond(mon, rsp);
+            qobject_unref(rsp);
+        }
 
-    /* Reschedule instead of looping so the main loop stays responsive */
-    qemu_bh_schedule(qmp_dispatcher_bh);
+        if (need_resume) {
+            /* Pairs with the monitor_suspend() in handle_qmp_command() */
+            monitor_resume(&mon->common);
+        }
+        qmp_request_free(req_obj);
+
+        /*
+         * Yield and reschedule so the main loop stays responsive.
+         *
+         * Move back to iohandler_ctx so that nested event loops for
+         * qemu_aio_context don't start new monitor commands.
+         */
+        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+        qemu_coroutine_yield();
+    }
 }
 
 static void handle_qmp_command(void *opaque, QObject *req, Error *err)
@@ -308,7 +363,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
     qemu_mutex_unlock(&mon->qmp_queue_lock);
 
     /* Kick the dispatcher routine */
-    qemu_bh_schedule(qmp_dispatcher_bh);
+    if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
+        aio_co_wake(qmp_dispatcher_co);
+    }
 }
 
 static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)

+ 1 - 1
net/socket.c

@@ -727,7 +727,7 @@ int net_init_socket(const Netdev *netdev, const char *name,
     if (sock->has_fd) {
         int fd, ret;
 
-        fd = monitor_fd_param(cur_mon, sock->fd, errp);
+        fd = monitor_fd_param(monitor_cur(), sock->fd, errp);
         if (fd == -1) {
             return -1;
         }

+ 3 - 3
net/tap.c

@@ -700,7 +700,7 @@ static void net_init_tap_one(const NetdevTapOptions *tap, NetClientState *peer,
         if (vhostfdname) {
             int ret;
 
-            vhostfd = monitor_fd_param(cur_mon, vhostfdname, &err);
+            vhostfd = monitor_fd_param(monitor_cur(), vhostfdname, &err);
             if (vhostfd == -1) {
                 if (tap->has_vhostforce && tap->vhostforce) {
                     error_propagate(errp, err);
@@ -808,7 +808,7 @@ int net_init_tap(const Netdev *netdev, const char *name,
             return -1;
         }
 
-        fd = monitor_fd_param(cur_mon, tap->fd, errp);
+        fd = monitor_fd_param(monitor_cur(), tap->fd, errp);
         if (fd == -1) {
             return -1;
         }
@@ -862,7 +862,7 @@ int net_init_tap(const Netdev *netdev, const char *name,
         }
 
         for (i = 0; i < nfds; i++) {
-            fd = monitor_fd_param(cur_mon, fds[i], errp);
+            fd = monitor_fd_param(monitor_cur(), fds[i], errp);
             if (fd == -1) {
                 ret = -1;
                 goto free_fail;

+ 2 - 1
qapi/block-core.json

@@ -1310,7 +1310,8 @@
 { 'command': 'block_resize',
   'data': { '*device': 'str',
             '*node-name': 'str',
-            'size': 'int' } }
+            'size': 'int' },
+  'coroutine': true }
 
 ##
 # @NewImageMode:

+ 63 - 2
qapi/qmp-dispatch.c

@@ -12,12 +12,16 @@
  */
 
 #include "qemu/osdep.h"
+
+#include "block/aio.h"
 #include "qapi/error.h"
 #include "qapi/qmp/dispatch.h"
 #include "qapi/qmp/qdict.h"
 #include "qapi/qmp/qjson.h"
 #include "sysemu/runstate.h"
 #include "qapi/qmp/qbool.h"
+#include "qemu/coroutine.h"
+#include "qemu/main-loop.h"
 
 static QDict *qmp_dispatch_check_obj(QDict *dict, bool allow_oob,
                                      Error **errp)
@@ -88,8 +92,32 @@ bool qmp_is_oob(const QDict *dict)
         && !qdict_haskey(dict, "execute");
 }
 
+typedef struct QmpDispatchBH {
+    const QmpCommand *cmd;
+    Monitor *cur_mon;
+    QDict *args;
+    QObject **ret;
+    Error **errp;
+    Coroutine *co;
+} QmpDispatchBH;
+
+static void do_qmp_dispatch_bh(void *opaque)
+{
+    QmpDispatchBH *data = opaque;
+
+    assert(monitor_cur() == NULL);
+    monitor_set_cur(qemu_coroutine_self(), data->cur_mon);
+    data->cmd->fn(data->args, data->ret, data->errp);
+    monitor_set_cur(qemu_coroutine_self(), NULL);
+    aio_co_wake(data->co);
+}
+
+/*
+ * Runs outside of coroutine context for OOB commands, but in coroutine
+ * context for everything else.
+ */
 QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
-                    bool allow_oob)
+                    bool allow_oob, Monitor *cur_mon)
 {
     Error *err = NULL;
     bool oob;
@@ -152,7 +180,40 @@ QDict *qmp_dispatch(const QmpCommandList *cmds, QObject *request,
         args = qdict_get_qdict(dict, "arguments");
         qobject_ref(args);
     }
-    cmd->fn(args, &ret, &err);
+
+    assert(!(oob && qemu_in_coroutine()));
+    assert(monitor_cur() == NULL);
+    if (!!(cmd->options & QCO_COROUTINE) == qemu_in_coroutine()) {
+        monitor_set_cur(qemu_coroutine_self(), cur_mon);
+        cmd->fn(args, &ret, &err);
+        monitor_set_cur(qemu_coroutine_self(), NULL);
+    } else {
+       /*
+        * Actual context doesn't match the one the command needs.
+        *
+        * Case 1: we are in coroutine context, but command does not
+        * have QCO_COROUTINE.  We need to drop out of coroutine
+        * context for executing it.
+        *
+        * Case 2: we are outside coroutine context, but command has
+        * QCO_COROUTINE.  Can't actually happen, because we get here
+        * outside coroutine context only when executing a command
+        * out of band, and OOB commands never have QCO_COROUTINE.
+        */
+        assert(!oob && qemu_in_coroutine() && !(cmd->options & QCO_COROUTINE));
+
+        QmpDispatchBH data = {
+            .cur_mon    = cur_mon,
+            .cmd        = cmd,
+            .args       = args,
+            .ret        = &ret,
+            .errp       = &err,
+            .co         = qemu_coroutine_self(),
+        };
+        aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
+                                &data);
+        qemu_coroutine_yield();
+    }
     qobject_unref(args);
     if (err) {
         /* or assert(!ret) after reviewing all handlers: */

+ 3 - 0
qapi/qmp-registry.c

@@ -20,6 +20,9 @@ void qmp_register_command(QmpCommandList *cmds, const char *name,
 {
     QmpCommand *cmd = g_malloc0(sizeof(*cmd));
 
+    /* QCO_COROUTINE and QCO_ALLOW_OOB are incompatible for now */
+    assert(!((options & QCO_COROUTINE) && (options & QCO_ALLOW_OOB)));
+
     cmd->name = name;
     cmd->fn = fn;
     cmd->enabled = true;

+ 1 - 1
qga/main.c

@@ -579,7 +579,7 @@ static void process_event(void *opaque, QObject *obj, Error *err)
     }
 
     g_debug("processing command");
-    rsp = qmp_dispatch(&ga_commands, obj, false);
+    rsp = qmp_dispatch(&ga_commands, obj, false, NULL);
 
 end:
     ret = send_response(s, rsp);

+ 7 - 3
scripts/qapi/commands.py

@@ -176,7 +176,8 @@ def gen_marshal(name, arg_type, boxed, ret_type):
     return ret
 
 
-def gen_register_command(name, success_response, allow_oob, allow_preconfig):
+def gen_register_command(name, success_response, allow_oob, allow_preconfig,
+                         coroutine):
     options = []
 
     if not success_response:
@@ -185,6 +186,8 @@ def gen_register_command(name, success_response, allow_oob, allow_preconfig):
         options += ['QCO_ALLOW_OOB']
     if allow_preconfig:
         options += ['QCO_ALLOW_PRECONFIG']
+    if coroutine:
+        options += ['QCO_COROUTINE']
 
     if not options:
         options = ['QCO_NO_OPTIONS']
@@ -267,7 +270,7 @@ def visit_end(self):
 
     def visit_command(self, name, info, ifcond, features,
                       arg_type, ret_type, gen, success_response, boxed,
-                      allow_oob, allow_preconfig):
+                      allow_oob, allow_preconfig, coroutine):
         if not gen:
             return
         # FIXME: If T is a user-defined type, the user is responsible
@@ -285,7 +288,8 @@ def visit_command(self, name, info, ifcond, features,
             self._genh.add(gen_marshal_decl(name))
             self._genc.add(gen_marshal(name, arg_type, boxed, ret_type))
             self._regy.add(gen_register_command(name, success_response,
-                                                allow_oob, allow_preconfig))
+                                                allow_oob, allow_preconfig,
+                                                coroutine))
 
 
 def gen_commands(schema, output_dir, prefix):

+ 9 - 2
scripts/qapi/expr.py

@@ -88,10 +88,17 @@ def check_flags(expr, info):
         if key in expr and expr[key] is not False:
             raise QAPISemError(
                 info, "flag '%s' may only use false value" % key)
-    for key in ['boxed', 'allow-oob', 'allow-preconfig']:
+    for key in ['boxed', 'allow-oob', 'allow-preconfig', 'coroutine']:
         if key in expr and expr[key] is not True:
             raise QAPISemError(
                 info, "flag '%s' may only use true value" % key)
+    if 'allow-oob' in expr and 'coroutine' in expr:
+        # This is not necessarily a fundamental incompatibility, but
+        # we don't have a use case and the desired semantics isn't
+        # obvious.  The simplest solution is to forbid it until we get
+        # a use case for it.
+        raise QAPISemError(info, "flags 'allow-oob' and 'coroutine' "
+                                 "are incompatible")
 
 
 def check_if(expr, info, source):
@@ -342,7 +349,7 @@ def check_exprs(exprs):
                        ['command'],
                        ['data', 'returns', 'boxed', 'if', 'features',
                         'gen', 'success-response', 'allow-oob',
-                        'allow-preconfig'])
+                        'allow-preconfig', 'coroutine'])
             normalize_members(expr.get('data'))
             check_command(expr, info)
         elif meta == 'event':

+ 1 - 1
scripts/qapi/introspect.py

@@ -216,7 +216,7 @@ def visit_alternate_type(self, name, info, ifcond, features, variants):
 
     def visit_command(self, name, info, ifcond, features,
                       arg_type, ret_type, gen, success_response, boxed,
-                      allow_oob, allow_preconfig):
+                      allow_oob, allow_preconfig, coroutine):
         arg_type = arg_type or self._schema.the_empty_object_type
         ret_type = ret_type or self._schema.the_empty_object_type
         obj = {'arg-type': self._use_type(arg_type),

+ 9 - 4
scripts/qapi/schema.py

@@ -128,7 +128,7 @@ def visit_alternate_type(self, name, info, ifcond, features, variants):
 
     def visit_command(self, name, info, ifcond, features,
                       arg_type, ret_type, gen, success_response, boxed,
-                      allow_oob, allow_preconfig):
+                      allow_oob, allow_preconfig, coroutine):
         pass
 
     def visit_event(self, name, info, ifcond, features, arg_type, boxed):
@@ -713,7 +713,8 @@ class QAPISchemaCommand(QAPISchemaEntity):
 
     def __init__(self, name, info, doc, ifcond, features,
                  arg_type, ret_type,
-                 gen, success_response, boxed, allow_oob, allow_preconfig):
+                 gen, success_response, boxed, allow_oob, allow_preconfig,
+                 coroutine):
         super().__init__(name, info, doc, ifcond, features)
         assert not arg_type or isinstance(arg_type, str)
         assert not ret_type or isinstance(ret_type, str)
@@ -726,6 +727,7 @@ def __init__(self, name, info, doc, ifcond, features,
         self.boxed = boxed
         self.allow_oob = allow_oob
         self.allow_preconfig = allow_preconfig
+        self.coroutine = coroutine
 
     def check(self, schema):
         super().check(schema)
@@ -768,7 +770,8 @@ def visit(self, visitor):
         visitor.visit_command(
             self.name, self.info, self.ifcond, self.features,
             self.arg_type, self.ret_type, self.gen, self.success_response,
-            self.boxed, self.allow_oob, self.allow_preconfig)
+            self.boxed, self.allow_oob, self.allow_preconfig,
+            self.coroutine)
 
 
 class QAPISchemaEvent(QAPISchemaEntity):
@@ -1074,6 +1077,7 @@ def _def_command(self, expr, info, doc):
         boxed = expr.get('boxed', False)
         allow_oob = expr.get('allow-oob', False)
         allow_preconfig = expr.get('allow-preconfig', False)
+        coroutine = expr.get('coroutine', False)
         ifcond = expr.get('if')
         features = self._make_features(expr.get('features'), info)
         if isinstance(data, OrderedDict):
@@ -1086,7 +1090,8 @@ def _def_command(self, expr, info, doc):
         self._def_entity(QAPISchemaCommand(name, info, doc, ifcond, features,
                                            data, rets,
                                            gen, success_response,
-                                           boxed, allow_oob, allow_preconfig))
+                                           boxed, allow_oob, allow_preconfig,
+                                           coroutine))
 
     def _def_event(self, expr, info, doc):
         name = expr['event']

+ 1 - 1
softmmu/cpus.c

@@ -793,6 +793,6 @@ exit:
 
 void qmp_inject_nmi(Error **errp)
 {
-    nmi_monitor_handle(monitor_get_cpu_index(), errp);
+    nmi_monitor_handle(monitor_get_cpu_index(monitor_cur()), errp);
 }
 

+ 9 - 1
stubs/monitor-core.c

@@ -3,7 +3,15 @@
 #include "qemu-common.h"
 #include "qapi/qapi-emit-events.h"
 
-__thread Monitor *cur_mon;
+Monitor *monitor_cur(void)
+{
+    return NULL;
+}
+
+Monitor *monitor_set_cur(Coroutine *co, Monitor *mon)
+{
+    return NULL;
+}
 
 void monitor_init_qmp(Chardev *chr, bool pretty, Error **errp)
 {

+ 1 - 0
tests/qapi-schema/meson.build

@@ -142,6 +142,7 @@ schemas = [
   'nested-struct-data.json',
   'nested-struct-data-invalid-dict.json',
   'non-objects.json',
+  'oob-coroutine.json',
   'oob-test.json',
   'allow-preconfig-test.json',
   'pragma-doc-required-crap.json',

+ 2 - 0
tests/qapi-schema/oob-coroutine.err

@@ -0,0 +1,2 @@
+oob-coroutine.json: In command 'oob-command-1':
+oob-coroutine.json:2: flags 'allow-oob' and 'coroutine' are incompatible

+ 2 - 0
tests/qapi-schema/oob-coroutine.json

@@ -0,0 +1,2 @@
+# Check that incompatible flags allow-oob and coroutine are rejected
+{ 'command': 'oob-command-1', 'allow-oob': true, 'coroutine': true }

+ 0 - 0
tests/qapi-schema/oob-coroutine.out


+ 1 - 0
tests/qapi-schema/qapi-schema-test.json

@@ -148,6 +148,7 @@
   'returns': 'UserDefTwo' }
 
 { 'command': 'cmd-success-response', 'data': {}, 'success-response': false }
+{ 'command': 'coroutine-cmd', 'data': {}, 'coroutine': true }
 
 # Returning a non-dictionary requires a name from the whitelist
 { 'command': 'guest-get-time', 'data': {'a': 'int', '*b': 'int' },

+ 2 - 0
tests/qapi-schema/qapi-schema-test.out

@@ -203,6 +203,8 @@ command user_def_cmd2 q_obj_user_def_cmd2-arg -> UserDefTwo
     gen=True success_response=True boxed=False oob=False preconfig=False
 command cmd-success-response None -> None
     gen=True success_response=False boxed=False oob=False preconfig=False
+command coroutine-cmd None -> None
+    gen=True success_response=True boxed=False oob=False preconfig=False coroutine=True
 object q_obj_guest-get-time-arg
     member a: int optional=False
     member b: int optional=True

+ 4 - 3
tests/qapi-schema/test-qapi.py

@@ -68,12 +68,13 @@ def visit_alternate_type(self, name, info, ifcond, features, variants):
 
     def visit_command(self, name, info, ifcond, features,
                       arg_type, ret_type, gen, success_response, boxed,
-                      allow_oob, allow_preconfig):
+                      allow_oob, allow_preconfig, coroutine):
         print('command %s %s -> %s'
               % (name, arg_type and arg_type.name,
                  ret_type and ret_type.name))
-        print('    gen=%s success_response=%s boxed=%s oob=%s preconfig=%s'
-              % (gen, success_response, boxed, allow_oob, allow_preconfig))
+        print('    gen=%s success_response=%s boxed=%s oob=%s preconfig=%s%s'
+              % (gen, success_response, boxed, allow_oob, allow_preconfig,
+                 " coroutine=True" if coroutine else ""))
         self._print_if(ifcond)
         self._print_features(features)
 

+ 7 - 3
tests/test-qmp-cmds.c

@@ -36,6 +36,10 @@ void qmp_cmd_success_response(Error **errp)
 {
 }
 
+void qmp_coroutine_cmd(Error **errp)
+{
+}
+
 Empty2 *qmp_user_def_cmd0(Error **errp)
 {
     return g_new0(Empty2, 1);
@@ -152,7 +156,7 @@ static QObject *do_qmp_dispatch(bool allow_oob, const char *template, ...)
     req = qdict_from_vjsonf_nofail(template, ap);
     va_end(ap);
 
-    resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob);
+    resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob, NULL);
     g_assert(resp);
     ret = qdict_get(resp, "return");
     g_assert(ret);
@@ -175,7 +179,7 @@ static void do_qmp_dispatch_error(bool allow_oob, ErrorClass cls,
     req = qdict_from_vjsonf_nofail(template, ap);
     va_end(ap);
 
-    resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob);
+    resp = qmp_dispatch(&qmp_commands, QOBJECT(req), allow_oob, NULL);
     g_assert(resp);
     error = qdict_get_qdict(resp, "error");
     g_assert(error);
@@ -231,7 +235,7 @@ static void test_dispatch_cmd_success_response(void)
     QDict *resp;
 
     qdict_put_str(req, "execute", "cmd-success-response");
-    resp = qmp_dispatch(&qmp_commands, QOBJECT(req), false);
+    resp = qmp_dispatch(&qmp_commands, QOBJECT(req), false, NULL);
     g_assert_null(resp);
     qobject_unref(req);
 }

+ 6 - 6
tests/test-util-sockets.c

@@ -52,6 +52,7 @@ static void test_fd_is_socket_good(void)
 
 static int mon_fd = -1;
 static const char *mon_fdname;
+__thread Monitor *cur_mon;
 
 int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp)
 {
@@ -65,15 +66,14 @@ int monitor_get_fd(Monitor *mon, const char *fdname, Error **errp)
 }
 
 /*
- * Syms of stubs in libqemuutil.a are discarded at .o file granularity.
- * To replace monitor_get_fd() we must ensure everything in
- * stubs/monitor.c is defined, to make sure monitor.o is discarded
+ * Syms of stubs in libqemuutil.a are discarded at .o file
+ * granularity.  To replace monitor_get_fd() and monitor_cur(), we
+ * must ensure that we also replace any other symbol that is used in
+ * the binary and would be taken from the same stub object file,
  * otherwise we get duplicate syms at link time.
  */
-__thread Monitor *cur_mon;
+Monitor *monitor_cur(void) { return cur_mon; }
 int monitor_vprintf(Monitor *mon, const char *fmt, va_list ap) { abort(); }
-void monitor_init_qmp(Chardev *chr, bool pretty, Error **errp) {}
-void monitor_init_hmp(Chardev *chr, bool use_readline, Error **errp) {}
 
 #ifndef _WIN32
 static void test_socket_fd_pass_name_good(void)

+ 1 - 1
trace/control.c

@@ -176,7 +176,7 @@ void trace_enable_events(const char *line_buf)
 {
     if (is_help_option(line_buf)) {
         trace_list_events();
-        if (cur_mon == NULL) {
+        if (monitor_cur() == NULL) {
             exit(0);
         }
     } else {

+ 7 - 1
util/aio-posix.c

@@ -15,6 +15,7 @@
 
 #include "qemu/osdep.h"
 #include "block/block.h"
+#include "qemu/main-loop.h"
 #include "qemu/rcu.h"
 #include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
@@ -558,8 +559,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
      * There cannot be two concurrent aio_poll calls for the same AioContext (or
      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
+     *
+     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
+     * is special in that it runs in the main thread, but that thread's context
+     * is qemu_aio_context.
      */
-    assert(in_aio_context_home_thread(ctx));
+    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
+                                      qemu_get_aio_context() : ctx));
 
     qemu_lockcnt_inc(&ctx->list_lock);
 

+ 30 - 0
util/async.c

@@ -569,6 +569,36 @@ void aio_co_schedule(AioContext *ctx, Coroutine *co)
     aio_context_unref(ctx);
 }
 
+typedef struct AioCoRescheduleSelf {
+    Coroutine *co;
+    AioContext *new_ctx;
+} AioCoRescheduleSelf;
+
+static void aio_co_reschedule_self_bh(void *opaque)
+{
+    AioCoRescheduleSelf *data = opaque;
+    aio_co_schedule(data->new_ctx, data->co);
+}
+
+void coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
+{
+    AioContext *old_ctx = qemu_get_current_aio_context();
+
+    if (old_ctx != new_ctx) {
+        AioCoRescheduleSelf data = {
+            .co = qemu_coroutine_self(),
+            .new_ctx = new_ctx,
+        };
+        /*
+         * We can't directly schedule the coroutine in the target context
+         * because this would be racy: The other thread could try to enter the
+         * coroutine before it has yielded in this one.
+         */
+        aio_bh_schedule_oneshot(old_ctx, aio_co_reschedule_self_bh, &data);
+        qemu_coroutine_yield();
+    }
+}
+
 void aio_co_wake(struct Coroutine *co)
 {
     AioContext *ctx;

+ 3 - 3
util/qemu-error.c

@@ -171,7 +171,7 @@ static void print_loc(void)
     int i;
     const char *const *argp;
 
-    if (!cur_mon && progname) {
+    if (!monitor_cur() && progname) {
         fprintf(stderr, "%s:", progname);
         sep = " ";
     }
@@ -208,7 +208,7 @@ static void vreport(report_type type, const char *fmt, va_list ap)
     GTimeVal tv;
     gchar *timestr;
 
-    if (error_with_timestamp && !cur_mon) {
+    if (error_with_timestamp && !monitor_cur()) {
         g_get_current_time(&tv);
         timestr = g_time_val_to_iso8601(&tv);
         error_printf("%s ", timestr);
@@ -216,7 +216,7 @@ static void vreport(report_type type, const char *fmt, va_list ap)
     }
 
     /* Only prepend guest name if -msg guest-name and -name guest=... are set */
-    if (error_with_guestname && error_guest_name && !cur_mon) {
+    if (error_with_guestname && error_guest_name && !monitor_cur()) {
         error_printf("%s ", error_guest_name);
     }
 

+ 2 - 1
util/qemu-print.c

@@ -20,6 +20,7 @@
  */
 int qemu_vprintf(const char *fmt, va_list ap)
 {
+    Monitor *cur_mon = monitor_cur();
     if (cur_mon) {
         return monitor_vprintf(cur_mon, fmt, ap);
     }
@@ -48,7 +49,7 @@ int qemu_printf(const char *fmt, ...)
 int qemu_vfprintf(FILE *stream, const char *fmt, va_list ap)
 {
     if (!stream) {
-        return monitor_vprintf(cur_mon, fmt, ap);
+        return monitor_vprintf(monitor_cur(), fmt, ap);
     }
     return vfprintf(stream, fmt, ap);
 }

+ 1 - 0
util/qemu-sockets.c

@@ -1092,6 +1092,7 @@ fail:
 
 static int socket_get_fd(const char *fdstr, int num, Error **errp)
 {
+    Monitor *cur_mon = monitor_cur();
     int fd;
     if (num != 1) {
         error_setg_errno(errp, EINVAL, "socket_get_fd: too many connections");