/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"

typedef struct AFXDPState {
    NetClientState       nc;

    struct xsk_socket    *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char                 ifname[IFNAMSIZ];
    int                  ifindex;
    bool                 read_poll;
    bool                 write_poll;
    uint32_t             outstanding_tx;

    uint64_t             *pool;
    uint32_t             n_pool;
    char                 *buffer;
    struct xsk_umem      *umem;

    uint32_t             n_queues;
    uint32_t             xdp_flags;
    bool                 inhibit;
} AFXDPState;
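
/* Maximum number of packets to process per receive poll. */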
#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll  = enable;
        af_xdp_update_fd_handler(s);
    }
}
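
/* Reclaim buffers for completed transmissions from the CQ into the free pool. */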
static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and the kernel needs a wakeup.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}
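
/* Put a packet from the peer (guest) onto the wire via the Tx ring. */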
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit a packet this large, so drop it. */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or space in the tx ring. Poll until we can write.
         * This will also kick the Tx, if it was waiting on the CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);
    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}
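
/* Post up to 'n' free buffers to the fill queue for the kernel to use on Rx. */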
static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool ? s->n_pool - 1 : 0;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}
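
/* The fd_read() callback: drain the Rx ring and forward packets to the peer. */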
static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer is no longer receiving. The packet has been queued,
             * so stop reading from the backend until af_xdp_send_completed()
             * is called.
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors to not break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;

    g_free(s->pool);
    s->pool = NULL;

    xsk_umem__delete(s->umem);
    s->umem = NULL;

    qemu_vfree(s->buffer);
    s->buffer = NULL;

    /* Remove the program if it's the last open queue. */
    if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
        && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
        fprintf(stderr,
                "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
                s->ifname, s->ifindex);
    }
}
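
/*
 * Allocate one buffer region big enough to back every descriptor of all
 * four rings, register it as the umem and pre-populate the free pool
 * and the fill queue.
 */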
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}
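
/*
 * Create the AF_XDP socket for one queue. If no mode was requested,
 * try native (driver) mode first and fall back to generic (skb) mode.
 */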
static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    s->inhibit = opts->has_inhibit && opts->inhibit;
    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};
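
/*
 * Parse a colon-separated list of pre-opened socket file descriptors,
 * one per queue, as supplied via the 'sock-fds' option.
 */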
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);
    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    g_autofree int *sock_fds = NULL;
    int64_t i, queues;
    AFXDPState *s;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->n_queues = queues;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
            || af_xdp_socket_create(s, opts, errp)) {
            /* Make sure the XDP program will be removed. */
            s->n_queues = i;
            goto err;
        }
    }

    if (nc0) {
        s = DO_UPCAST(AFXDPState, nc, nc0);
        if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
            error_setg_errno(errp, errno,
                             "no XDP program loaded on '%s', ifindex: %d",
                             s->ifname, s->ifindex);
            goto err;
        }
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    return 0;

err:
    if (nc0) {
        qemu_del_net_client(nc0);
    }

    return -1;
}