2
0

task.c 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * QEMU I/O task
  3. *
  4. * Copyright (c) 2015 Red Hat, Inc.
  5. *
  6. * This library is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 2 of the License, or (at your option) any later version.
  10. *
  11. * This library is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. #include "qemu/osdep.h"
  21. #include "io/task.h"
  22. #include "qapi/error.h"
  23. #include "qemu/thread.h"
  24. #include "qom/object.h"
  25. #include "trace.h"
  26. struct QIOTaskThreadData {
  27. QIOTaskWorker worker;
  28. gpointer opaque;
  29. GDestroyNotify destroy;
  30. GMainContext *context;
  31. GSource *completion;
  32. };
  33. struct QIOTask {
  34. Object *source;
  35. QIOTaskFunc func;
  36. gpointer opaque;
  37. GDestroyNotify destroy;
  38. Error *err;
  39. gpointer result;
  40. GDestroyNotify destroyResult;
  41. QemuMutex thread_lock;
  42. QemuCond thread_cond;
  43. struct QIOTaskThreadData *thread;
  44. };
  45. QIOTask *qio_task_new(Object *source,
  46. QIOTaskFunc func,
  47. gpointer opaque,
  48. GDestroyNotify destroy)
  49. {
  50. QIOTask *task;
  51. task = g_new0(QIOTask, 1);
  52. task->source = source;
  53. object_ref(source);
  54. task->func = func;
  55. task->opaque = opaque;
  56. task->destroy = destroy;
  57. qemu_mutex_init(&task->thread_lock);
  58. qemu_cond_init(&task->thread_cond);
  59. trace_qio_task_new(task, source, func, opaque);
  60. return task;
  61. }
  62. static void qio_task_free(QIOTask *task)
  63. {
  64. qemu_mutex_lock(&task->thread_lock);
  65. if (task->thread) {
  66. if (task->thread->destroy) {
  67. task->thread->destroy(task->thread->opaque);
  68. }
  69. if (task->thread->context) {
  70. g_main_context_unref(task->thread->context);
  71. }
  72. g_free(task->thread);
  73. }
  74. if (task->destroy) {
  75. task->destroy(task->opaque);
  76. }
  77. if (task->destroyResult) {
  78. task->destroyResult(task->result);
  79. }
  80. if (task->err) {
  81. error_free(task->err);
  82. }
  83. object_unref(task->source);
  84. qemu_mutex_unlock(&task->thread_lock);
  85. qemu_mutex_destroy(&task->thread_lock);
  86. qemu_cond_destroy(&task->thread_cond);
  87. g_free(task);
  88. }
  89. static gboolean qio_task_thread_result(gpointer opaque)
  90. {
  91. QIOTask *task = opaque;
  92. trace_qio_task_thread_result(task);
  93. qio_task_complete(task);
  94. return FALSE;
  95. }
  96. static gpointer qio_task_thread_worker(gpointer opaque)
  97. {
  98. QIOTask *task = opaque;
  99. trace_qio_task_thread_run(task);
  100. task->thread->worker(task, task->thread->opaque);
  101. /* We're running in the background thread, and must only
  102. * ever report the task results in the main event loop
  103. * thread. So we schedule an idle callback to report
  104. * the worker results
  105. */
  106. trace_qio_task_thread_exit(task);
  107. qemu_mutex_lock(&task->thread_lock);
  108. task->thread->completion = g_idle_source_new();
  109. g_source_set_callback(task->thread->completion,
  110. qio_task_thread_result, task, NULL);
  111. g_source_attach(task->thread->completion,
  112. task->thread->context);
  113. g_source_unref(task->thread->completion);
  114. trace_qio_task_thread_source_attach(task, task->thread->completion);
  115. qemu_cond_signal(&task->thread_cond);
  116. qemu_mutex_unlock(&task->thread_lock);
  117. return NULL;
  118. }
  119. void qio_task_run_in_thread(QIOTask *task,
  120. QIOTaskWorker worker,
  121. gpointer opaque,
  122. GDestroyNotify destroy,
  123. GMainContext *context)
  124. {
  125. struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
  126. QemuThread thread;
  127. if (context) {
  128. g_main_context_ref(context);
  129. }
  130. data->worker = worker;
  131. data->opaque = opaque;
  132. data->destroy = destroy;
  133. data->context = context;
  134. task->thread = data;
  135. trace_qio_task_thread_start(task, worker, opaque);
  136. qemu_thread_create(&thread,
  137. "io-task-worker",
  138. qio_task_thread_worker,
  139. task,
  140. QEMU_THREAD_DETACHED);
  141. }
  142. void qio_task_wait_thread(QIOTask *task)
  143. {
  144. qemu_mutex_lock(&task->thread_lock);
  145. g_assert(task->thread != NULL);
  146. while (task->thread->completion == NULL) {
  147. qemu_cond_wait(&task->thread_cond, &task->thread_lock);
  148. }
  149. trace_qio_task_thread_source_cancel(task, task->thread->completion);
  150. g_source_destroy(task->thread->completion);
  151. qemu_mutex_unlock(&task->thread_lock);
  152. qio_task_thread_result(task);
  153. }
  154. void qio_task_complete(QIOTask *task)
  155. {
  156. task->func(task, task->opaque);
  157. trace_qio_task_complete(task);
  158. qio_task_free(task);
  159. }
  160. void qio_task_set_error(QIOTask *task,
  161. Error *err)
  162. {
  163. error_propagate(&task->err, err);
  164. }
  165. bool qio_task_propagate_error(QIOTask *task,
  166. Error **errp)
  167. {
  168. if (task->err) {
  169. error_propagate(errp, task->err);
  170. task->err = NULL;
  171. return true;
  172. }
  173. return false;
  174. }
  175. void qio_task_set_result_pointer(QIOTask *task,
  176. gpointer result,
  177. GDestroyNotify destroy)
  178. {
  179. task->result = result;
  180. task->destroyResult = destroy;
  181. }
  182. gpointer qio_task_get_result_pointer(QIOTask *task)
  183. {
  184. return task->result;
  185. }
  186. Object *qio_task_get_source(QIOTask *task)
  187. {
  188. return task->source;
  189. }