io_uring.c

/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qemu/defer-call.h"
#include "qapi/error.h"
#include "system/block-backend.h"
#include "trace.h"

/* Only used for assertions. */
#include "qemu/coroutine_int.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* No locking required, only accessed from AioContext home thread */
    LuringQueue io_q;
    QEMUBH *completion_bh;
};

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue. The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Short reads are rare but may occur. The remaining read request needs to be
 * resubmitted.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes, and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules BH completion so it can be called again in a nested
 * event loop. When there are no events left to complete, the BH is canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;

    defer_call_begin();

    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them. Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but it's
             * known to happen sometimes with Linux SCSI. Submit again and hope
             * the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in the
             * future, then this workaround may need to be extended to deal with
             * genuine -EAGAIN results that should not be resubmitted
             * immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
            /* Only read/write */
        } else {
            /* Short Read/Write */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        assert(luringcb->co->ctx == s->aio_context);
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    defer_call_end();
}

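/**
 * ioq_submit:
 * @s: AIO state
 *
 * Fill available ring sqes from the pending submit_queue and submit them to
 * the kernel. Retries on -EAGAIN/-EINTR and marks the queue as blocked when
 * requests are left over so they are flushed later. Also reaps completions
 * that are already available.
 */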
static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * There are still requests in flight, so try to complete some of
         * them right away.
         */
        luring_process_completions(s);
    }
    return ret;
}

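/* Reap available completions, then submit anything still sitting in the queue */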
static void luring_process_completions_and_submit(LuringState *s)
{
    luring_process_completions(s);

    if (s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

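/* BH scheduled by luring_process_completions() to handle nested event loops */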
static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

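/* fd read handler registered for the io_uring ring fd */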
static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

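/* AioContext poll handler: true if there are completions ready in the CQ ring */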
static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    return io_uring_cq_ready(&s->ring);
}

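/* Called when qemu_luring_poll_cb() reports ready completions */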
static void qemu_luring_poll_ready(void *opaque)
{
    LuringState *s = opaque;

    luring_process_completions_and_submit(s);
}

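/* Reset the pending-request queue and its counters */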
static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

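/* defer_call() callback: submit queued requests unless the queue is blocked */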
static void luring_deferred_fn(void *opaque)
{
    LuringState *s = opaque;
    trace_luring_unplug_fn(s, s->io_q.blocked, s->io_q.in_queue,
                           s->io_q.in_flight);
    if (!s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the request's sqe and appends it to the pending submit_queue; the
 * ring sqes themselves are fetched and submitted by ioq_submit().
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_ZONE_APPEND:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.in_queue,
                           s->io_q.in_flight);
    if (!s->io_q.blocked) {
        if (s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES) {
            ret = ioq_submit(s);
            trace_luring_do_submit_done(s, ret);
            return ret;
        }

        defer_call(luring_deferred_fn, s);
    }
    return 0;
}

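/**
 * luring_co_submit:
 *
 * Submit an I/O request from coroutine context and yield until it completes.
 * Returns 0 on success or a negative errno value on failure.
 *
 * Illustrative call from a coroutine-based block driver (sketch only; the
 * fd, offset and qiov names are placeholders):
 *
 *   ret = luring_co_submit(bs, fd, offset, qiov, QEMU_AIO_READ);
 */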
int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset,
                                  QEMUIOVector *qiov, int type)
{
    int ret;
    AioContext *ctx = qemu_get_current_aio_context();
    LuringState *s = aio_get_linux_io_uring(ctx);
    LuringAIOCB luringcb = {
        .co      = qemu_coroutine_self(),
        .ret     = -EINPROGRESS,
        .qiov    = qiov,
        .is_read = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

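/* Unregister the ring fd handler from @old_context and delete the completion BH */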
void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd,
                       NULL, NULL, NULL, NULL, s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

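/* Create the completion BH and register the ring fd handlers in @new_context */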
void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd,
                       qemu_luring_completion_cb, NULL,
                       qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}

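/**
 * luring_init:
 *
 * Allocate a LuringState and initialize its io_uring ring with MAX_ENTRIES
 * entries. Returns NULL and sets @errp on failure.
 *
 * Typical lifecycle (sketch; the callers live outside this file):
 *
 *   LuringState *s = luring_init(errp);
 *   luring_attach_aio_context(s, ctx);
 *   ... I/O via luring_co_submit() ...
 *   luring_detach_aio_context(s, ctx);
 *   luring_cleanup(s);
 */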
LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

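/* Tear down the io_uring ring and free @s */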
void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}