posix-aio-compat.c

/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>    /* sigfillset, sigprocmask, kill */
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "osdep.h"

#include "posix-aio-compat.h"
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static TAILQ_HEAD(, qemu_paiocb) request_list;
static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}
static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                          struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}
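
/*
 * Worker thread: pulls requests off request_list and services them with
 * pread()/pwrite(). All signals are blocked here so that the completion
 * signal is delivered to the main thread instead. A worker that finds no
 * request for 10 seconds exits, letting the pool shrink when idle.
 */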
static void *aio_thread(void *unused)
{
    pid_t pid;
    sigset_t set;

    pid = getpid();

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        size_t offset;
        int ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (TAILQ_EMPTY(&request_list) && ret != ETIMEDOUT) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* still no work after the timeout: exit (lock is held) */
        if (TAILQ_EMPTY(&request_list))
            break;

        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);

        offset = 0;
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        /* keep issuing pread/pwrite until the request is fully transferred */
        while (offset < aiocb->aio_nbytes) {
            ssize_t len;

            if (aiocb->is_write)
                len = pwrite(aiocb->aio_fildes,
                             (const char *)aiocb->aio_buf + offset,
                             aiocb->aio_nbytes - offset,
                             aiocb->aio_offset + offset);
            else
                len = pread(aiocb->aio_fildes,
                            (char *)aiocb->aio_buf + offset,
                            aiocb->aio_nbytes - offset,
                            aiocb->aio_offset + offset);

            if (len == -1 && errno == EINTR)
                continue;
            else if (len == -1) {
                /* store the error; this converts back to a negative
                 * errno when read out of aiocb->ret (an ssize_t) */
                offset = -errno;
                break;
            } else if (len == 0)
                break;

            offset += len;
        }

        mutex_lock(&lock);
        aiocb->ret = offset;
        idle_threads++;
        mutex_unlock(&lock);

        /* notify the submitter that this request completed */
        if (kill(pid, aiocb->ev_signo)) die("kill");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
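
/* Called from qemu_paio_submit() with `lock` held: account for the new
 * worker before it starts, then create it detached (see qemu_paio_init). */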
static void spawn_thread(void)
{
    cur_threads++;
    idle_threads++;
    thread_create(&thread_id, &attr, aio_thread, NULL);
}
int qemu_paio_init(struct qemu_paioinit *aioinit)
{
    int ret;

    ret = pthread_attr_init(&attr);
    if (ret) die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret) die2(ret, "pthread_attr_setdetachstate");

    TAILQ_INIT(&request_list);

    return 0;
}
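
/*
 * Common submission path for reads and writes: mark the request in flight
 * (-EINPROGRESS), grow the pool if every thread is busy (up to max_threads),
 * queue the request, and wake one worker.
 */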
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
{
    aiocb->is_write = is_write;
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);

    return 0;
}
int qemu_paio_read(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, 0);
}

int qemu_paio_write(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, 1);
}
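
/* qemu_paio_return()/qemu_paio_error() mirror aio_return()/aio_error():
 * aiocb->ret holds -EINPROGRESS, a negative errno on failure, or the byte
 * count transferred on success; the lock makes the read race-free. */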
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}
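
/* Mirrors aio_cancel(): a request still sitting in the queue is unlinked
 * and reported canceled; one that a worker has already picked up cannot be
 * canceled; anything else has completed. */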
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
{
    int ret;

    mutex_lock(&lock);
    if (!aiocb->active) {
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->ret = -ECANCELED;
        ret = QEMU_PAIO_CANCELED;
    } else if (aiocb->ret == -EINPROGRESS)
        ret = QEMU_PAIO_NOTCANCELED;
    else
        ret = QEMU_PAIO_ALLDONE;
    mutex_unlock(&lock);

    return ret;
}
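
/*
 * Usage sketch (illustrative): a caller fills in a struct qemu_paiocb --
 * aio_fildes, aio_buf, aio_nbytes, aio_offset and the completion signal
 * ev_signo are the fields consumed above -- installs a handler for
 * ev_signo, calls qemu_paio_init() once, then submits with
 * qemu_paio_read()/qemu_paio_write() and checks qemu_paio_error()/
 * qemu_paio_return() on completion, much like the POSIX aio_* interface
 * this file emulates.
 */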