2
0

fdmon-io_uring.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /* SPDX-License-Identifier: GPL-2.0-or-later */
  2. /*
  3. * Linux io_uring file descriptor monitoring
  4. *
  5. * The Linux io_uring API supports file descriptor monitoring with a few
  6. * advantages over existing APIs like poll(2) and epoll(7):
  7. *
  8. * 1. Userspace polling of events is possible because the completion queue (cq
  9. * ring) is shared between the kernel and userspace. This allows
  10. * applications that rely on userspace polling to also monitor file
  11. * descriptors in the same userspace polling loop.
  12. *
  13. * 2. Submission and completion are batched and done together in a single
  14. * system call. This minimizes the number of system calls.
  15. *
  16. * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
  17. * poll(2).
  18. *
  19. * 4. Nanosecond timeouts are supported so it requires fewer syscalls than
  20. * epoll(7).
  21. *
  22. * This code only monitors file descriptors and does not do asynchronous disk
  23. * I/O. Implementing disk I/O efficiently has other requirements and should
  24. * use a separate io_uring so it does not make sense to unify the code.
  25. *
  26. * File descriptor monitoring is implemented using the following operations:
  27. *
  28. * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
  29. * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored. When
  30. * the poll mask changes for a file descriptor it is first removed and then
  31. * re-added with the new poll mask, so this operation is also used as part
  32. * of modifying an existing monitored file descriptor.
  33. * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
  34. * for events. This operation self-cancels if another event completes
  35. * before the timeout.
  36. *
  37. * io_uring calls the submission queue the "sq ring" and the completion queue
  38. * the "cq ring". Ring entries are called "sqe" and "cqe", respectively.
  39. *
  40. * The code is structured so that sq/cq rings are only modified within
  41. * fdmon_io_uring_wait(). Changes to AioHandlers are made by enqueuing them on
  42. * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
  43. * and/or IORING_OP_POLL_REMOVE sqes for them.
  44. */
  45. #include "qemu/osdep.h"
  46. #include <poll.h>
  47. #include "qemu/rcu_queue.h"
  48. #include "aio-posix.h"
  /* Ring sizing and the bit values stored in AioHandler::flags */
  enum {
      /* Number of entries in the sq/cq rings */
      FDMON_IO_URING_ENTRIES = 128,

      /*
       * AioHandler::flags
       *
       * PENDING: the handler is queued on a submit list
       * ADD:     an IORING_OP_POLL_ADD sqe must be submitted
       * REMOVE:  an IORING_OP_POLL_REMOVE sqe must be submitted
       */
      FDMON_IO_URING_PENDING = 0x1,
      FDMON_IO_URING_ADD = 0x2,
      FDMON_IO_URING_REMOVE = 0x4,
  };
  56. static inline int poll_events_from_pfd(int pfd_events)
  57. {
  58. return (pfd_events & G_IO_IN ? POLLIN : 0) |
  59. (pfd_events & G_IO_OUT ? POLLOUT : 0) |
  60. (pfd_events & G_IO_HUP ? POLLHUP : 0) |
  61. (pfd_events & G_IO_ERR ? POLLERR : 0);
  62. }
  63. static inline int pfd_events_from_poll(int poll_events)
  64. {
  65. return (poll_events & POLLIN ? G_IO_IN : 0) |
  66. (poll_events & POLLOUT ? G_IO_OUT : 0) |
  67. (poll_events & POLLHUP ? G_IO_HUP : 0) |
  68. (poll_events & POLLERR ? G_IO_ERR : 0);
  69. }
  70. /*
  71. * Returns an sqe for submitting a request. Only be called within
  72. * fdmon_io_uring_wait().
  73. */
  74. static struct io_uring_sqe *get_sqe(AioContext *ctx)
  75. {
  76. struct io_uring *ring = &ctx->fdmon_io_uring;
  77. struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  78. int ret;
  79. if (likely(sqe)) {
  80. return sqe;
  81. }
  82. /* No free sqes left, submit pending sqes first */
  83. do {
  84. ret = io_uring_submit(ring);
  85. } while (ret == -EINTR);
  86. assert(ret > 1);
  87. sqe = io_uring_get_sqe(ring);
  88. assert(sqe);
  89. return sqe;
  90. }
  91. /* Atomically enqueue an AioHandler for sq ring submission */
  92. static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
  93. {
  94. unsigned old_flags;
  95. old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
  96. if (!(old_flags & FDMON_IO_URING_PENDING)) {
  97. QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
  98. }
  99. }
  100. /* Dequeue an AioHandler for sq ring submission. Called by fill_sq_ring(). */
  101. static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
  102. {
  103. AioHandler *node = QSLIST_FIRST(head);
  104. if (!node) {
  105. return NULL;
  106. }
  107. /* Doesn't need to be atomic since fill_sq_ring() moves the list */
  108. QSLIST_REMOVE_HEAD(head, node_submitted);
  109. /*
  110. * Don't clear FDMON_IO_URING_REMOVE. It's sticky so it can serve two
  111. * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
  112. * telling process_cqe() to delete the AioHandler when its
  113. * IORING_OP_POLL_ADD completes.
  114. */
  115. *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
  116. FDMON_IO_URING_ADD));
  117. return node;
  118. }
  119. static void fdmon_io_uring_update(AioContext *ctx,
  120. AioHandler *old_node,
  121. AioHandler *new_node)
  122. {
  123. if (new_node) {
  124. enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
  125. }
  126. if (old_node) {
  127. /*
  128. * Deletion is tricky because IORING_OP_POLL_ADD and
  129. * IORING_OP_POLL_REMOVE are async. We need to wait for the original
  130. * IORING_OP_POLL_ADD to complete before this handler can be freed
  131. * safely.
  132. *
  133. * It's possible that the file descriptor becomes ready and the
  134. * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
  135. * submitted, too.
  136. *
  137. * Mark this handler deleted right now but don't place it on
  138. * ctx->deleted_aio_handlers yet. Instead, manually fudge the list
  139. * entry to make QLIST_IS_INSERTED() think this handler has been
  140. * inserted and other code recognizes this AioHandler as deleted.
  141. *
  142. * Once the original IORING_OP_POLL_ADD completes we enqueue the
  143. * handler on the real ctx->deleted_aio_handlers list to be freed.
  144. */
  145. assert(!QLIST_IS_INSERTED(old_node, node_deleted));
  146. old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;
  147. enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
  148. }
  149. }
  150. static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
  151. {
  152. struct io_uring_sqe *sqe = get_sqe(ctx);
  153. int events = poll_events_from_pfd(node->pfd.events);
  154. io_uring_prep_poll_add(sqe, node->pfd.fd, events);
  155. io_uring_sqe_set_data(sqe, node);
  156. }
  157. static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
  158. {
  159. struct io_uring_sqe *sqe = get_sqe(ctx);
  160. #ifdef LIBURING_HAVE_DATA64
  161. io_uring_prep_poll_remove(sqe, (__u64)(uintptr_t)node);
  162. #else
  163. io_uring_prep_poll_remove(sqe, node);
  164. #endif
  165. }
  166. /* Add a timeout that self-cancels when another cqe becomes ready */
  167. static void add_timeout_sqe(AioContext *ctx, int64_t ns)
  168. {
  169. struct io_uring_sqe *sqe;
  170. struct __kernel_timespec ts = {
  171. .tv_sec = ns / NANOSECONDS_PER_SECOND,
  172. .tv_nsec = ns % NANOSECONDS_PER_SECOND,
  173. };
  174. sqe = get_sqe(ctx);
  175. io_uring_prep_timeout(sqe, &ts, 1, 0);
  176. }
  177. /* Add sqes from ctx->submit_list for submission */
  178. static void fill_sq_ring(AioContext *ctx)
  179. {
  180. AioHandlerSList submit_list;
  181. AioHandler *node;
  182. unsigned flags;
  183. QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);
  184. while ((node = dequeue(&submit_list, &flags))) {
  185. /* Order matters, just in case both flags were set */
  186. if (flags & FDMON_IO_URING_ADD) {
  187. add_poll_add_sqe(ctx, node);
  188. }
  189. if (flags & FDMON_IO_URING_REMOVE) {
  190. add_poll_remove_sqe(ctx, node);
  191. }
  192. }
  193. }
  194. /* Returns true if a handler became ready */
  195. static bool process_cqe(AioContext *ctx,
  196. AioHandlerList *ready_list,
  197. struct io_uring_cqe *cqe)
  198. {
  199. AioHandler *node = io_uring_cqe_get_data(cqe);
  200. unsigned flags;
  201. /* poll_timeout and poll_remove have a zero user_data field */
  202. if (!node) {
  203. return false;
  204. }
  205. /*
  206. * Deletion can only happen when IORING_OP_POLL_ADD completes. If we race
  207. * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
  208. * bit before IORING_OP_POLL_REMOVE is submitted.
  209. */
  210. flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
  211. if (flags & FDMON_IO_URING_REMOVE) {
  212. QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
  213. return false;
  214. }
  215. aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));
  216. /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
  217. add_poll_add_sqe(ctx, node);
  218. return true;
  219. }
  220. static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
  221. {
  222. struct io_uring *ring = &ctx->fdmon_io_uring;
  223. struct io_uring_cqe *cqe;
  224. unsigned num_cqes = 0;
  225. unsigned num_ready = 0;
  226. unsigned head;
  227. io_uring_for_each_cqe(ring, head, cqe) {
  228. if (process_cqe(ctx, ready_list, cqe)) {
  229. num_ready++;
  230. }
  231. num_cqes++;
  232. }
  233. io_uring_cq_advance(ring, num_cqes);
  234. return num_ready;
  235. }
  236. static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
  237. int64_t timeout)
  238. {
  239. unsigned wait_nr = 1; /* block until at least one cqe is ready */
  240. int ret;
  241. /* Fall back while external clients are disabled */
  242. if (qatomic_read(&ctx->external_disable_cnt)) {
  243. return fdmon_poll_ops.wait(ctx, ready_list, timeout);
  244. }
  245. if (timeout == 0) {
  246. wait_nr = 0; /* non-blocking */
  247. } else if (timeout > 0) {
  248. add_timeout_sqe(ctx, timeout);
  249. }
  250. fill_sq_ring(ctx);
  251. do {
  252. ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
  253. } while (ret == -EINTR);
  254. assert(ret >= 0);
  255. return process_cq_ring(ctx, ready_list);
  256. }
  257. static bool fdmon_io_uring_need_wait(AioContext *ctx)
  258. {
  259. /* Have io_uring events completed? */
  260. if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
  261. return true;
  262. }
  263. /* Are there pending sqes to submit? */
  264. if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
  265. return true;
  266. }
  267. /* Do we need to process AioHandlers for io_uring changes? */
  268. if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
  269. return true;
  270. }
  271. /* Are we falling back to fdmon-poll? */
  272. return qatomic_read(&ctx->external_disable_cnt);
  273. }
  274. static const FDMonOps fdmon_io_uring_ops = {
  275. .update = fdmon_io_uring_update,
  276. .wait = fdmon_io_uring_wait,
  277. .need_wait = fdmon_io_uring_need_wait,
  278. };
  279. bool fdmon_io_uring_setup(AioContext *ctx)
  280. {
  281. int ret;
  282. ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
  283. if (ret != 0) {
  284. return false;
  285. }
  286. QSLIST_INIT(&ctx->submit_list);
  287. ctx->fdmon_ops = &fdmon_io_uring_ops;
  288. return true;
  289. }
  290. void fdmon_io_uring_destroy(AioContext *ctx)
  291. {
  292. if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
  293. AioHandler *node;
  294. io_uring_queue_exit(&ctx->fdmon_io_uring);
  295. /* Move handlers due to be removed onto the deleted list */
  296. while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
  297. unsigned flags = qatomic_fetch_and(&node->flags,
  298. ~(FDMON_IO_URING_PENDING |
  299. FDMON_IO_URING_ADD |
  300. FDMON_IO_URING_REMOVE));
  301. if (flags & FDMON_IO_URING_REMOVE) {
  302. QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
  303. }
  304. QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
  305. }
  306. ctx->fdmon_ops = &fdmon_poll_ops;
  307. }
  308. }