2
0

vhost-user-server.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. /*
  2. * Sharing QEMU devices via vhost-user protocol
  3. *
  4. * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
  5. * Copyright (c) 2020 Red Hat, Inc.
  6. *
  7. * This work is licensed under the terms of the GNU GPL, version 2 or
  8. * later. See the COPYING file in the top-level directory.
  9. */
  10. #include "qemu/osdep.h"
  11. #include "qemu/error-report.h"
  12. #include "qemu/main-loop.h"
  13. #include "qemu/vhost-user-server.h"
  14. #include "block/aio-wait.h"
  15. /*
  16. * Theory of operation:
  17. *
  18. * VuServer is started and stopped by vhost_user_server_start() and
  19. * vhost_user_server_stop() from the main loop thread. Starting the server
  20. * opens a vhost-user UNIX domain socket and listens for incoming connections.
  21. * Only one connection is allowed at a time.
  22. *
  23. * The connection is handled by the vu_client_trip() coroutine in the
  24. * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
  25. * where libvhost-user calls vu_message_read() to receive the next vhost-user
  26. * protocol messages over the UNIX domain socket.
  27. *
  28. * When virtqueues are set up libvhost-user calls set_watch() to monitor kick
  29. * fds. These fds are also handled in the VuServer->ctx AioContext.
  30. *
  31. * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
  32. * the socket connection. Shutting down the socket connection causes
  33. * vu_message_read() to fail since no more data can be received from the socket.
  34. * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
  35. * libvhost-user before terminating the coroutine. vu_deinit() calls
  36. * remove_watch() to stop monitoring kick fds and this stops virtqueue
  37. * processing.
  38. *
  39. * When vu_client_trip() has finished cleaning up it schedules a BH in the main
  40. * loop thread to accept the next client connection.
  41. *
  42. * When libvhost-user detects an error it calls panic_cb() and sets the
  43. * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
  44. * the dev->broken flag is set.
  45. *
  46. * It is possible to switch AioContexts using
  47. * vhost_user_server_detach_aio_context() and
  48. * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
  49. * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
  50. * coroutine remains in a yielded state during the switch. This is made
  51. * possible by QIOChannel's support for spurious coroutine re-entry in
  52. * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
  53. * new AioContext.
  54. */
  55. static void vmsg_close_fds(VhostUserMsg *vmsg)
  56. {
  57. int i;
  58. for (i = 0; i < vmsg->fd_num; i++) {
  59. close(vmsg->fds[i]);
  60. }
  61. }
  62. static void vmsg_unblock_fds(VhostUserMsg *vmsg)
  63. {
  64. int i;
  65. for (i = 0; i < vmsg->fd_num; i++) {
  66. qemu_socket_set_nonblock(vmsg->fds[i]);
  67. }
  68. }
  69. static void panic_cb(VuDev *vu_dev, const char *buf)
  70. {
  71. error_report("vu_panic: %s", buf);
  72. }
  73. void vhost_user_server_inc_in_flight(VuServer *server)
  74. {
  75. assert(!server->wait_idle);
  76. qatomic_inc(&server->in_flight);
  77. }
  78. void vhost_user_server_dec_in_flight(VuServer *server)
  79. {
  80. if (qatomic_fetch_dec(&server->in_flight) == 1) {
  81. if (server->wait_idle) {
  82. aio_co_wake(server->co_trip);
  83. }
  84. }
  85. }
  86. bool vhost_user_server_has_in_flight(VuServer *server)
  87. {
  88. return qatomic_load_acquire(&server->in_flight) > 0;
  89. }
  90. static bool coroutine_fn
  91. vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
  92. {
  93. struct iovec iov = {
  94. .iov_base = (char *)vmsg,
  95. .iov_len = VHOST_USER_HDR_SIZE,
  96. };
  97. int rc, read_bytes = 0;
  98. Error *local_err = NULL;
  99. const size_t max_fds = G_N_ELEMENTS(vmsg->fds);
  100. VuServer *server = container_of(vu_dev, VuServer, vu_dev);
  101. QIOChannel *ioc = server->ioc;
  102. vmsg->fd_num = 0;
  103. if (!ioc) {
  104. error_report_err(local_err);
  105. goto fail;
  106. }
  107. assert(qemu_in_coroutine());
  108. do {
  109. size_t nfds = 0;
  110. int *fds = NULL;
  111. /*
  112. * qio_channel_readv_full may have short reads, keeping calling it
  113. * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
  114. */
  115. rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err);
  116. if (rc < 0) {
  117. if (rc == QIO_CHANNEL_ERR_BLOCK) {
  118. assert(local_err == NULL);
  119. if (server->ctx) {
  120. server->in_qio_channel_yield = true;
  121. qio_channel_yield(ioc, G_IO_IN);
  122. server->in_qio_channel_yield = false;
  123. } else {
  124. return false;
  125. }
  126. continue;
  127. } else {
  128. error_report_err(local_err);
  129. goto fail;
  130. }
  131. }
  132. if (nfds > 0) {
  133. if (vmsg->fd_num + nfds > max_fds) {
  134. error_report("A maximum of %zu fds are allowed, "
  135. "however got %zu fds now",
  136. max_fds, vmsg->fd_num + nfds);
  137. g_free(fds);
  138. goto fail;
  139. }
  140. memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0]));
  141. vmsg->fd_num += nfds;
  142. g_free(fds);
  143. }
  144. if (rc == 0) { /* socket closed */
  145. goto fail;
  146. }
  147. iov.iov_base += rc;
  148. iov.iov_len -= rc;
  149. read_bytes += rc;
  150. } while (read_bytes != VHOST_USER_HDR_SIZE);
  151. /* qio_channel_readv_full will make socket fds blocking, unblock them */
  152. vmsg_unblock_fds(vmsg);
  153. if (vmsg->size > sizeof(vmsg->payload)) {
  154. error_report("Error: too big message request: %d, "
  155. "size: vmsg->size: %u, "
  156. "while sizeof(vmsg->payload) = %zu",
  157. vmsg->request, vmsg->size, sizeof(vmsg->payload));
  158. goto fail;
  159. }
  160. struct iovec iov_payload = {
  161. .iov_base = (char *)&vmsg->payload,
  162. .iov_len = vmsg->size,
  163. };
  164. if (vmsg->size) {
  165. rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
  166. if (rc != 1) {
  167. if (local_err) {
  168. error_report_err(local_err);
  169. }
  170. goto fail;
  171. }
  172. }
  173. return true;
  174. fail:
  175. vmsg_close_fds(vmsg);
  176. return false;
  177. }
/*
 * Coroutine that serves the connected client in server->ctx.
 *
 * Calls vu_dispatch() repeatedly to process vhost-user messages. The loop
 * ends when:
 * - the device is marked broken, or
 * - vu_dispatch() fails while server->ctx is still set.
 * A vu_dispatch() failure with server->ctx == NULL (server detached from its
 * AioContext) does not end the loop.
 *
 * If server->quiescing is observed, the coroutine exits early without
 * tearing down the connection. It only clears server->co_trip and kicks
 * AIO_WAIT waiters.
 *
 * Otherwise, after the loop it:
 * - waits for in-flight requests to finish;
 * - calls vu_deinit() and drops both references on the client channel;
 * - clears server->co_trip;
 * - schedules restart_listener_bh (if it still exists) so the next client
 *   can be accepted;
 * - kicks AIO_WAIT waiters, such as vhost_user_server_stop(), which waits
 *   for co_trip to become NULL.
 */
static coroutine_fn void vu_client_trip(void *opaque)
{
    VuServer *server = opaque;
    VuDev *vu_dev = &server->vu_dev;

    while (!vu_dev->broken) {
        if (server->quiescing) {
            /* Leave the connection intact; only signal that we are gone */
            server->co_trip = NULL;
            aio_wait_kick();
            return;
        }
        /* vu_dispatch() returns false if server->ctx went away */
        if (!vu_dispatch(vu_dev) && server->ctx) {
            break;
        }
    }

    if (vhost_user_server_has_in_flight(server)) {
        /* Wait for requests to complete before we can unmap the memory */
        /* vhost_user_server_dec_in_flight() wakes us when the count drops to 0 */
        server->wait_idle = true;
        qemu_coroutine_yield();
        server->wait_idle = false;
    }
    assert(!vhost_user_server_has_in_flight(server));

    vu_deinit(vu_dev);

    /* vu_deinit() should have called remove_watch() */
    assert(QTAILQ_EMPTY(&server->vu_fd_watches));

    /* Drop the two references taken in vu_accept() */
    object_unref(OBJECT(server->sioc));
    server->sioc = NULL;

    object_unref(OBJECT(server->ioc));
    server->ioc = NULL;

    server->co_trip = NULL;
    /* restart_listener_bh is NULL once vhost_user_server_stop() has run */
    if (server->restart_listener_bh) {
        qemu_bh_schedule(server->restart_listener_bh);
    }
    aio_wait_kick();
}
  213. /*
  214. * a wrapper for vu_kick_cb
  215. *
  216. * since aio_dispatch can only pass one user data pointer to the
  217. * callback function, pack VuDev and pvt into a struct. Then unpack it
  218. * and pass them to vu_kick_cb
  219. */
  220. static void kick_handler(void *opaque)
  221. {
  222. VuFdWatch *vu_fd_watch = opaque;
  223. VuDev *vu_dev = vu_fd_watch->vu_dev;
  224. vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);
  225. /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
  226. if (vu_dev->broken) {
  227. VuServer *server = container_of(vu_dev, VuServer, vu_dev);
  228. qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
  229. }
  230. }
  231. static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
  232. {
  233. VuFdWatch *vu_fd_watch, *next;
  234. QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
  235. if (vu_fd_watch->fd == fd) {
  236. return vu_fd_watch;
  237. }
  238. }
  239. return NULL;
  240. }
  241. static void
  242. set_watch(VuDev *vu_dev, int fd, int vu_evt,
  243. vu_watch_cb cb, void *pvt)
  244. {
  245. VuServer *server = container_of(vu_dev, VuServer, vu_dev);
  246. g_assert(vu_dev);
  247. g_assert(fd >= 0);
  248. g_assert(cb);
  249. VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
  250. if (!vu_fd_watch) {
  251. vu_fd_watch = g_new0(VuFdWatch, 1);
  252. QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next);
  253. vu_fd_watch->fd = fd;
  254. vu_fd_watch->cb = cb;
  255. qemu_socket_set_nonblock(fd);
  256. aio_set_fd_handler(server->ctx, fd, kick_handler,
  257. NULL, NULL, NULL, vu_fd_watch);
  258. vu_fd_watch->vu_dev = vu_dev;
  259. vu_fd_watch->pvt = pvt;
  260. }
  261. }
  262. static void remove_watch(VuDev *vu_dev, int fd)
  263. {
  264. VuServer *server;
  265. g_assert(vu_dev);
  266. g_assert(fd >= 0);
  267. server = container_of(vu_dev, VuServer, vu_dev);
  268. VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
  269. if (!vu_fd_watch) {
  270. return;
  271. }
  272. aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);
  273. QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
  274. g_free(vu_fd_watch);
  275. }
/*
 * QIONetListener client callback: accept a new vhost-user connection.
 *
 * Only one client is served at a time. If one is already connected, the new
 * connection is rejected with a warning. The same early return happens if
 * libvhost-user fails to initialize.
 *
 * On success:
 * - further accepts are disabled until this client goes away;
 *   restart_listener_bh() installs vu_accept() again afterwards;
 * - the server takes its own references on the channel;
 * - the channel is made non-blocking and set to follow the coroutine's
 *   AioContext;
 * - message processing starts in server->ctx via
 *   vhost_user_server_attach_aio_context().
 */
static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
                      gpointer opaque)
{
    VuServer *server = opaque;

    if (server->sioc) {
        warn_report("Only one vhost-user client is allowed to "
                    "connect the server one time");
        return;
    }

    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
        error_report("Failed to initialize libvhost-user");
        return;
    }

    /*
     * Remove the listener's client callback so that any further client is
     * left waiting until this one disconnects.
     */
    qio_net_listener_set_client_func(server->listener,
                                     NULL,
                                     NULL,
                                     NULL);
    server->sioc = sioc;
    /*
     * Take a reference so that sioc is not freed by
     * qio_net_listener_channel_func, which calls object_unref(OBJECT(sioc))
     * after this callback returns.
     */
    object_ref(OBJECT(server->sioc));
    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
    server->ioc = QIO_CHANNEL(sioc);
    /* Second reference, dropped via server->ioc in vu_client_trip() */
    object_ref(OBJECT(server->ioc));
    /* TODO vu_message_write() spins if non-blocking! */
    qio_channel_set_blocking(server->ioc, false, NULL);
    qio_channel_set_follow_coroutine_ctx(server->ioc, true);
    vhost_user_server_attach_aio_context(server, server->ctx);
}
  312. /* server->ctx acquired by caller */
  313. void vhost_user_server_stop(VuServer *server)
  314. {
  315. qemu_bh_delete(server->restart_listener_bh);
  316. server->restart_listener_bh = NULL;
  317. if (server->sioc) {
  318. VuFdWatch *vu_fd_watch;
  319. QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
  320. aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
  321. NULL, NULL, NULL, NULL, vu_fd_watch);
  322. }
  323. qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
  324. AIO_WAIT_WHILE(server->ctx, server->co_trip);
  325. }
  326. if (server->listener) {
  327. qio_net_listener_disconnect(server->listener);
  328. object_unref(OBJECT(server->listener));
  329. }
  330. }
  331. /*
  332. * Allow the next client to connect to the server. Called from a BH in the main
  333. * loop.
  334. */
  335. static void restart_listener_bh(void *opaque)
  336. {
  337. VuServer *server = opaque;
  338. qio_net_listener_set_client_func(server->listener, vu_accept, server,
  339. NULL);
  340. }
  341. /* Called with ctx acquired */
  342. void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
  343. {
  344. VuFdWatch *vu_fd_watch;
  345. server->ctx = ctx;
  346. if (!server->sioc) {
  347. return;
  348. }
  349. QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
  350. aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
  351. NULL, NULL, vu_fd_watch);
  352. }
  353. if (server->co_trip) {
  354. /*
  355. * The caller didn't fully shut down co_trip (this can happen on
  356. * non-polling drains like in bdrv_graph_wrlock()). This is okay as long
  357. * as it no longer tries to shut it down and we're guaranteed to still
  358. * be in the same AioContext as before.
  359. *
  360. * co_ctx can still be NULL if we get multiple calls and only just
  361. * scheduled a new coroutine in the else branch.
  362. */
  363. AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip);
  364. assert(!server->quiescing);
  365. assert(!co_ctx || co_ctx == ctx);
  366. } else {
  367. server->co_trip = qemu_coroutine_create(vu_client_trip, server);
  368. assert(!server->in_qio_channel_yield);
  369. aio_co_schedule(ctx, server->co_trip);
  370. }
  371. }
  372. /* Called with server->ctx acquired */
  373. void vhost_user_server_detach_aio_context(VuServer *server)
  374. {
  375. if (server->sioc) {
  376. VuFdWatch *vu_fd_watch;
  377. QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
  378. aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
  379. NULL, NULL, NULL, NULL, vu_fd_watch);
  380. }
  381. }
  382. server->ctx = NULL;
  383. if (server->ioc) {
  384. if (server->in_qio_channel_yield) {
  385. /* Stop receiving the next vhost-user message */
  386. qio_channel_wake_read(server->ioc);
  387. }
  388. }
  389. }
  390. bool vhost_user_server_start(VuServer *server,
  391. SocketAddress *socket_addr,
  392. AioContext *ctx,
  393. uint16_t max_queues,
  394. const VuDevIface *vu_iface,
  395. Error **errp)
  396. {
  397. QEMUBH *bh;
  398. QIONetListener *listener;
  399. if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX &&
  400. socket_addr->type != SOCKET_ADDRESS_TYPE_FD) {
  401. error_setg(errp, "Only socket address types 'unix' and 'fd' are supported");
  402. return false;
  403. }
  404. listener = qio_net_listener_new();
  405. if (qio_net_listener_open_sync(listener, socket_addr, 1,
  406. errp) < 0) {
  407. object_unref(OBJECT(listener));
  408. return false;
  409. }
  410. bh = qemu_bh_new(restart_listener_bh, server);
  411. /* zero out unspecified fields */
  412. *server = (VuServer) {
  413. .listener = listener,
  414. .restart_listener_bh = bh,
  415. .vu_iface = vu_iface,
  416. .max_queues = max_queues,
  417. .ctx = ctx,
  418. };
  419. qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
  420. qio_net_listener_set_client_func(server->listener,
  421. vu_accept,
  422. server,
  423. NULL);
  424. QTAILQ_INIT(&server->vu_fd_watches);
  425. return true;
  426. }