iothread.c
/*
 * Event loop thread
 *
 * Copyright Red Hat Inc., 2013, 2020
 *
 * Authors:
 *  Stefan Hajnoczi   <stefanha@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qom/object.h"
#include "qom/object_interfaces.h"
#include "qemu/module.h"
#include "block/aio.h"
#include "block/block.h"
#include "system/event-loop-base.h"
#include "system/iothread.h"
#include "qapi/error.h"
#include "qapi/qapi-commands-misc.h"
#include "qemu/error-report.h"
#include "qemu/rcu.h"
#include "qemu/main-loop.h"

#ifdef CONFIG_POSIX
/* Benchmark results from 2016 on NVMe SSD drives show max polling times around
 * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
 * workloads.
 */
#define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
#else
#define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
#endif

static void *iothread_run(void *opaque)
{
    IOThread *iothread = opaque;

    rcu_register_thread();
    /*
     * g_main_context_push_thread_default() must be called before anything
     * in this new thread uses glib.
     */
    g_main_context_push_thread_default(iothread->worker_context);
    qemu_set_current_aio_context(iothread->ctx);
    iothread->thread_id = qemu_get_thread_id();
    qemu_sem_post(&iothread->init_done_sem);

    while (iothread->running) {
        /*
         * Note: functionally, the g_main_loop_run() below could already
         * cover the aio_poll() events, but we can't run the main loop
         * unconditionally because the explicit aio_poll() here is faster
         * than g_main_loop_run() when the gcontext is not needed at all
         * (e.g., pure block layer iothreads). In other words, running the
         * gcontext in the iothread trades some performance for
         * functionality.
         */
        aio_poll(iothread->ctx, true);

        /*
         * We must check the running state again in case it was changed
         * during the previous aio_poll()
         */
        if (iothread->running && qatomic_read(&iothread->run_gcontext)) {
            g_main_loop_run(iothread->main_loop);
        }
    }

    g_main_context_pop_thread_default(iothread->worker_context);
    rcu_unregister_thread();
    return NULL;
}

/* Runs in iothread_run() thread */
static void iothread_stop_bh(void *opaque)
{
    IOThread *iothread = opaque;

    iothread->running = false; /* stop iothread_run() */

    if (iothread->main_loop) {
        g_main_loop_quit(iothread->main_loop);
    }
}
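/*
 * Runs in the caller's thread (e.g. the main loop): schedules a bottom half
 * in the iothread to break it out of its event loop, then joins the thread.
 */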
void iothread_stop(IOThread *iothread)
{
    if (!iothread->ctx || iothread->stopping) {
        return;
    }
    iothread->stopping = true;
    aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
    qemu_thread_join(&iothread->thread);
}

static void iothread_instance_init(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
    iothread->thread_id = -1;
    qemu_sem_init(&iothread->init_done_sem, 0);
    /* By default, we don't run gcontext */
    qatomic_set(&iothread->run_gcontext, 0);
}

static void iothread_instance_finalize(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread_stop(iothread);

    /*
     * Before glib2 2.33.10, there is a glib2 bug where a GSource's context
     * pointer may not be cleared even after the context has been destroyed
     * (while it should be). Free the AIO context earlier here to bypass
     * that glib bug.
     *
     * We can remove this comment once the minimum supported glib2 version
     * is raised to 2.33.10. Until then, free the GSources first, before
     * destroying any GMainContext.
     */
    if (iothread->ctx) {
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
    }
    if (iothread->worker_context) {
        g_main_context_unref(iothread->worker_context);
        iothread->worker_context = NULL;
        g_main_loop_unref(iothread->main_loop);
        iothread->main_loop = NULL;
    }
    qemu_sem_destroy(&iothread->init_done_sem);
}
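/*
 * Create the per-iothread GMainContext, attach the AioContext's GSource to
 * it, and build the GMainLoop that iothread_run() enters once run_gcontext
 * is set.
 */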
static void iothread_init_gcontext(IOThread *iothread, const char *thread_name)
{
    GSource *source;
    g_autofree char *name = g_strdup_printf("%s aio-context", thread_name);

    iothread->worker_context = g_main_context_new();
    source = aio_get_g_source(iothread_get_aio_context(iothread));
    g_source_set_name(source, name);
    g_source_attach(source, iothread->worker_context);
    g_source_unref(source);
    iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
}

static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
{
    ERRP_GUARD();
    IOThread *iothread = IOTHREAD(base);

    if (!iothread->ctx) {
        return;
    }

    aio_context_set_poll_params(iothread->ctx,
                                iothread->poll_max_ns,
                                iothread->poll_grow,
                                iothread->poll_shrink,
                                errp);
    if (*errp) {
        return;
    }

    aio_context_set_aio_params(iothread->ctx,
                               iothread->parent_obj.aio_max_batch);

    aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
                                       base->thread_pool_max, errp);
}

static void iothread_init(EventLoopBase *base, Error **errp)
{
    Error *local_error = NULL;
    IOThread *iothread = IOTHREAD(base);
    g_autofree char *thread_name = NULL;

    iothread->stopping = false;
    iothread->running = true;
    iothread->ctx = aio_context_new(errp);
    if (!iothread->ctx) {
        return;
    }

    thread_name = g_strdup_printf("IO %s",
                        object_get_canonical_path_component(OBJECT(base)));

    /*
     * Init one GMainContext for the iothread unconditionally, even if
     * it's not used
     */
    iothread_init_gcontext(iothread, thread_name);

    iothread_set_aio_context_params(base, &local_error);
    if (local_error) {
        error_propagate(errp, local_error);
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
        return;
    }

    /* This assumes we are called from a thread with useful CPU affinity for us
     * to inherit.
     */
    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
                       iothread, QEMU_THREAD_JOINABLE);

    /* Wait for initialization to complete */
    while (iothread->thread_id == -1) {
        qemu_sem_wait(&iothread->init_done_sem);
    }
}

typedef struct {
    const char *name;
    ptrdiff_t offset; /* field's byte offset in IOThread struct */
} IOThreadParamInfo;

static IOThreadParamInfo poll_max_ns_info = {
    "poll-max-ns", offsetof(IOThread, poll_max_ns),
};
static IOThreadParamInfo poll_grow_info = {
    "poll-grow", offsetof(IOThread, poll_grow),
};
static IOThreadParamInfo poll_shrink_info = {
    "poll-shrink", offsetof(IOThread, poll_shrink),
};

static void iothread_get_param(Object *obj, Visitor *v,
        const char *name, IOThreadParamInfo *info, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    int64_t *field = (void *)iothread + info->offset;

    visit_type_int64(v, name, field, errp);
}

static bool iothread_set_param(Object *obj, Visitor *v,
        const char *name, IOThreadParamInfo *info, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    int64_t *field = (void *)iothread + info->offset;
    int64_t value;

    if (!visit_type_int64(v, name, &value, errp)) {
        return false;
    }

    if (value < 0) {
        error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
                   info->name, INT64_MAX);
        return false;
    }

    *field = value;

    return true;
}

static void iothread_get_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThreadParamInfo *info = opaque;

    iothread_get_param(obj, v, name, info, errp);
}

static void iothread_set_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    IOThreadParamInfo *info = opaque;

    if (!iothread_set_param(obj, v, name, info, errp)) {
        return;
    }

    if (iothread->ctx) {
        aio_context_set_poll_params(iothread->ctx,
                                    iothread->poll_max_ns,
                                    iothread->poll_grow,
                                    iothread->poll_shrink,
                                    errp);
    }
}

static void iothread_class_init(ObjectClass *klass, void *class_data)
{
    EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(klass);

    bc->init = iothread_init;
    bc->update_params = iothread_set_aio_context_params;

    object_class_property_add(klass, "poll-max-ns", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_max_ns_info);
    object_class_property_add(klass, "poll-grow", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_grow_info);
    object_class_property_add(klass, "poll-shrink", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_shrink_info);
}
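/*
 * With these properties registered, an iothread can be created and tuned
 * from the QEMU command line, e.g. (illustrative values):
 *
 *     -object iothread,id=iothread0,poll-max-ns=32768
 *
 * or at runtime via the QMP object-add command.
 */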
static const TypeInfo iothread_info = {
    .name = TYPE_IOTHREAD,
    .parent = TYPE_EVENT_LOOP_BASE,
    .class_init = iothread_class_init,
    .instance_size = sizeof(IOThread),
    .instance_init = iothread_instance_init,
    .instance_finalize = iothread_instance_finalize,
};

static void iothread_register_types(void)
{
    type_register_static(&iothread_info);
}

type_init(iothread_register_types)

char *iothread_get_id(IOThread *iothread)
{
    return g_strdup(object_get_canonical_path_component(OBJECT(iothread)));
}

AioContext *iothread_get_aio_context(IOThread *iothread)
{
    return iothread->ctx;
}

static int query_one_iothread(Object *object, void *opaque)
{
    IOThreadInfoList ***tail = opaque;
    IOThreadInfo *info;
    IOThread *iothread;

    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
    if (!iothread) {
        return 0;
    }

    info = g_new0(IOThreadInfo, 1);
    info->id = iothread_get_id(iothread);
    info->thread_id = iothread->thread_id;
    info->poll_max_ns = iothread->poll_max_ns;
    info->poll_grow = iothread->poll_grow;
    info->poll_shrink = iothread->poll_shrink;
    info->aio_max_batch = iothread->parent_obj.aio_max_batch;

    QAPI_LIST_APPEND(*tail, info);
    return 0;
}

IOThreadInfoList *qmp_query_iothreads(Error **errp)
{
    IOThreadInfoList *head = NULL;
    IOThreadInfoList **prev = &head;
    Object *container = object_get_objects_root();

    object_child_foreach(container, query_one_iothread, &prev);
    return head;
}
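/*
 * Enable glib event dispatch for this iothread: flag run_gcontext and kick
 * the event loop with aio_notify() so iothread_run() drops out of aio_poll()
 * and enters g_main_loop_run().
 */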
GMainContext *iothread_get_g_main_context(IOThread *iothread)
{
    qatomic_set(&iothread->run_gcontext, 1);
    aio_notify(iothread->ctx);
    return iothread->worker_context;
}

IOThread *iothread_create(const char *id, Error **errp)
{
    Object *obj;

    obj = object_new_with_props(TYPE_IOTHREAD,
                                object_get_internal_root(),
                                id, errp, NULL);

    return IOTHREAD(obj);
}

void iothread_destroy(IOThread *iothread)
{
    object_unparent(OBJECT(iothread));
}
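/*
 * A minimal usage sketch for internal callers (hypothetical; "demo" is an
 * arbitrary id chosen for illustration):
 *
 *     IOThread *t = iothread_create("demo", &error_abort);
 *     AioContext *ctx = iothread_get_aio_context(t);
 *     ... schedule bottom halves or attach AioContext users to ctx ...
 *     iothread_destroy(t);
 */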
/* Lookup IOThread by its id. Only finds user-created objects, not internal
 * iothread_create() objects. */
IOThread *iothread_by_id(const char *id)
{
    return IOTHREAD(object_resolve_path_type(id, TYPE_IOTHREAD, NULL));
}
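/*
 * Return true if the current thread runs an AioContext other than the
 * global main loop context, i.e. it is an IOThread.
 */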
bool qemu_in_iothread(void)
{
    return qemu_get_current_aio_context() != qemu_get_aio_context();
}