linux-aio.c

/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "qemu-aio.h"
#include "block_int.h"
#include "block/raw-posix-aio.h"

#include <sys/eventfd.h>
#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest. If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

struct qemu_laiocb {
    BlockDriverAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    int async_context_id;
    QLIST_ENTRY(qemu_laiocb) node;
};

struct qemu_laio_state {
    io_context_t ctx;
    int efd;
    int count;
    QLIST_HEAD(, qemu_laiocb) completed_reqs;
};

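/*
 * Fold the res2/res fields of a struct io_event back into a single signed
 * result: the number of bytes transferred on success, or a negative errno
 * on failure.
 */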
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 * Be sure to be in the right AsyncContext before calling this function.
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    s->count--;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes)
            ret = 0;
        else if (ret >= 0)
            ret = -EINVAL;

        laiocb->common.cb(laiocb->common.opaque, ret);
    }

    qemu_aio_release(laiocb);
}

/*
 * Processes all queued AIO requests, i.e. requests that have returned from
 * the OS but whose callback was not called yet. Requests that cannot have
 * their callback called in the current AsyncContext remain in the queue.
 *
 * Returns 1 if at least one request could be completed, 0 otherwise.
 */
static int qemu_laio_process_requests(void *opaque)
{
    struct qemu_laio_state *s = opaque;
    struct qemu_laiocb *laiocb, *next;
    int res = 0;

    QLIST_FOREACH_SAFE (laiocb, &s->completed_reqs, node, next) {
        if (laiocb->async_context_id == get_async_context_id()) {
            /* Unlink before completing: the completion releases the ACB. */
            QLIST_REMOVE(laiocb, node);
            qemu_laio_process_completion(s, laiocb);
            res = 1;
        }
    }

    return res;
}

/*
 * Puts a request in the completion queue so that its callback is called the
 * next time it is possible. If we already are in the right AsyncContext,
 * the request is completed immediately instead.
 */
static void qemu_laio_enqueue_completed(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    if (laiocb->async_context_id == get_async_context_id()) {
        qemu_laio_process_completion(s, laiocb);
    } else {
        QLIST_INSERT_HEAD(&s->completed_reqs, laiocb, node);
    }
}

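/*
 * Called when the eventfd becomes readable, i.e. when at least one request
 * has completed. Reads the completion count from the eventfd, drains the
 * corresponding events with io_getevents() and hands each request to
 * qemu_laio_enqueue_completed().
 */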
static void qemu_laio_completion_cb(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    while (1) {
        struct io_event events[MAX_EVENTS];
        uint64_t val;
        ssize_t ret;
        struct timespec ts = { 0 };
        int nevents, i;

        do {
            ret = read(s->efd, &val, sizeof(val));
        } while (ret == -1 && errno == EINTR);

        if (ret == -1 && errno == EAGAIN)
            break;

        if (ret != 8)
            break;

        do {
            nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
        } while (nevents == -EINTR);

        for (i = 0; i < nevents; i++) {
            struct iocb *iocb = events[i].obj;
            struct qemu_laiocb *laiocb =
                    container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[i]);
            qemu_laio_enqueue_completed(s, laiocb);
        }
    }
}

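/*
 * Reports whether requests submitted through this state are still in flight;
 * registered with qemu_aio_set_fd_handler() in laio_init().
 */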
static int qemu_laio_flush_cb(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    return (s->count > 0) ? 1 : 0;
}

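/*
 * Cancel callback for laio_pool: try to cancel the request with io_cancel()
 * and, if the kernel cannot cancel it, wait for it to complete.
 */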
static void laio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS)
        return;

    /*
     * Note that as of Linux 2.6.31 neither the block device code nor any
     * filesystem implements cancellation of AIO requests.
     * Thus the polling loop below is the normal code path.
     */
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    if (ret == 0) {
        laiocb->ret = -ECANCELED;
        return;
    }

    /*
     * We have to wait for the iocb to finish.
     *
     * The only way to get the iocb status update is by polling the io context.
     * We might be able to do this slightly more optimally by removing the
     * O_NONBLOCK flag.
     */
    while (laiocb->ret == -EINPROGRESS)
        qemu_laio_completion_cb(laiocb->ctx);
}

static AIOPool laio_pool = {
    .aiocb_size = sizeof(struct qemu_laiocb),
    .cancel = laio_cancel,
};

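/*
 * Submit a read or write (type QEMU_AIO_READ or QEMU_AIO_WRITE) of
 * nb_sectors 512-byte sectors starting at sector_num on fd. Returns the
 * new AIOCB, or NULL if the request could not be submitted.
 */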
BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_pool, bs, cb, opaque);
    if (!laiocb)
        return NULL;
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->async_context_id = get_async_context_id();

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, s->efd);
    s->count++;

    if (io_submit(s->ctx, 1, &iocbs) < 0)
        goto out_dec_count;
    return &laiocb->common;

out_dec_count:
    /* undo the increment above; falls through to release the ACB */
    s->count--;
out_free_aiocb:
    qemu_aio_release(laiocb);
    return NULL;
}

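/*
 * Set up a Linux AIO context: the eventfd used for completion notification,
 * the kernel io context, and the aio fd handler that drives completions.
 * The returned pointer is passed to laio_submit() as aio_ctx; returns NULL
 * on failure.
 */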
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = qemu_mallocz(sizeof(*s));
    QLIST_INIT(&s->completed_reqs);
    s->efd = eventfd(0, 0);
    if (s->efd == -1)
        goto out_free_state;
    fcntl(s->efd, F_SETFL, O_NONBLOCK);

    if (io_setup(MAX_EVENTS, &s->ctx) != 0)
        goto out_close_efd;

    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL,
        qemu_laio_flush_cb, qemu_laio_process_requests, s);

    return s;

out_close_efd:
    close(s->efd);
out_free_state:
    qemu_free(s);
    return NULL;
}
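
/*
 * Illustrative use only, not part of the original file: roughly how a
 * caller (for example the raw-posix block driver) would be expected to use
 * this API. The variable names below are made up for the sketch.
 *
 *     void *aio_ctx = laio_init();
 *     if (!aio_ctx) {
 *         (fall back to another AIO implementation)
 *     }
 *
 *     acb = laio_submit(bs, aio_ctx, fd, sector_num, qiov, nb_sectors,
 *                       cb, opaque, QEMU_AIO_READ);
 *     if (!acb) {
 *         (report the failure to the caller or retry via another path)
 *     }
 */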