/*
 * QEMU coroutines
 *
 * Copyright IBM, Corp. 2011
 *
 * Authors:
 *  Stefan Hajnoczi    <stefanha@linux.vnet.ibm.com>
 *  Kevin Wolf         <kwolf@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "trace.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "block/aio.h"

enum {
    POOL_BATCH_SIZE = 64,
};

/** Free list to speed up creation */
static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
static unsigned int release_pool_size;
static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool);
static __thread unsigned int alloc_pool_size;
static __thread Notifier coroutine_pool_cleanup_notifier;

static void coroutine_pool_cleanup(Notifier *n, void *value)
{
    Coroutine *co;
    Coroutine *tmp;

    QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) {
        QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
        qemu_coroutine_delete(co);
    }
}

Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
    Coroutine *co = NULL;

    if (CONFIG_COROUTINE_POOL) {
        co = QSLIST_FIRST(&alloc_pool);
        if (!co) {
            if (release_pool_size > POOL_BATCH_SIZE) {
                /* Slow path; a good place to register the destructor, too.  */
                if (!coroutine_pool_cleanup_notifier.notify) {
                    coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
                    qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
                }

                /* This is not exact; there could be a little skew between
                 * release_pool_size and the actual size of release_pool.  But
                 * it is just a heuristic, it does not need to be perfect.
                 */
                alloc_pool_size = atomic_xchg(&release_pool_size, 0);
                QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool);
                co = QSLIST_FIRST(&alloc_pool);
            }
        }
        if (co) {
            QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
            alloc_pool_size--;
        }
    }

    if (!co) {
        co = qemu_coroutine_new();
    }

    co->entry = entry;
    co->entry_arg = opaque;
    QSIMPLEQ_INIT(&co->co_queue_wakeup);
    return co;
}
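
/*
 * Typical usage of the allocation API above (an illustrative sketch;
 * my_co_fn is hypothetical, not part of this file):
 *
 *     static void coroutine_fn my_co_fn(void *opaque)
 *     {
 *         ... work that may call qemu_coroutine_yield() ...
 *     }
 *
 *     Coroutine *co = qemu_coroutine_create(my_co_fn, NULL);
 *     qemu_coroutine_enter(co);
 *
 * qemu_coroutine_enter() is defined below; when the entry function
 * returns, the coroutine terminates and is recycled via
 * coroutine_delete().
 */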
static void coroutine_delete(Coroutine *co)
{
    co->caller = NULL;

    if (CONFIG_COROUTINE_POOL) {
        if (release_pool_size < POOL_BATCH_SIZE * 2) {
            QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
            atomic_inc(&release_pool_size);
            return;
        }
        if (alloc_pool_size < POOL_BATCH_SIZE) {
            QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next);
            alloc_pool_size++;
            return;
        }
    }

    qemu_coroutine_delete(co);
}
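
/*
 * Descriptive note on the two-tier pool above: terminated coroutines go
 * first to the global release_pool, which qemu_coroutine_create() drains
 * into the thread-local alloc_pool in batches once it exceeds
 * POOL_BATCH_SIZE entries.  Only when release_pool already holds
 * 2 * POOL_BATCH_SIZE entries and alloc_pool holds POOL_BATCH_SIZE is a
 * coroutine freed for real.
 */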
void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
    QSIMPLEQ_HEAD(, Coroutine) pending = QSIMPLEQ_HEAD_INITIALIZER(pending);
    Coroutine *from = qemu_coroutine_self();

    QSIMPLEQ_INSERT_TAIL(&pending, co, co_queue_next);

    /* Run co and any queued coroutines */
    while (!QSIMPLEQ_EMPTY(&pending)) {
        Coroutine *to = QSIMPLEQ_FIRST(&pending);
        CoroutineAction ret;

        /* Cannot rely on the read barrier for to in aio_co_wake(), as there are
         * callers outside of aio_co_wake() */
        const char *scheduled = atomic_mb_read(&to->scheduled);

        QSIMPLEQ_REMOVE_HEAD(&pending, co_queue_next);

        trace_qemu_aio_coroutine_enter(ctx, from, to, to->entry_arg);

        /* if the Coroutine has already been scheduled, entering it again will
         * cause us to enter it twice, potentially even after the coroutine has
         * been deleted */
        if (scheduled) {
            fprintf(stderr,
                    "%s: Co-routine was already scheduled in '%s'\n",
                    __func__, scheduled);
            abort();
        }

        if (to->caller) {
            fprintf(stderr, "Co-routine re-entered recursively\n");
            abort();
        }

        to->caller = from;
        to->ctx = ctx;

        /* Store to->ctx before anything that stores to.  Matches
         * barrier in aio_co_wake and qemu_co_mutex_wake.
         */
        smp_wmb();

        ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER);

        /* Queued coroutines are run depth-first; previously pending coroutines
         * run after those queued more recently.
         */
        QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);

        switch (ret) {
        case COROUTINE_YIELD:
            break;
        case COROUTINE_TERMINATE:
            assert(!to->locks_held);
            trace_qemu_coroutine_terminate(to);
            coroutine_delete(to);
            break;
        default:
            abort();
        }
    }
}
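
/*
 * Ordering sketch for the loop above (illustrative; A, B and C are
 * hypothetical coroutines): if A, while running, queues B and then C on
 * its co_queue_wakeup (e.g. when aio_co_wake() is called from coroutine
 * context), QSIMPLEQ_PREPEND places B and C ahead of anything already in
 * "pending", so they run before previously pending coroutines once A
 * yields or terminates.
 */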
void qemu_coroutine_enter(Coroutine *co)
{
    qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
}

void qemu_coroutine_enter_if_inactive(Coroutine *co)
{
    if (!qemu_coroutine_entered(co)) {
        qemu_coroutine_enter(co);
    }
}

void coroutine_fn qemu_coroutine_yield(void)
{
    Coroutine *self = qemu_coroutine_self();
    Coroutine *to = self->caller;

    trace_qemu_coroutine_yield(self, to);

    if (!to) {
        fprintf(stderr, "Co-routine is yielding to no one\n");
        abort();
    }

    self->caller = NULL;
    qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}
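
/*
 * Yield/resume sketch (illustrative; step_fn and "step" are hypothetical):
 *
 *     static void coroutine_fn step_fn(void *opaque)
 *     {
 *         int *step = opaque;
 *         *step = 1;
 *         qemu_coroutine_yield();    <- control returns to the caller
 *         *step = 2;                 <- resumes here on the next enter
 *     }
 *
 * After qemu_coroutine_create(step_fn, &step) and one
 * qemu_coroutine_enter(), step == 1; a second enter runs the coroutine
 * to completion, leaving step == 2.
 */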
bool qemu_coroutine_entered(Coroutine *co)
{
    return co->caller;
}

AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
{
    return co->ctx;
}