|
@@ -19,6 +19,7 @@
|
|
*/
|
|
*/
|
|
|
|
|
|
#include "qemu/osdep.h"
|
|
#include "qemu/osdep.h"
|
|
|
|
+#include "block/aio-wait.h"
|
|
#include "io/channel.h"
|
|
#include "io/channel.h"
|
|
#include "qapi/error.h"
|
|
#include "qapi/error.h"
|
|
#include "qemu/main-loop.h"
|
|
#include "qemu/main-loop.h"
|
|
@@ -514,7 +515,11 @@ int qio_channel_flush(QIOChannel *ioc,
|
|
static void qio_channel_restart_read(void *opaque)
|
|
static void qio_channel_restart_read(void *opaque)
|
|
{
|
|
{
|
|
QIOChannel *ioc = opaque;
|
|
QIOChannel *ioc = opaque;
|
|
- Coroutine *co = ioc->read_coroutine;
|
|
|
|
|
|
+ Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
|
|
|
|
+
|
|
|
|
+ if (!co) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
/* Assert that aio_co_wake() reenters the coroutine directly */
|
|
/* Assert that aio_co_wake() reenters the coroutine directly */
|
|
assert(qemu_get_current_aio_context() ==
|
|
assert(qemu_get_current_aio_context() ==
|
|
@@ -525,7 +530,11 @@ static void qio_channel_restart_read(void *opaque)
|
|
static void qio_channel_restart_write(void *opaque)
|
|
static void qio_channel_restart_write(void *opaque)
|
|
{
|
|
{
|
|
QIOChannel *ioc = opaque;
|
|
QIOChannel *ioc = opaque;
|
|
- Coroutine *co = ioc->write_coroutine;
|
|
|
|
|
|
+ Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);
|
|
|
|
+
|
|
|
|
+ if (!co) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
/* Assert that aio_co_wake() reenters the coroutine directly */
|
|
/* Assert that aio_co_wake() reenters the coroutine directly */
|
|
assert(qemu_get_current_aio_context() ==
|
|
assert(qemu_get_current_aio_context() ==
|
|
@@ -568,7 +577,11 @@ void qio_channel_detach_aio_context(QIOChannel *ioc)
|
|
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
|
|
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
|
|
GIOCondition condition)
|
|
GIOCondition condition)
|
|
{
|
|
{
|
|
|
|
+ AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
|
|
|
|
+
|
|
assert(qemu_in_coroutine());
|
|
assert(qemu_in_coroutine());
|
|
|
|
+ assert(in_aio_context_home_thread(ioc_ctx));
|
|
|
|
+
|
|
if (condition == G_IO_IN) {
|
|
if (condition == G_IO_IN) {
|
|
assert(!ioc->read_coroutine);
|
|
assert(!ioc->read_coroutine);
|
|
ioc->read_coroutine = qemu_coroutine_self();
|
|
ioc->read_coroutine = qemu_coroutine_self();
|
|
@@ -580,18 +593,26 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
|
|
}
|
|
}
|
|
qio_channel_set_aio_fd_handlers(ioc);
|
|
qio_channel_set_aio_fd_handlers(ioc);
|
|
qemu_coroutine_yield();
|
|
qemu_coroutine_yield();
|
|
|
|
+ assert(in_aio_context_home_thread(ioc_ctx));
|
|
|
|
|
|
/* Allow interrupting the operation by reentering the coroutine other than
|
|
/* Allow interrupting the operation by reentering the coroutine other than
|
|
* through the aio_fd_handlers. */
|
|
* through the aio_fd_handlers. */
|
|
- if (condition == G_IO_IN && ioc->read_coroutine) {
|
|
|
|
- ioc->read_coroutine = NULL;
|
|
|
|
|
|
+ if (condition == G_IO_IN) {
|
|
|
|
+ assert(ioc->read_coroutine == NULL);
|
|
qio_channel_set_aio_fd_handlers(ioc);
|
|
qio_channel_set_aio_fd_handlers(ioc);
|
|
- } else if (condition == G_IO_OUT && ioc->write_coroutine) {
|
|
|
|
- ioc->write_coroutine = NULL;
|
|
|
|
|
|
+ } else if (condition == G_IO_OUT) {
|
|
|
|
+ assert(ioc->write_coroutine == NULL);
|
|
qio_channel_set_aio_fd_handlers(ioc);
|
|
qio_channel_set_aio_fd_handlers(ioc);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void qio_channel_wake_read(QIOChannel *ioc)
|
|
|
|
+{
|
|
|
|
+ Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
|
|
|
|
+ if (co) {
|
|
|
|
+ aio_co_wake(co);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
static gboolean qio_channel_wait_complete(QIOChannel *ioc,
|
|
static gboolean qio_channel_wait_complete(QIOChannel *ioc,
|
|
GIOCondition condition,
|
|
GIOCondition condition,
|