async.c
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};
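
/*
 * The three flags in QEMUBH drive the life cycle implemented below:
 * 'scheduled' means the callback is pending and will run on the next
 * aio_bh_poll(); 'idle' means the BH only has to run within roughly 10 ms
 * (see aio_compute_timeout()); 'deleted' means the BH must not be scheduled
 * again and is freed lazily by aio_bh_poll() once it is no longer scheduled.
 */
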
void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    bh->scheduled = 1;
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* aio_bh_poll must not run concurrently with itself.  The count in
 * ctx->list_lock is incremented before the call, and is not affected
 * by the call.
 */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_bh_call(bh);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        return ret;
    }

    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
    }
    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in the aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}

/* This function is asynchronous: it only clears the scheduled flag, so a
 * callback that has already started running is not interrupted.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    atomic_mb_set(&bh->scheduled, 0);
}

/* This function is asynchronous: it only marks the bottom half as deleted.
 * The actual deletion is deferred and performed later by aio_bh_poll().
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}
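
/*
 * Usage sketch (illustrative, not part of this file): the typical life cycle
 * of a bottom half, using a hypothetical callback my_cb() and context my_ctx:
 *
 *     QEMUBH *bh = aio_bh_new(my_ctx, my_cb, my_opaque);
 *     qemu_bh_schedule(bh);      // my_cb(my_opaque) runs from aio_bh_poll()
 *     ...
 *     qemu_bh_delete(bh);        // freed later by aio_bh_poll()
 *
 * For a fire-and-forget callback, aio_bh_schedule_oneshot() combines all of
 * the above in a single call.
 */
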
int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}
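
/*
 * Worked example of the computation above: with only an idle BH scheduled,
 * timeout is 10 ms (10000000 ns); if the nearest timer deadline reported by
 * timerlistgroup_deadline_ns() is 4 ms away, qemu_soonest_timeout() picks the
 * smaller value and the caller blocks for at most 4 ms.  With a non-idle BH
 * scheduled, the function returns 0 and the caller does not block at all.
 */
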
static gboolean
aio_ctx_prepare(GSource *source, gint *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource *source,
                 GSourceFunc callback,
                 gpointer user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx);
    return true;
}

static void
aio_ctx_finalize(GSource *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
    aio_context_destroy(ctx);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}
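
/*
 * Usage sketch (illustrative, not from this file): the GSource returned above
 * can be attached to a GLib main context so that the glib loop drives the
 * prepare/check/dispatch callbacks defined here, e.g.:
 *
 *     GSource *src = aio_get_g_source(ctx);
 *     g_source_attach(src, g_main_context_default());
 *     g_source_unref(src);    // the main context now holds its own reference
 */
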
ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init(errp);
        if (ctx->linux_aio) {
            laio_attach_aio_context(ctx->linux_aio, ctx);
        }
    }
    return ctx->linux_aio;
}

LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    assert(ctx->linux_aio);
    return ctx->linux_aio;
}
#endif

void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)
#ifdef WIN32
        || true
#endif
    ) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}
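
/*
 * Note on the notify_me/notified handshake above: aio_notify() only kicks the
 * EventNotifier when another thread has advertised, via ctx->notify_me, that
 * it is about to block in prepare/poll.  Once the event loop wakes up it
 * clears notify_me (see aio_ctx_check()) and calls aio_notify_accept() to
 * consume the pending notification, which keeps spurious wakeups bounded.
 */
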
static void aio_timerlist_notify(void *opaque, QEMUClockType type)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        aio_context_acquire(ctx);

        /* Protected by write barrier in qemu_aio_coroutine_enter */
        atomic_set(&co->scheduled, NULL);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
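
/*
 * Usage sketch (illustrative): callers create a context and report failures
 * through the usual Error API, e.g.:
 *
 *     Error *local_err = NULL;
 *     AioContext *ctx = aio_context_new(&local_err);
 *     if (!ctx) {
 *         error_report_err(local_err);
 *     }
 *
 * The context is torn down by dropping the last reference with
 * aio_context_unref(), which ends up in aio_ctx_finalize() above.
 */
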
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    const char *scheduled = atomic_cmpxchg(&co->scheduled, NULL,
                                           __func__);

    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    /* The coroutine might run and release the last ctx reference before we
     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
     * we're done.
     */
    aio_context_ref(ctx);

    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);

    aio_context_unref(ctx);
}
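
/*
 * Usage sketch (illustrative): a coroutine that wants to continue running in
 * another AioContext schedules itself there and then yields, e.g.:
 *
 *     aio_co_schedule(target_ctx, qemu_coroutine_self());
 *     qemu_coroutine_yield();
 *     // execution resumes here, in target_ctx
 */
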
void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    aio_co_enter(ctx, co);
}

void aio_co_enter(AioContext *ctx, struct Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
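
/*
 * Usage sketch (illustrative): code running outside the context's home thread
 * brackets its accesses with acquire/release, which take the recursive mutex
 * initialised in aio_context_new():
 *
 *     aio_context_acquire(ctx);
 *     // ... operate on objects tied to ctx ...
 *     aio_context_release(ctx);
 */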