/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2. See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock. After
     * that, only the worker thread can write to it. Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock. */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex. */
    QLIST_ENTRY(ThreadPoolElement) all;
};
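
/*
 * A minimal sketch of the ordering protocol described above (illustration
 * only, not part of the pool's code): the worker publishes ret before state
 * behind smp_wmb(), and the completion path reads state before ret behind
 * smp_rmb(), roughly:
 *
 *     worker thread:                    completion bottom half:
 *         req->ret = ret;                   if (req->state == THREAD_DONE) {
 *         smp_wmb();                            smp_rmb();
 *         req->state = THREAD_DONE;             callback(req->ret);
 *                                           }
 */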

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuCond request_cond;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock. */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int min_threads;
    int max_threads;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (pool->cur_threads <= pool->max_threads) {
        ThreadPoolElement *req;
        int ret;

        if (QTAILQ_EMPTY(&pool->request_list)) {
            pool->idle_threads++;
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
            pool->idle_threads--;
            if (ret == 0 &&
                QTAILQ_EMPTY(&pool->request_list) &&
                pool->cur_threads > pool->min_threads) {
                /* Timed out + no work to do + no need for warm threads = exit. */
                break;
            }
            /*
             * Even if there was some work to do, check if there aren't
             * too many worker threads before picking it up.
             */
            continue;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state. */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_bh_schedule(pool->completion_bh);
        qemu_mutex_lock(&pool->lock);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);

    /*
     * Wake up another thread, in case we got a wakeup but decided
     * to exit due to pool->cur_threads > pool->max_threads.
     */
    qemu_cond_signal(&pool->request_cond);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken. */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret. */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
                                   ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_cond_signal(&pool->request_cond);
    return &req->common;
}
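
/*
 * A minimal usage sketch (illustration only, not part of this file): a caller
 * hands the pool a blocking function plus a completion callback.  The worker
 * function runs in a pool thread and its return value is delivered to the
 * callback in the pool's AioContext.  FlushState, flush_worker, flush_done
 * and s are hypothetical names, not QEMU APIs.
 *
 *     typedef struct FlushState { int fd; } FlushState;
 *
 *     static int flush_worker(void *opaque)          // runs in a worker thread
 *     {
 *         FlushState *s = opaque;
 *         return fdatasync(s->fd) ? -errno : 0;      // return value becomes ret
 *     }
 *
 *     static void flush_done(void *opaque, int ret)  // runs in pool->ctx
 *     {
 *         // ret is flush_worker()'s return value, or -ECANCELED if cancelled
 *     }
 *
 *     thread_pool_submit_aio(pool, flush_worker, s, flush_done, s);
 */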

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}
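
/*
 * A coroutine-side sketch (illustration only): inside a coroutine the same
 * worker function can be used without a callback.  The coroutine yields in
 * thread_pool_submit_co() and resumes with the worker's return value once
 * thread_pool_co_cb() wakes it.  flush_worker and FlushState are the
 * hypothetical names from the sketch above.
 *
 *     static int coroutine_fn flush_co(ThreadPool *pool, FlushState *s)
 *     {
 *         return thread_pool_submit_co(pool, flush_worker, s);
 *     }
 */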

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /*
     * We either have to:
     *  - Increase the number of available threads until it is over the
     *    min_threads threshold.
     *  - Bump the worker threads so that they exit, until the count is under
     *    the max_threads threshold.
     *  - Do nothing.  The current number of threads falls in between the min
     *    and max thresholds.  We'll let the pool manage itself.
     */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
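
/*
 * A pool lifecycle sketch (illustration only, assuming the caller owns an
 * AioContext *ctx and the hypothetical flush_worker/s from the sketches
 * above): create the pool, submit work, then free it once all requests have
 * completed.
 *
 *     ThreadPool *pool = thread_pool_new(ctx);
 *     thread_pool_submit(pool, flush_worker, s);   // fire-and-forget variant
 *     ...
 *     thread_pool_free(pool);
 *
 * Note that thread_pool_free() asserts QLIST_EMPTY(&pool->head), so the
 * caller must drain outstanding requests before freeing the pool.
 */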