qemu-coroutine.c

/*
 * QEMU coroutines
 *
 * Copyright IBM, Corp. 2011
 *
 * Authors:
 *  Stefan Hajnoczi    <stefanha@linux.vnet.ibm.com>
 *  Kevin Wolf         <kwolf@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "trace.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/coroutine_int.h"
#include "qemu/coroutine-tls.h"
#include "block/aio.h"
/**
 * The minimal batch size is always 64; coroutines from the release_pool are
 * reused as soon as there are 64 coroutines in it.  The maximum pool size
 * starts at 64 and is increased on demand, so that coroutines are not
 * deleted even if they are not immediately reused.
 */
enum {
    POOL_MIN_BATCH_SIZE = 64,
    POOL_INITIAL_MAX_SIZE = 64,
};
/** Free list to speed up creation */
static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
static unsigned int pool_max_size = POOL_INITIAL_MAX_SIZE;
static unsigned int release_pool_size;

typedef QSLIST_HEAD(, Coroutine) CoroutineQSList;
QEMU_DEFINE_STATIC_CO_TLS(CoroutineQSList, alloc_pool);
QEMU_DEFINE_STATIC_CO_TLS(unsigned int, alloc_pool_size);
QEMU_DEFINE_STATIC_CO_TLS(Notifier, coroutine_pool_cleanup_notifier);
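
/* Thread-exit notifier: frees every coroutine still sitting in this thread's
 * local alloc_pool, so pooled coroutines do not leak when the thread exits.
 */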
static void coroutine_pool_cleanup(Notifier *n, void *value)
{
    Coroutine *co;
    Coroutine *tmp;
    CoroutineQSList *alloc_pool = get_ptr_alloc_pool();

    QSLIST_FOREACH_SAFE(co, alloc_pool, pool_next, tmp) {
        QSLIST_REMOVE_HEAD(alloc_pool, pool_next);
        qemu_coroutine_delete(co);
    }
}
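
/* Allocate a coroutine.  The fast path takes one from the thread-local
 * alloc_pool; when that is empty, a whole batch is moved over from the
 * global release_pool (once it holds more than POOL_MIN_BATCH_SIZE entries),
 * and only as a last resort is a fresh coroutine created with
 * qemu_coroutine_new().
 *
 * Typical caller-side usage, as a minimal sketch (my_entry and my_data are
 * hypothetical names, not part of this file):
 *
 *     void coroutine_fn my_entry(void *opaque) { ... }
 *
 *     Coroutine *co = qemu_coroutine_create(my_entry, my_data);
 *     qemu_coroutine_enter(co);
 */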
Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
    Coroutine *co = NULL;

    if (CONFIG_COROUTINE_POOL) {
        CoroutineQSList *alloc_pool = get_ptr_alloc_pool();

        co = QSLIST_FIRST(alloc_pool);
        if (!co) {
            if (release_pool_size > POOL_MIN_BATCH_SIZE) {
                /* Slow path; a good place to register the destructor, too. */
                Notifier *notifier = get_ptr_coroutine_pool_cleanup_notifier();
                if (!notifier->notify) {
                    notifier->notify = coroutine_pool_cleanup;
                    qemu_thread_atexit_add(notifier);
                }

                /* This is not exact; there could be a little skew between
                 * release_pool_size and the actual size of release_pool.  But
                 * it is just a heuristic, it does not need to be perfect.
                 */
                set_alloc_pool_size(qatomic_xchg(&release_pool_size, 0));
                QSLIST_MOVE_ATOMIC(alloc_pool, &release_pool);
                co = QSLIST_FIRST(alloc_pool);
            }
        }
        if (co) {
            QSLIST_REMOVE_HEAD(alloc_pool, pool_next);
            set_alloc_pool_size(get_alloc_pool_size() - 1);
        }
    }

    if (!co) {
        co = qemu_coroutine_new();
    }

    co->entry = entry;
    co->entry_arg = opaque;
    QSIMPLEQ_INIT(&co->co_queue_wakeup);
    return co;
}
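
/* Recycle a terminated coroutine: prefer the global release_pool (allowed to
 * grow to twice pool_max_size so that other threads can refill their local
 * pools from it), then the thread-local alloc_pool; the coroutine is actually
 * destroyed only when both pools are full.
 */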
static void coroutine_delete(Coroutine *co)
{
    co->caller = NULL;

    if (CONFIG_COROUTINE_POOL) {
        if (release_pool_size < qatomic_read(&pool_max_size) * 2) {
            QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
            qatomic_inc(&release_pool_size);
            return;
        }
        if (get_alloc_pool_size() < qatomic_read(&pool_max_size)) {
            QSLIST_INSERT_HEAD(get_ptr_alloc_pool(), co, pool_next);
            set_alloc_pool_size(get_alloc_pool_size() + 1);
            return;
        }
    }

    qemu_coroutine_delete(co);
}
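
/* Enter co in the given AioContext and keep running until co and every
 * coroutine it wakes up (queued on co_queue_wakeup) have yielded or
 * terminated.
 */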
void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
    QSIMPLEQ_HEAD(, Coroutine) pending = QSIMPLEQ_HEAD_INITIALIZER(pending);
    Coroutine *from = qemu_coroutine_self();

    QSIMPLEQ_INSERT_TAIL(&pending, co, co_queue_next);

    /* Run co and any queued coroutines */
    while (!QSIMPLEQ_EMPTY(&pending)) {
        Coroutine *to = QSIMPLEQ_FIRST(&pending);
        CoroutineAction ret;

        /* Cannot rely on the read barrier for to in aio_co_wake(), as there
         * are callers outside of aio_co_wake() */
        const char *scheduled = qatomic_mb_read(&to->scheduled);

        QSIMPLEQ_REMOVE_HEAD(&pending, co_queue_next);

        trace_qemu_aio_coroutine_enter(ctx, from, to, to->entry_arg);

        /* If the coroutine has already been scheduled, entering it again will
         * cause us to enter it twice, potentially even after the coroutine
         * has been deleted */
        if (scheduled) {
            fprintf(stderr,
                    "%s: Co-routine was already scheduled in '%s'\n",
                    __func__, scheduled);
            abort();
        }

        if (to->caller) {
            fprintf(stderr, "Co-routine re-entered recursively\n");
            abort();
        }

        to->caller = from;
        to->ctx = ctx;

        /* Store to->ctx before anything that stores to.  Matches
         * barrier in aio_co_wake and qemu_co_mutex_wake.
         */
        smp_wmb();

        ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER);

        /* Queued coroutines are run depth-first; previously pending coroutines
         * run after those queued more recently.
         */
        QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);

        switch (ret) {
        case COROUTINE_YIELD:
            break;
        case COROUTINE_TERMINATE:
            assert(!to->locks_held);
            trace_qemu_coroutine_terminate(to);
            coroutine_delete(to);
            break;
        default:
            abort();
        }
    }
}
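
/* Convenience wrapper: enter co in the calling thread's current AioContext. */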
void qemu_coroutine_enter(Coroutine *co)
{
    qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
}

void qemu_coroutine_enter_if_inactive(Coroutine *co)
{
    if (!qemu_coroutine_entered(co)) {
        qemu_coroutine_enter(co);
    }
}
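
/* Transfer control back to the coroutine or thread that entered us; aborts
 * if the current coroutine has no caller to yield to.
 */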
void coroutine_fn qemu_coroutine_yield(void)
{
    Coroutine *self = qemu_coroutine_self();
    Coroutine *to = self->caller;

    trace_qemu_coroutine_yield(self, to);

    if (!to) {
        fprintf(stderr, "Co-routine is yielding to no one\n");
        abort();
    }

    self->caller = NULL;
    qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}
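
/* True while the coroutine has been entered and has not yet yielded or
 * terminated, i.e. while co->caller is still set.
 */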
bool qemu_coroutine_entered(Coroutine *co)
{
    return co->caller;
}

AioContext *qemu_coroutine_get_aio_context(Coroutine *co)
{
    return co->ctx;
}
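
/* Callers that expect to create many coroutines at once can raise (and later
 * lower) pool_max_size; both adjustments are atomic, so they are safe from
 * any thread.
 */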
void qemu_coroutine_inc_pool_size(unsigned int additional_pool_size)
{
    qatomic_add(&pool_max_size, additional_pool_size);
}

void qemu_coroutine_dec_pool_size(unsigned int removing_pool_size)
{
    qatomic_sub(&pool_max_size, removing_pool_size);
}