2
0

iothread.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /*
  2. * Event loop thread
  3. *
  4. * Copyright Red Hat Inc., 2013
  5. *
  6. * Authors:
  7. * Stefan Hajnoczi <stefanha@redhat.com>
  8. *
  9. * This work is licensed under the terms of the GNU GPL, version 2 or later.
  10. * See the COPYING file in the top-level directory.
  11. *
  12. */
  13. #include "qemu/osdep.h"
  14. #include "qom/object.h"
  15. #include "qom/object_interfaces.h"
  16. #include "qemu/module.h"
  17. #include "block/aio.h"
  18. #include "block/block.h"
  19. #include "sysemu/iothread.h"
  20. #include "qapi/error.h"
  21. #include "qapi/qapi-commands-misc.h"
  22. #include "qemu/error-report.h"
  23. #include "qemu/rcu.h"
  24. #include "qemu/main-loop.h"
/* IOThread adds no class-level state, so its class type is plain ObjectClass */
typedef ObjectClass IOThreadClass;

/* Class-cast helpers for TYPE_IOTHREAD, built on the generic QOM macros */
#define IOTHREAD_GET_CLASS(obj) \
   OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
#define IOTHREAD_CLASS(klass) \
   OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
  30. #ifdef CONFIG_POSIX
  31. /* Benchmark results from 2016 on NVMe SSD drives show max polling times around
  32. * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
  33. * workloads.
  34. */
  35. #define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
  36. #else
  37. #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
  38. #endif
  39. static __thread IOThread *my_iothread;
  40. AioContext *qemu_get_current_aio_context(void)
  41. {
  42. return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
  43. }
  44. static void *iothread_run(void *opaque)
  45. {
  46. IOThread *iothread = opaque;
  47. rcu_register_thread();
  48. /*
  49. * g_main_context_push_thread_default() must be called before anything
  50. * in this new thread uses glib.
  51. */
  52. g_main_context_push_thread_default(iothread->worker_context);
  53. my_iothread = iothread;
  54. iothread->thread_id = qemu_get_thread_id();
  55. qemu_sem_post(&iothread->init_done_sem);
  56. while (iothread->running) {
  57. /*
  58. * Note: from functional-wise the g_main_loop_run() below can
  59. * already cover the aio_poll() events, but we can't run the
  60. * main loop unconditionally because explicit aio_poll() here
  61. * is faster than g_main_loop_run() when we do not need the
  62. * gcontext at all (e.g., pure block layer iothreads). In
  63. * other words, when we want to run the gcontext with the
  64. * iothread we need to pay some performance for functionality.
  65. */
  66. aio_poll(iothread->ctx, true);
  67. /*
  68. * We must check the running state again in case it was
  69. * changed in previous aio_poll()
  70. */
  71. if (iothread->running && atomic_read(&iothread->run_gcontext)) {
  72. g_main_loop_run(iothread->main_loop);
  73. }
  74. }
  75. g_main_context_pop_thread_default(iothread->worker_context);
  76. rcu_unregister_thread();
  77. return NULL;
  78. }
  79. /* Runs in iothread_run() thread */
  80. static void iothread_stop_bh(void *opaque)
  81. {
  82. IOThread *iothread = opaque;
  83. iothread->running = false; /* stop iothread_run() */
  84. if (iothread->main_loop) {
  85. g_main_loop_quit(iothread->main_loop);
  86. }
  87. }
  88. void iothread_stop(IOThread *iothread)
  89. {
  90. if (!iothread->ctx || iothread->stopping) {
  91. return;
  92. }
  93. iothread->stopping = true;
  94. aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
  95. qemu_thread_join(&iothread->thread);
  96. }
  97. static void iothread_instance_init(Object *obj)
  98. {
  99. IOThread *iothread = IOTHREAD(obj);
  100. iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
  101. iothread->thread_id = -1;
  102. qemu_sem_init(&iothread->init_done_sem, 0);
  103. /* By default, we don't run gcontext */
  104. atomic_set(&iothread->run_gcontext, 0);
  105. }
  106. static void iothread_instance_finalize(Object *obj)
  107. {
  108. IOThread *iothread = IOTHREAD(obj);
  109. iothread_stop(iothread);
  110. /*
  111. * Before glib2 2.33.10, there is a glib2 bug that GSource context
  112. * pointer may not be cleared even if the context has already been
  113. * destroyed (while it should). Here let's free the AIO context
  114. * earlier to bypass that glib bug.
  115. *
  116. * We can remove this comment after the minimum supported glib2
  117. * version boosts to 2.33.10. Before that, let's free the
  118. * GSources first before destroying any GMainContext.
  119. */
  120. if (iothread->ctx) {
  121. aio_context_unref(iothread->ctx);
  122. iothread->ctx = NULL;
  123. }
  124. if (iothread->worker_context) {
  125. g_main_context_unref(iothread->worker_context);
  126. iothread->worker_context = NULL;
  127. g_main_loop_unref(iothread->main_loop);
  128. iothread->main_loop = NULL;
  129. }
  130. qemu_sem_destroy(&iothread->init_done_sem);
  131. }
  132. static void iothread_init_gcontext(IOThread *iothread)
  133. {
  134. GSource *source;
  135. iothread->worker_context = g_main_context_new();
  136. source = aio_get_g_source(iothread_get_aio_context(iothread));
  137. g_source_attach(source, iothread->worker_context);
  138. g_source_unref(source);
  139. iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
  140. }
  141. static void iothread_complete(UserCreatable *obj, Error **errp)
  142. {
  143. Error *local_error = NULL;
  144. IOThread *iothread = IOTHREAD(obj);
  145. char *thread_name;
  146. iothread->stopping = false;
  147. iothread->running = true;
  148. iothread->ctx = aio_context_new(errp);
  149. if (!iothread->ctx) {
  150. return;
  151. }
  152. /*
  153. * Init one GMainContext for the iothread unconditionally, even if
  154. * it's not used
  155. */
  156. iothread_init_gcontext(iothread);
  157. aio_context_set_poll_params(iothread->ctx,
  158. iothread->poll_max_ns,
  159. iothread->poll_grow,
  160. iothread->poll_shrink,
  161. &local_error);
  162. if (local_error) {
  163. error_propagate(errp, local_error);
  164. aio_context_unref(iothread->ctx);
  165. iothread->ctx = NULL;
  166. return;
  167. }
  168. /* This assumes we are called from a thread with useful CPU affinity for us
  169. * to inherit.
  170. */
  171. thread_name = g_strdup_printf("IO %s",
  172. object_get_canonical_path_component(OBJECT(obj)));
  173. qemu_thread_create(&iothread->thread, thread_name, iothread_run,
  174. iothread, QEMU_THREAD_JOINABLE);
  175. g_free(thread_name);
  176. /* Wait for initialization to complete */
  177. while (iothread->thread_id == -1) {
  178. qemu_sem_wait(&iothread->init_done_sem);
  179. }
  180. }
  181. typedef struct {
  182. const char *name;
  183. ptrdiff_t offset; /* field's byte offset in IOThread struct */
  184. } PollParamInfo;
  185. static PollParamInfo poll_max_ns_info = {
  186. "poll-max-ns", offsetof(IOThread, poll_max_ns),
  187. };
  188. static PollParamInfo poll_grow_info = {
  189. "poll-grow", offsetof(IOThread, poll_grow),
  190. };
  191. static PollParamInfo poll_shrink_info = {
  192. "poll-shrink", offsetof(IOThread, poll_shrink),
  193. };
  194. static void iothread_get_poll_param(Object *obj, Visitor *v,
  195. const char *name, void *opaque, Error **errp)
  196. {
  197. IOThread *iothread = IOTHREAD(obj);
  198. PollParamInfo *info = opaque;
  199. int64_t *field = (void *)iothread + info->offset;
  200. visit_type_int64(v, name, field, errp);
  201. }
  202. static void iothread_set_poll_param(Object *obj, Visitor *v,
  203. const char *name, void *opaque, Error **errp)
  204. {
  205. IOThread *iothread = IOTHREAD(obj);
  206. PollParamInfo *info = opaque;
  207. int64_t *field = (void *)iothread + info->offset;
  208. int64_t value;
  209. if (!visit_type_int64(v, name, &value, errp)) {
  210. return;
  211. }
  212. if (value < 0) {
  213. error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
  214. info->name, INT64_MAX);
  215. return;
  216. }
  217. *field = value;
  218. if (iothread->ctx) {
  219. aio_context_set_poll_params(iothread->ctx,
  220. iothread->poll_max_ns,
  221. iothread->poll_grow,
  222. iothread->poll_shrink,
  223. errp);
  224. }
  225. }
  226. static void iothread_class_init(ObjectClass *klass, void *class_data)
  227. {
  228. UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
  229. ucc->complete = iothread_complete;
  230. object_class_property_add(klass, "poll-max-ns", "int",
  231. iothread_get_poll_param,
  232. iothread_set_poll_param,
  233. NULL, &poll_max_ns_info);
  234. object_class_property_add(klass, "poll-grow", "int",
  235. iothread_get_poll_param,
  236. iothread_set_poll_param,
  237. NULL, &poll_grow_info);
  238. object_class_property_add(klass, "poll-shrink", "int",
  239. iothread_get_poll_param,
  240. iothread_set_poll_param,
  241. NULL, &poll_shrink_info);
  242. }
  243. static const TypeInfo iothread_info = {
  244. .name = TYPE_IOTHREAD,
  245. .parent = TYPE_OBJECT,
  246. .class_init = iothread_class_init,
  247. .instance_size = sizeof(IOThread),
  248. .instance_init = iothread_instance_init,
  249. .instance_finalize = iothread_instance_finalize,
  250. .interfaces = (InterfaceInfo[]) {
  251. {TYPE_USER_CREATABLE},
  252. {}
  253. },
  254. };
/* Register the TYPE_IOTHREAD type description with QOM */
static void iothread_register_types(void)
{
    type_register_static(&iothread_info);
}

/* Hook the registration function into QEMU's type initialization */
type_init(iothread_register_types)
  260. char *iothread_get_id(IOThread *iothread)
  261. {
  262. return g_strdup(object_get_canonical_path_component(OBJECT(iothread)));
  263. }
  264. AioContext *iothread_get_aio_context(IOThread *iothread)
  265. {
  266. return iothread->ctx;
  267. }
  268. static int query_one_iothread(Object *object, void *opaque)
  269. {
  270. IOThreadInfoList ***prev = opaque;
  271. IOThreadInfoList *elem;
  272. IOThreadInfo *info;
  273. IOThread *iothread;
  274. iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
  275. if (!iothread) {
  276. return 0;
  277. }
  278. info = g_new0(IOThreadInfo, 1);
  279. info->id = iothread_get_id(iothread);
  280. info->thread_id = iothread->thread_id;
  281. info->poll_max_ns = iothread->poll_max_ns;
  282. info->poll_grow = iothread->poll_grow;
  283. info->poll_shrink = iothread->poll_shrink;
  284. elem = g_new0(IOThreadInfoList, 1);
  285. elem->value = info;
  286. elem->next = NULL;
  287. **prev = elem;
  288. *prev = &elem->next;
  289. return 0;
  290. }
  291. IOThreadInfoList *qmp_query_iothreads(Error **errp)
  292. {
  293. IOThreadInfoList *head = NULL;
  294. IOThreadInfoList **prev = &head;
  295. Object *container = object_get_objects_root();
  296. object_child_foreach(container, query_one_iothread, &prev);
  297. return head;
  298. }
  299. GMainContext *iothread_get_g_main_context(IOThread *iothread)
  300. {
  301. atomic_set(&iothread->run_gcontext, 1);
  302. aio_notify(iothread->ctx);
  303. return iothread->worker_context;
  304. }
  305. IOThread *iothread_create(const char *id, Error **errp)
  306. {
  307. Object *obj;
  308. obj = object_new_with_props(TYPE_IOTHREAD,
  309. object_get_internal_root(),
  310. id, errp, NULL);
  311. return IOTHREAD(obj);
  312. }
  313. void iothread_destroy(IOThread *iothread)
  314. {
  315. object_unparent(OBJECT(iothread));
  316. }
  317. /* Lookup IOThread by its id. Only finds user-created objects, not internal
  318. * iothread_create() objects. */
  319. IOThread *iothread_by_id(const char *id)
  320. {
  321. return IOTHREAD(object_resolve_path_type(id, TYPE_IOTHREAD, NULL));
  322. }