qemu-coroutine.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /*
  2. * QEMU coroutines
  3. *
  4. * Copyright IBM, Corp. 2011
  5. *
  6. * Authors:
  7. * Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
  8. * Kevin Wolf <kwolf@redhat.com>
  9. *
  10. * This work is licensed under the terms of the GNU LGPL, version 2 or later.
  11. * See the COPYING.LIB file in the top-level directory.
  12. *
  13. */
  14. #include "qemu/osdep.h"
  15. #include "trace.h"
  16. #include "qemu/thread.h"
  17. #include "qemu/atomic.h"
  18. #include "qemu/coroutine_int.h"
  19. #include "qemu/coroutine-tls.h"
  20. #include "qemu/cutils.h"
  21. #include "block/aio.h"
enum {
    /* Coroutines per pool batch; also the default global pool limit */
    COROUTINE_POOL_BATCH_MAX_SIZE = 128,
};
  25. /*
  26. * Coroutine creation and deletion is expensive so a pool of unused coroutines
  27. * is kept as a cache. When the pool has coroutines available, they are
  28. * recycled instead of creating new ones from scratch. Coroutines are added to
  29. * the pool upon termination.
  30. *
  31. * The pool is global but each thread maintains a small local pool to avoid
  32. * global pool contention. Threads fetch and return batches of coroutines from
  33. * the global pool to maintain their local pool. The local pool holds up to two
  34. * batches whereas the maximum size of the global pool is controlled by the
  35. * qemu_coroutine_inc_pool_size() API.
  36. *
  37. * .-----------------------------------.
  38. * | Batch 1 | Batch 2 | Batch 3 | ... | global_pool
  39. * `-----------------------------------'
  40. *
  41. * .-------------------.
  42. * | Batch 1 | Batch 2 | per-thread local_pool (maximum 2 batches)
  43. * `-------------------'
  44. */
typedef struct CoroutinePoolBatch {
    /* Batches are kept in a list */
    QSLIST_ENTRY(CoroutinePoolBatch) next;

    /* This batch holds up to @COROUTINE_POOL_BATCH_MAX_SIZE coroutines */
    QSLIST_HEAD(, Coroutine) list;
    unsigned int size; /* number of coroutines currently on @list */
} CoroutinePoolBatch;

/* A pool is simply a singly-linked list of batches */
typedef QSLIST_HEAD(, CoroutinePoolBatch) CoroutinePool;
/* Host operating system limit on number of pooled coroutines */
static unsigned int global_pool_hard_max_size;

static QemuMutex global_pool_lock; /* protects the following variables */
static CoroutinePool global_pool = QSLIST_HEAD_INITIALIZER(global_pool);
static unsigned int global_pool_size; /* coroutines across all global batches */
static unsigned int global_pool_max_size = COROUTINE_POOL_BATCH_MAX_SIZE;

/* Per-thread pool (holds at most two batches) and its exit-time cleanup */
QEMU_DEFINE_STATIC_CO_TLS(CoroutinePool, local_pool);
QEMU_DEFINE_STATIC_CO_TLS(Notifier, local_pool_cleanup_notifier);
  61. static CoroutinePoolBatch *coroutine_pool_batch_new(void)
  62. {
  63. CoroutinePoolBatch *batch = g_new(CoroutinePoolBatch, 1);
  64. QSLIST_INIT(&batch->list);
  65. batch->size = 0;
  66. return batch;
  67. }
  68. static void coroutine_pool_batch_delete(CoroutinePoolBatch *batch)
  69. {
  70. Coroutine *co;
  71. Coroutine *tmp;
  72. QSLIST_FOREACH_SAFE(co, &batch->list, pool_next, tmp) {
  73. QSLIST_REMOVE_HEAD(&batch->list, pool_next);
  74. qemu_coroutine_delete(co);
  75. }
  76. g_free(batch);
  77. }
  78. static void local_pool_cleanup(Notifier *n, void *value)
  79. {
  80. CoroutinePool *local_pool = get_ptr_local_pool();
  81. CoroutinePoolBatch *batch;
  82. CoroutinePoolBatch *tmp;
  83. QSLIST_FOREACH_SAFE(batch, local_pool, next, tmp) {
  84. QSLIST_REMOVE_HEAD(local_pool, next);
  85. coroutine_pool_batch_delete(batch);
  86. }
  87. }
  88. /* Ensure the atexit notifier is registered */
  89. static void local_pool_cleanup_init_once(void)
  90. {
  91. Notifier *notifier = get_ptr_local_pool_cleanup_notifier();
  92. if (!notifier->notify) {
  93. notifier->notify = local_pool_cleanup;
  94. qemu_thread_atexit_add(notifier);
  95. }
  96. }
  97. /* Helper to get the next unused coroutine from the local pool */
  98. static Coroutine *coroutine_pool_get_local(void)
  99. {
  100. CoroutinePool *local_pool = get_ptr_local_pool();
  101. CoroutinePoolBatch *batch = QSLIST_FIRST(local_pool);
  102. Coroutine *co;
  103. if (unlikely(!batch)) {
  104. return NULL;
  105. }
  106. co = QSLIST_FIRST(&batch->list);
  107. QSLIST_REMOVE_HEAD(&batch->list, pool_next);
  108. batch->size--;
  109. if (batch->size == 0) {
  110. QSLIST_REMOVE_HEAD(local_pool, next);
  111. coroutine_pool_batch_delete(batch);
  112. }
  113. return co;
  114. }
  115. /* Get the next batch from the global pool */
  116. static void coroutine_pool_refill_local(void)
  117. {
  118. CoroutinePool *local_pool = get_ptr_local_pool();
  119. CoroutinePoolBatch *batch;
  120. WITH_QEMU_LOCK_GUARD(&global_pool_lock) {
  121. batch = QSLIST_FIRST(&global_pool);
  122. if (batch) {
  123. QSLIST_REMOVE_HEAD(&global_pool, next);
  124. global_pool_size -= batch->size;
  125. }
  126. }
  127. if (batch) {
  128. QSLIST_INSERT_HEAD(local_pool, batch, next);
  129. local_pool_cleanup_init_once();
  130. }
  131. }
  132. /* Add a batch of coroutines to the global pool */
  133. static void coroutine_pool_put_global(CoroutinePoolBatch *batch)
  134. {
  135. WITH_QEMU_LOCK_GUARD(&global_pool_lock) {
  136. unsigned int max = MIN(global_pool_max_size,
  137. global_pool_hard_max_size);
  138. if (global_pool_size < max) {
  139. QSLIST_INSERT_HEAD(&global_pool, batch, next);
  140. /* Overshooting the max pool size is allowed */
  141. global_pool_size += batch->size;
  142. return;
  143. }
  144. }
  145. /* The global pool was full, so throw away this batch */
  146. coroutine_pool_batch_delete(batch);
  147. }
  148. /* Get the next unused coroutine from the pool or return NULL */
  149. static Coroutine *coroutine_pool_get(void)
  150. {
  151. Coroutine *co;
  152. co = coroutine_pool_get_local();
  153. if (!co) {
  154. coroutine_pool_refill_local();
  155. co = coroutine_pool_get_local();
  156. }
  157. return co;
  158. }
  159. static void coroutine_pool_put(Coroutine *co)
  160. {
  161. CoroutinePool *local_pool = get_ptr_local_pool();
  162. CoroutinePoolBatch *batch = QSLIST_FIRST(local_pool);
  163. if (unlikely(!batch)) {
  164. batch = coroutine_pool_batch_new();
  165. QSLIST_INSERT_HEAD(local_pool, batch, next);
  166. local_pool_cleanup_init_once();
  167. }
  168. if (unlikely(batch->size >= COROUTINE_POOL_BATCH_MAX_SIZE)) {
  169. CoroutinePoolBatch *next = QSLIST_NEXT(batch, next);
  170. /* Is the local pool full? */
  171. if (next) {
  172. QSLIST_REMOVE_HEAD(local_pool, next);
  173. coroutine_pool_put_global(batch);
  174. }
  175. batch = coroutine_pool_batch_new();
  176. QSLIST_INSERT_HEAD(local_pool, batch, next);
  177. }
  178. QSLIST_INSERT_HEAD(&batch->list, co, pool_next);
  179. batch->size++;
  180. }
  181. Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
  182. {
  183. Coroutine *co = NULL;
  184. if (IS_ENABLED(CONFIG_COROUTINE_POOL)) {
  185. co = coroutine_pool_get();
  186. }
  187. if (!co) {
  188. co = qemu_coroutine_new();
  189. }
  190. co->entry = entry;
  191. co->entry_arg = opaque;
  192. QSIMPLEQ_INIT(&co->co_queue_wakeup);
  193. return co;
  194. }
  195. static void coroutine_delete(Coroutine *co)
  196. {
  197. co->caller = NULL;
  198. if (IS_ENABLED(CONFIG_COROUTINE_POOL)) {
  199. coroutine_pool_put(co);
  200. } else {
  201. qemu_coroutine_delete(co);
  202. }
  203. }
/*
 * Transfer control to coroutine @co in AioContext @ctx, then keep running
 * any coroutines that the entered coroutine queued for wakeup, depth-first,
 * until none remain.
 *
 * Aborts if a coroutine to be entered was already scheduled (to->scheduled
 * set by aio_co_schedule()/qemu_co_sleep() etc.) or is currently entered
 * (to->caller still set) — both would lead to double entry.
 */
void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
    QSIMPLEQ_HEAD(, Coroutine) pending = QSIMPLEQ_HEAD_INITIALIZER(pending);
    Coroutine *from = qemu_coroutine_self();

    QSIMPLEQ_INSERT_TAIL(&pending, co, co_queue_next);

    /* Run co and any queued coroutines */
    while (!QSIMPLEQ_EMPTY(&pending)) {
        Coroutine *to = QSIMPLEQ_FIRST(&pending);
        CoroutineAction ret;

        /*
         * Read to before to->scheduled; pairs with qatomic_cmpxchg in
         * qemu_co_sleep(), aio_co_schedule() etc.
         */
        smp_read_barrier_depends();

        const char *scheduled = qatomic_read(&to->scheduled);

        QSIMPLEQ_REMOVE_HEAD(&pending, co_queue_next);

        trace_qemu_aio_coroutine_enter(ctx, from, to, to->entry_arg);

        /* if the Coroutine has already been scheduled, entering it again will
         * cause us to enter it twice, potentially even after the coroutine has
         * been deleted */
        if (scheduled) {
            fprintf(stderr,
                    "%s: Co-routine was already scheduled in '%s'\n",
                    __func__, scheduled);
            abort();
        }

        if (to->caller) {
            fprintf(stderr, "Co-routine re-entered recursively\n");
            abort();
        }

        to->caller = from;
        to->ctx = ctx;

        /* Store to->ctx before anything that stores to. Matches
         * barrier in aio_co_wake and qemu_co_mutex_wake.
         */
        smp_wmb();

        /* Runs @to until it yields or terminates */
        ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER);

        /* Queued coroutines are run depth-first; previously pending coroutines
         * run after those queued more recently.
         */
        QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);

        switch (ret) {
        case COROUTINE_YIELD:
            /* @to suspended itself; continue with whatever it woke up */
            break;
        case COROUTINE_TERMINATE:
            assert(!to->locks_held);
            trace_qemu_coroutine_terminate(to);
            coroutine_delete(to);
            break;
        default:
            abort();
        }
    }
}
/* Enter @co in the calling thread's current AioContext */
void qemu_coroutine_enter(Coroutine *co)
{
    qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
}
  262. void qemu_coroutine_enter_if_inactive(Coroutine *co)
  263. {
  264. if (!qemu_coroutine_entered(co)) {
  265. qemu_coroutine_enter(co);
  266. }
  267. }
  268. void coroutine_fn qemu_coroutine_yield(void)
  269. {
  270. Coroutine *self = qemu_coroutine_self();
  271. Coroutine *to = self->caller;
  272. trace_qemu_coroutine_yield(self, to);
  273. if (!to) {
  274. fprintf(stderr, "Co-routine is yielding to no one\n");
  275. abort();
  276. }
  277. self->caller = NULL;
  278. qemu_coroutine_switch(self, to, COROUTINE_YIELD);
  279. }
  280. bool qemu_coroutine_entered(Coroutine *co)
  281. {
  282. return co->caller;
  283. }
/* Return the AioContext recorded when @co was last entered */
AioContext *qemu_coroutine_get_aio_context(Coroutine *co)
{
    return co->ctx;
}
/* Raise the global pool size limit by @additional_pool_size coroutines */
void qemu_coroutine_inc_pool_size(unsigned int additional_pool_size)
{
    QEMU_LOCK_GUARD(&global_pool_lock);
    global_pool_max_size += additional_pool_size;
}
  293. void qemu_coroutine_dec_pool_size(unsigned int removing_pool_size)
  294. {
  295. QEMU_LOCK_GUARD(&global_pool_lock);
  296. global_pool_max_size -= removing_pool_size;
  297. }
  298. static unsigned int get_global_pool_hard_max_size(void)
  299. {
  300. #ifdef __linux__
  301. g_autofree char *contents = NULL;
  302. int max_map_count;
  303. /*
  304. * Linux processes can have up to max_map_count virtual memory areas
  305. * (VMAs). mmap(2), mprotect(2), etc fail with ENOMEM beyond this limit. We
  306. * must limit the coroutine pool to a safe size to avoid running out of
  307. * VMAs.
  308. */
  309. if (g_file_get_contents("/proc/sys/vm/max_map_count", &contents, NULL,
  310. NULL) &&
  311. qemu_strtoi(contents, NULL, 10, &max_map_count) == 0) {
  312. /*
  313. * This is an upper bound that avoids exceeding max_map_count. Leave a
  314. * fixed amount for non-coroutine users like library dependencies,
  315. * vhost-user, etc. Each coroutine takes up 2 VMAs so halve the
  316. * remaining amount.
  317. */
  318. if (max_map_count > 5000) {
  319. return (max_map_count - 5000) / 2;
  320. } else {
  321. /* Disable the global pool but threads still have local pools */
  322. return 0;
  323. }
  324. }
  325. #endif
  326. return UINT_MAX;
  327. }
/* Runs before main(): initialize the pool lock and the OS-derived limit */
static void __attribute__((constructor)) qemu_coroutine_init(void)
{
    qemu_mutex_init(&global_pool_lock);
    global_pool_hard_max_size = get_global_pool_hard_max_size();
}