2
0

task.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. * QEMU I/O task
  3. *
  4. * Copyright (c) 2015 Red Hat, Inc.
  5. *
  6. * This library is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 2 of the License, or (at your option) any later version.
  10. *
  11. * This library is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. #include "qemu/osdep.h"
  21. #include "io/task.h"
  22. #include "qapi/error.h"
  23. #include "qemu/thread.h"
  24. #include "trace.h"
  25. struct QIOTaskThreadData {
  26. QIOTaskWorker worker;
  27. gpointer opaque;
  28. GDestroyNotify destroy;
  29. GMainContext *context;
  30. GSource *completion;
  31. };
  32. struct QIOTask {
  33. Object *source;
  34. QIOTaskFunc func;
  35. gpointer opaque;
  36. GDestroyNotify destroy;
  37. Error *err;
  38. gpointer result;
  39. GDestroyNotify destroyResult;
  40. QemuMutex thread_lock;
  41. QemuCond thread_cond;
  42. struct QIOTaskThreadData *thread;
  43. };
  44. QIOTask *qio_task_new(Object *source,
  45. QIOTaskFunc func,
  46. gpointer opaque,
  47. GDestroyNotify destroy)
  48. {
  49. QIOTask *task;
  50. task = g_new0(QIOTask, 1);
  51. task->source = source;
  52. object_ref(source);
  53. task->func = func;
  54. task->opaque = opaque;
  55. task->destroy = destroy;
  56. qemu_mutex_init(&task->thread_lock);
  57. qemu_cond_init(&task->thread_cond);
  58. trace_qio_task_new(task, source, func, opaque);
  59. return task;
  60. }
  61. static void qio_task_free(QIOTask *task)
  62. {
  63. qemu_mutex_lock(&task->thread_lock);
  64. if (task->thread) {
  65. if (task->thread->destroy) {
  66. task->thread->destroy(task->thread->opaque);
  67. }
  68. if (task->thread->context) {
  69. g_main_context_unref(task->thread->context);
  70. }
  71. g_free(task->thread);
  72. }
  73. if (task->destroy) {
  74. task->destroy(task->opaque);
  75. }
  76. if (task->destroyResult) {
  77. task->destroyResult(task->result);
  78. }
  79. if (task->err) {
  80. error_free(task->err);
  81. }
  82. object_unref(task->source);
  83. qemu_mutex_unlock(&task->thread_lock);
  84. qemu_mutex_destroy(&task->thread_lock);
  85. qemu_cond_destroy(&task->thread_cond);
  86. g_free(task);
  87. }
  88. static gboolean qio_task_thread_result(gpointer opaque)
  89. {
  90. QIOTask *task = opaque;
  91. trace_qio_task_thread_result(task);
  92. qio_task_complete(task);
  93. return FALSE;
  94. }
  95. static gpointer qio_task_thread_worker(gpointer opaque)
  96. {
  97. QIOTask *task = opaque;
  98. trace_qio_task_thread_run(task);
  99. task->thread->worker(task, task->thread->opaque);
  100. /* We're running in the background thread, and must only
  101. * ever report the task results in the main event loop
  102. * thread. So we schedule an idle callback to report
  103. * the worker results
  104. */
  105. trace_qio_task_thread_exit(task);
  106. qemu_mutex_lock(&task->thread_lock);
  107. task->thread->completion = g_idle_source_new();
  108. g_source_set_callback(task->thread->completion,
  109. qio_task_thread_result, task, NULL);
  110. g_source_attach(task->thread->completion,
  111. task->thread->context);
  112. g_source_unref(task->thread->completion);
  113. trace_qio_task_thread_source_attach(task, task->thread->completion);
  114. qemu_cond_signal(&task->thread_cond);
  115. qemu_mutex_unlock(&task->thread_lock);
  116. return NULL;
  117. }
  118. void qio_task_run_in_thread(QIOTask *task,
  119. QIOTaskWorker worker,
  120. gpointer opaque,
  121. GDestroyNotify destroy,
  122. GMainContext *context)
  123. {
  124. struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
  125. QemuThread thread;
  126. if (context) {
  127. g_main_context_ref(context);
  128. }
  129. data->worker = worker;
  130. data->opaque = opaque;
  131. data->destroy = destroy;
  132. data->context = context;
  133. task->thread = data;
  134. trace_qio_task_thread_start(task, worker, opaque);
  135. qemu_thread_create(&thread,
  136. "io-task-worker",
  137. qio_task_thread_worker,
  138. task,
  139. QEMU_THREAD_DETACHED);
  140. }
  141. void qio_task_wait_thread(QIOTask *task)
  142. {
  143. qemu_mutex_lock(&task->thread_lock);
  144. g_assert(task->thread != NULL);
  145. while (task->thread->completion == NULL) {
  146. qemu_cond_wait(&task->thread_cond, &task->thread_lock);
  147. }
  148. trace_qio_task_thread_source_cancel(task, task->thread->completion);
  149. g_source_destroy(task->thread->completion);
  150. qemu_mutex_unlock(&task->thread_lock);
  151. qio_task_thread_result(task);
  152. }
  153. void qio_task_complete(QIOTask *task)
  154. {
  155. task->func(task, task->opaque);
  156. trace_qio_task_complete(task);
  157. qio_task_free(task);
  158. }
  159. void qio_task_set_error(QIOTask *task,
  160. Error *err)
  161. {
  162. error_propagate(&task->err, err);
  163. }
  164. bool qio_task_propagate_error(QIOTask *task,
  165. Error **errp)
  166. {
  167. if (task->err) {
  168. error_propagate(errp, task->err);
  169. task->err = NULL;
  170. return true;
  171. }
  172. return false;
  173. }
  174. void qio_task_set_result_pointer(QIOTask *task,
  175. gpointer result,
  176. GDestroyNotify destroy)
  177. {
  178. task->result = result;
  179. task->destroyResult = destroy;
  180. }
  181. gpointer qio_task_get_result_pointer(QIOTask *task)
  182. {
  183. return task->result;
  184. }
  185. Object *qio_task_get_source(QIOTask *task)
  186. {
  187. return task->source;
  188. }