iothread.c

/*
 * Event loop thread
 *
 * Copyright Red Hat Inc., 2013
 *
 * Authors:
 *  Stefan Hajnoczi   <stefanha@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qom/object.h"
#include "qom/object_interfaces.h"
#include "qemu/module.h"
#include "block/aio.h"
#include "block/block.h"
#include "sysemu/iothread.h"
#include "qapi/error.h"
#include "qapi/qapi-commands-misc.h"
#include "qemu/error-report.h"
#include "qemu/rcu.h"
#include "qemu/main-loop.h"

typedef ObjectClass IOThreadClass;

DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
                       TYPE_IOTHREAD)

#ifdef CONFIG_POSIX
/* Benchmark results from 2016 on NVMe SSD drives show that max polling times
 * around 16-32 microseconds yield IOPS improvements for both iodepth=1 and
 * iodepth=32 workloads.
 */
#define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
#else
#define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
#endif

static __thread IOThread *my_iothread;

AioContext *qemu_get_current_aio_context(void)
{
    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}
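
/*
 * Illustrative usage (sketch, not part of this file): thanks to the
 * thread-local my_iothread above, code shared between the main loop and
 * IOThreads can schedule follow-up work in whatever AioContext it is
 * currently running in ("my_cb" and "opaque" are hypothetical):
 *
 *     aio_bh_schedule_oneshot(qemu_get_current_aio_context(), my_cb, opaque);
 */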

static void *iothread_run(void *opaque)
{
    IOThread *iothread = opaque;

    rcu_register_thread();
    /*
     * g_main_context_push_thread_default() must be called before anything
     * in this new thread uses glib.
     */
    g_main_context_push_thread_default(iothread->worker_context);
    my_iothread = iothread;
    iothread->thread_id = qemu_get_thread_id();
    qemu_sem_post(&iothread->init_done_sem);

    while (iothread->running) {
        /*
         * Note: functionally, the g_main_loop_run() below could already
         * cover the aio_poll() events, but we can't run the main loop
         * unconditionally because an explicit aio_poll() here is faster
         * than g_main_loop_run() when the gcontext is not needed at all
         * (e.g. pure block layer iothreads). In other words, running the
         * gcontext in the iothread trades some performance for the extra
         * functionality.
         */
        aio_poll(iothread->ctx, true);

        /*
         * We must check the running state again in case it was
         * changed by the previous aio_poll().
         */
        if (iothread->running && qatomic_read(&iothread->run_gcontext)) {
            g_main_loop_run(iothread->main_loop);
        }
    }

    g_main_context_pop_thread_default(iothread->worker_context);
    rcu_unregister_thread();
    return NULL;
}

/* Runs in iothread_run() thread */
static void iothread_stop_bh(void *opaque)
{
    IOThread *iothread = opaque;

    iothread->running = false; /* stop iothread_run() */

    if (iothread->main_loop) {
        g_main_loop_quit(iothread->main_loop);
    }
}

void iothread_stop(IOThread *iothread)
{
    if (!iothread->ctx || iothread->stopping) {
        return;
    }
    iothread->stopping = true;
    aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
    qemu_thread_join(&iothread->thread);
}
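
/*
 * Illustrative teardown sequence (sketch, not part of this file): from the
 * owning thread, iothread_stop() schedules iothread_stop_bh() in the
 * IOThread's AioContext; the next aio_poll() in iothread_run() dispatches
 * it, which clears ->running and quits the GMainLoop, so iothread_run()
 * returns and qemu_thread_join() completes:
 *
 *     iothread_stop(iothread);       // blocks until iothread_run() returns
 *     iothread_destroy(iothread);    // then finalize the QOM object
 */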

static void iothread_instance_init(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
    iothread->thread_id = -1;
    qemu_sem_init(&iothread->init_done_sem, 0);
    /* By default, we don't run the gcontext */
    qatomic_set(&iothread->run_gcontext, 0);
}

static void iothread_instance_finalize(Object *obj)
{
    IOThread *iothread = IOTHREAD(obj);

    iothread_stop(iothread);

    /*
     * Before 2.33.10, glib has a bug in which a GSource's context pointer
     * may not be cleared even after the context has been destroyed (while
     * it should be). Free the AioContext earlier here to work around that
     * glib bug.
     *
     * This comment can be removed once the minimum supported glib version
     * is bumped to 2.33.10. Until then, free the GSources first, before
     * destroying any GMainContext.
     */
    if (iothread->ctx) {
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
    }
    if (iothread->worker_context) {
        g_main_context_unref(iothread->worker_context);
        iothread->worker_context = NULL;
        g_main_loop_unref(iothread->main_loop);
        iothread->main_loop = NULL;
    }
    qemu_sem_destroy(&iothread->init_done_sem);
}

static void iothread_init_gcontext(IOThread *iothread)
{
    GSource *source;

    iothread->worker_context = g_main_context_new();
    source = aio_get_g_source(iothread_get_aio_context(iothread));
    g_source_attach(source, iothread->worker_context);
    g_source_unref(source);
    iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
}

static void iothread_complete(UserCreatable *obj, Error **errp)
{
    Error *local_error = NULL;
    IOThread *iothread = IOTHREAD(obj);
    char *thread_name;

    iothread->stopping = false;
    iothread->running = true;
    iothread->ctx = aio_context_new(errp);
    if (!iothread->ctx) {
        return;
    }

    /*
     * Init one GMainContext for the iothread unconditionally, even if
     * it's not used
     */
    iothread_init_gcontext(iothread);

    aio_context_set_poll_params(iothread->ctx,
                                iothread->poll_max_ns,
                                iothread->poll_grow,
                                iothread->poll_shrink,
                                &local_error);
    if (local_error) {
        error_propagate(errp, local_error);
        aio_context_unref(iothread->ctx);
        iothread->ctx = NULL;
        return;
    }

    /* This assumes we are called from a thread with useful CPU affinity for us
     * to inherit.
     */
    thread_name = g_strdup_printf("IO %s",
                        object_get_canonical_path_component(OBJECT(obj)));
    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
                       iothread, QEMU_THREAD_JOINABLE);
    g_free(thread_name);

    /* Wait for initialization to complete */
    while (iothread->thread_id == -1) {
        qemu_sem_wait(&iothread->init_done_sem);
    }
}
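
/*
 * Illustrative usage (sketch): an -object iothread on the command line goes
 * through this user_creatable completion path. Creating an IOThread and
 * handing it to a virtio-blk device typically looks like this (the object
 * id "iothread0" and drive name "drive0" are hypothetical):
 *
 *     qemu-system-x86_64 \
 *         -object iothread,id=iothread0,poll-max-ns=32768 \
 *         -device virtio-blk-pci,drive=drive0,iothread=iothread0 ...
 */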

typedef struct {
    const char *name;
    ptrdiff_t offset; /* field's byte offset in IOThread struct */
} PollParamInfo;

static PollParamInfo poll_max_ns_info = {
    "poll-max-ns", offsetof(IOThread, poll_max_ns),
};
static PollParamInfo poll_grow_info = {
    "poll-grow", offsetof(IOThread, poll_grow),
};
static PollParamInfo poll_shrink_info = {
    "poll-shrink", offsetof(IOThread, poll_shrink),
};

static void iothread_get_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    PollParamInfo *info = opaque;
    int64_t *field = (void *)iothread + info->offset;

    visit_type_int64(v, name, field, errp);
}

static void iothread_set_poll_param(Object *obj, Visitor *v,
        const char *name, void *opaque, Error **errp)
{
    IOThread *iothread = IOTHREAD(obj);
    PollParamInfo *info = opaque;
    int64_t *field = (void *)iothread + info->offset;
    int64_t value;

    if (!visit_type_int64(v, name, &value, errp)) {
        return;
    }

    if (value < 0) {
        error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
                   info->name, INT64_MAX);
        return;
    }

    *field = value;

    if (iothread->ctx) {
        aio_context_set_poll_params(iothread->ctx,
                                    iothread->poll_max_ns,
                                    iothread->poll_grow,
                                    iothread->poll_shrink,
                                    errp);
    }
}
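
/*
 * Illustrative usage (sketch): because the polling knobs are QOM properties,
 * they can also be tuned at runtime, e.g. from the HMP monitor (object path
 * and value below are hypothetical):
 *
 *     (qemu) qom-set /objects/iothread0 poll-max-ns 100000
 *
 * The setter above validates the value and, if the event loop already
 * exists, applies it immediately via aio_context_set_poll_params().
 */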

static void iothread_class_init(ObjectClass *klass, void *class_data)
{
    UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
    ucc->complete = iothread_complete;

    object_class_property_add(klass, "poll-max-ns", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_max_ns_info);
    object_class_property_add(klass, "poll-grow", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_grow_info);
    object_class_property_add(klass, "poll-shrink", "int",
                              iothread_get_poll_param,
                              iothread_set_poll_param,
                              NULL, &poll_shrink_info);
}

static const TypeInfo iothread_info = {
    .name = TYPE_IOTHREAD,
    .parent = TYPE_OBJECT,
    .class_init = iothread_class_init,
    .instance_size = sizeof(IOThread),
    .instance_init = iothread_instance_init,
    .instance_finalize = iothread_instance_finalize,
    .interfaces = (InterfaceInfo[]) {
        {TYPE_USER_CREATABLE},
        {}
    },
};

static void iothread_register_types(void)
{
    type_register_static(&iothread_info);
}

type_init(iothread_register_types)

char *iothread_get_id(IOThread *iothread)
{
    return g_strdup(object_get_canonical_path_component(OBJECT(iothread)));
}

AioContext *iothread_get_aio_context(IOThread *iothread)
{
    return iothread->ctx;
}
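
/*
 * Illustrative sketch: other subsystems (block layer, virtio devices) fetch
 * the AioContext in order to run their handlers in this IOThread, e.g.
 * ("my_cb" and "opaque" are hypothetical):
 *
 *     AioContext *ctx = iothread_get_aio_context(iothread);
 *     aio_bh_schedule_oneshot(ctx, my_cb, opaque);
 */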

static int query_one_iothread(Object *object, void *opaque)
{
    IOThreadInfoList ***prev = opaque;
    IOThreadInfoList *elem;
    IOThreadInfo *info;
    IOThread *iothread;

    iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
    if (!iothread) {
        return 0;
    }

    info = g_new0(IOThreadInfo, 1);
    info->id = iothread_get_id(iothread);
    info->thread_id = iothread->thread_id;
    info->poll_max_ns = iothread->poll_max_ns;
    info->poll_grow = iothread->poll_grow;
    info->poll_shrink = iothread->poll_shrink;

    elem = g_new0(IOThreadInfoList, 1);
    elem->value = info;
    elem->next = NULL;

    **prev = elem;
    *prev = &elem->next;
    return 0;
}

IOThreadInfoList *qmp_query_iothreads(Error **errp)
{
    IOThreadInfoList *head = NULL;
    IOThreadInfoList **prev = &head;
    Object *container = object_get_objects_root();

    object_child_foreach(container, query_one_iothread, &prev);
    return head;
}
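
/*
 * Illustrative QMP exchange (sketch; the returned values are hypothetical):
 *
 *     -> { "execute": "query-iothreads" }
 *     <- { "return": [ { "id": "iothread0", "thread-id": 12345,
 *                        "poll-max-ns": 32768, "poll-grow": 0,
 *                        "poll-shrink": 0 } ] }
 */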

GMainContext *iothread_get_g_main_context(IOThread *iothread)
{
    qatomic_set(&iothread->run_gcontext, 1);
    aio_notify(iothread->ctx);
    return iothread->worker_context;
}
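
/*
 * Illustrative usage (sketch): a caller that needs glib dispatch in this
 * IOThread attaches its GSource to the returned context; the aio_notify()
 * above kicks the blocked aio_poll() so iothread_run() starts servicing the
 * GMainLoop ("my_source" is a hypothetical GSource):
 *
 *     GMainContext *gctx = iothread_get_g_main_context(iothread);
 *     g_source_attach(my_source, gctx);
 */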

IOThread *iothread_create(const char *id, Error **errp)
{
    Object *obj;

    obj = object_new_with_props(TYPE_IOTHREAD,
                                object_get_internal_root(),
                                id, errp, NULL);

    return IOTHREAD(obj);
}

void iothread_destroy(IOThread *iothread)
{
    object_unparent(OBJECT(iothread));
}

/* Look up IOThread by its id. Only finds user-created objects, not internal
 * iothread_create() objects. */
IOThread *iothread_by_id(const char *id)
{
    return IOTHREAD(object_resolve_path_type(id, TYPE_IOTHREAD, NULL));
}