123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- /*
- * AF_XDP network backend.
- *
- * Copyright (c) 2023 Red Hat, Inc.
- *
- * Authors:
- * Ilya Maximets <i.maximets@ovn.org>
- *
- * This work is licensed under the terms of the GNU GPL, version 2 or later.
- * See the COPYING file in the top-level directory.
- */
- #include "qemu/osdep.h"
- #include <bpf/bpf.h>
- #include <linux/if_link.h>
- #include <linux/if_xdp.h>
- #include <net/if.h>
- #include <xdp/xsk.h>
- #include "clients.h"
- #include "monitor/monitor.h"
- #include "net/net.h"
- #include "qapi/error.h"
- #include "qemu/cutils.h"
- #include "qemu/error-report.h"
- #include "qemu/iov.h"
- #include "qemu/main-loop.h"
- #include "qemu/memalign.h"
- typedef struct AFXDPState {
- NetClientState nc;
- struct xsk_socket *xsk;
- struct xsk_ring_cons rx;
- struct xsk_ring_prod tx;
- struct xsk_ring_cons cq;
- struct xsk_ring_prod fq;
- char ifname[IFNAMSIZ];
- int ifindex;
- bool read_poll;
- bool write_poll;
- uint32_t outstanding_tx;
- uint64_t *pool;
- uint32_t n_pool;
- char *buffer;
- struct xsk_umem *umem;
- uint32_t n_queues;
- uint32_t xdp_flags;
- bool inhibit;
- } AFXDPState;
/*
 * Batch size used by af_xdp_send(): upper bound on Rx descriptors peeked
 * per call and the number of frames requested for the fill-queue refill.
 */
#define AF_XDP_BATCH_SIZE 64

/* Both callbacks are referenced by af_xdp_update_fd_handler() below. */
static void af_xdp_writable(void *opaque);
static void af_xdp_send(void *opaque);
- /* Set the event-loop handlers for the af-xdp backend. */
- static void af_xdp_update_fd_handler(AFXDPState *s)
- {
- qemu_set_fd_handler(xsk_socket__fd(s->xsk),
- s->read_poll ? af_xdp_send : NULL,
- s->write_poll ? af_xdp_writable : NULL,
- s);
- }
- /* Update the read handler. */
- static void af_xdp_read_poll(AFXDPState *s, bool enable)
- {
- if (s->read_poll != enable) {
- s->read_poll = enable;
- af_xdp_update_fd_handler(s);
- }
- }
- /* Update the write handler. */
- static void af_xdp_write_poll(AFXDPState *s, bool enable)
- {
- if (s->write_poll != enable) {
- s->write_poll = enable;
- af_xdp_update_fd_handler(s);
- }
- }
- static void af_xdp_poll(NetClientState *nc, bool enable)
- {
- AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
- if (s->read_poll != enable || s->write_poll != enable) {
- s->write_poll = enable;
- s->read_poll = enable;
- af_xdp_update_fd_handler(s);
- }
- }
- static void af_xdp_complete_tx(AFXDPState *s)
- {
- uint32_t idx = 0;
- uint32_t done, i;
- uint64_t *addr;
- done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);
- for (i = 0; i < done; i++) {
- addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
- s->pool[s->n_pool++] = *addr;
- s->outstanding_tx--;
- }
- if (done) {
- xsk_ring_cons__release(&s->cq, done);
- }
- }
- /*
- * The fd_write() callback, invoked if the fd is marked as writable
- * after a poll.
- */
- static void af_xdp_writable(void *opaque)
- {
- AFXDPState *s = opaque;
- /* Try to recover buffers that are already sent. */
- af_xdp_complete_tx(s);
- /*
- * Unregister the handler, unless we still have packets to transmit
- * and kernel needs a wake up.
- */
- if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
- af_xdp_write_poll(s, false);
- }
- /* Flush any buffered packets. */
- qemu_flush_queued_packets(&s->nc);
- }
- static ssize_t af_xdp_receive(NetClientState *nc,
- const uint8_t *buf, size_t size)
- {
- AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
- struct xdp_desc *desc;
- uint32_t idx;
- void *data;
- /* Try to recover buffers that are already sent. */
- af_xdp_complete_tx(s);
- if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
- /* We can't transmit packet this size... */
- return size;
- }
- if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
- /*
- * Out of buffers or space in tx ring. Poll until we can write.
- * This will also kick the Tx, if it was waiting on CQ.
- */
- af_xdp_write_poll(s, true);
- return 0;
- }
- desc = xsk_ring_prod__tx_desc(&s->tx, idx);
- desc->addr = s->pool[--s->n_pool];
- desc->len = size;
- data = xsk_umem__get_data(s->buffer, desc->addr);
- memcpy(data, buf, size);
- xsk_ring_prod__submit(&s->tx, 1);
- s->outstanding_tx++;
- if (xsk_ring_prod__needs_wakeup(&s->tx)) {
- af_xdp_write_poll(s, true);
- }
- return size;
- }
- /*
- * Complete a previous send (backend --> guest) and enable the
- * fd_read callback.
- */
- static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
- {
- AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
- af_xdp_read_poll(s, true);
- }
- static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
- {
- uint32_t i, idx = 0;
- /* Leave one packet for Tx, just in case. */
- if (s->n_pool < n + 1) {
- n = s->n_pool;
- }
- if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
- return;
- }
- for (i = 0; i < n; i++) {
- *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
- }
- xsk_ring_prod__submit(&s->fq, n);
- if (xsk_ring_prod__needs_wakeup(&s->fq)) {
- /* Receive was blocked by not having enough buffers. Wake it up. */
- af_xdp_read_poll(s, true);
- }
- }
- static void af_xdp_send(void *opaque)
- {
- uint32_t i, n_rx, idx = 0;
- AFXDPState *s = opaque;
- n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
- if (!n_rx) {
- return;
- }
- for (i = 0; i < n_rx; i++) {
- const struct xdp_desc *desc;
- struct iovec iov;
- desc = xsk_ring_cons__rx_desc(&s->rx, idx++);
- iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
- iov.iov_len = desc->len;
- s->pool[s->n_pool++] = desc->addr;
- if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
- af_xdp_send_completed)) {
- /*
- * The peer does not receive anymore. Packet is queued, stop
- * reading from the backend until af_xdp_send_completed().
- */
- af_xdp_read_poll(s, false);
- /* Return unused descriptors to not break the ring cache. */
- xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
- n_rx = i + 1;
- break;
- }
- }
- /* Release actually sent descriptors and try to re-fill. */
- xsk_ring_cons__release(&s->rx, n_rx);
- af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
- }
- /* Flush and close. */
- static void af_xdp_cleanup(NetClientState *nc)
- {
- AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
- qemu_purge_queued_packets(nc);
- af_xdp_poll(nc, false);
- xsk_socket__delete(s->xsk);
- s->xsk = NULL;
- g_free(s->pool);
- s->pool = NULL;
- xsk_umem__delete(s->umem);
- s->umem = NULL;
- qemu_vfree(s->buffer);
- s->buffer = NULL;
- /* Remove the program if it's the last open queue. */
- if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
- && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
- fprintf(stderr,
- "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
- s->ifname, s->ifindex);
- }
- }
- static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
- {
- struct xsk_umem_config config = {
- .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
- .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
- .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
- .frame_headroom = 0,
- };
- uint64_t n_descs;
- uint64_t size;
- int64_t i;
- int ret;
- /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
- n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
- + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
- size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
- s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
- memset(s->buffer, 0, size);
- if (sock_fd < 0) {
- ret = xsk_umem__create(&s->umem, s->buffer, size,
- &s->fq, &s->cq, &config);
- } else {
- ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
- &s->fq, &s->cq, &config);
- }
- if (ret) {
- qemu_vfree(s->buffer);
- error_setg_errno(errp, errno,
- "failed to create umem for %s queue_index: %d",
- s->ifname, s->nc.queue_index);
- return -1;
- }
- s->pool = g_new(uint64_t, n_descs);
- /* Fill the pool in the opposite order, because it's a LIFO queue. */
- for (i = n_descs; i >= 0; i--) {
- s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
- }
- s->n_pool = n_descs;
- af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);
- return 0;
- }
- static int af_xdp_socket_create(AFXDPState *s,
- const NetdevAFXDPOptions *opts, Error **errp)
- {
- struct xsk_socket_config cfg = {
- .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
- .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
- .libxdp_flags = 0,
- .bind_flags = XDP_USE_NEED_WAKEUP,
- .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
- };
- int queue_id, error = 0;
- s->inhibit = opts->has_inhibit && opts->inhibit;
- if (s->inhibit) {
- cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
- }
- if (opts->has_force_copy && opts->force_copy) {
- cfg.bind_flags |= XDP_COPY;
- }
- queue_id = s->nc.queue_index;
- if (opts->has_start_queue && opts->start_queue > 0) {
- queue_id += opts->start_queue;
- }
- if (opts->has_mode) {
- /* Specific mode requested. */
- cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
- ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
- if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
- s->umem, &s->rx, &s->tx, &cfg)) {
- error = errno;
- }
- } else {
- /* No mode requested, try native first. */
- cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;
- if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
- s->umem, &s->rx, &s->tx, &cfg)) {
- /* Can't use native mode, try skb. */
- cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
- cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;
- if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
- s->umem, &s->rx, &s->tx, &cfg)) {
- error = errno;
- }
- }
- }
- if (error) {
- error_setg_errno(errp, error,
- "failed to create AF_XDP socket for %s queue_id: %d",
- s->ifname, queue_id);
- return -1;
- }
- s->xdp_flags = cfg.xdp_flags;
- return 0;
- }
- /* NetClientInfo methods. */
- static NetClientInfo net_af_xdp_info = {
- .type = NET_CLIENT_DRIVER_AF_XDP,
- .size = sizeof(AFXDPState),
- .receive = af_xdp_receive,
- .poll = af_xdp_poll,
- .cleanup = af_xdp_cleanup,
- };
- static int *parse_socket_fds(const char *sock_fds_str,
- int64_t n_expected, Error **errp)
- {
- gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
- int64_t i, n_sock_fds = g_strv_length(substrings);
- int *sock_fds = NULL;
- if (n_sock_fds != n_expected) {
- error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
- n_expected, n_sock_fds);
- goto exit;
- }
- sock_fds = g_new(int, n_sock_fds);
- for (i = 0; i < n_sock_fds; i++) {
- sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
- if (sock_fds[i] < 0) {
- g_free(sock_fds);
- sock_fds = NULL;
- goto exit;
- }
- }
- exit:
- g_strfreev(substrings);
- return sock_fds;
- }
- /*
- * The exported init function.
- *
- * ... -netdev af-xdp,ifname="..."
- */
- int net_init_af_xdp(const Netdev *netdev,
- const char *name, NetClientState *peer, Error **errp)
- {
- const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
- NetClientState *nc, *nc0 = NULL;
- unsigned int ifindex;
- uint32_t prog_id = 0;
- g_autofree int *sock_fds = NULL;
- int64_t i, queues;
- Error *err = NULL;
- AFXDPState *s;
- ifindex = if_nametoindex(opts->ifname);
- if (!ifindex) {
- error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
- opts->ifname);
- return -1;
- }
- queues = opts->has_queues ? opts->queues : 1;
- if (queues < 1) {
- error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
- queues, opts->ifname);
- return -1;
- }
- if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
- error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
- return -1;
- }
- if (opts->sock_fds) {
- sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
- if (!sock_fds) {
- return -1;
- }
- }
- for (i = 0; i < queues; i++) {
- nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
- qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
- nc->queue_index = i;
- if (!nc0) {
- nc0 = nc;
- }
- s = DO_UPCAST(AFXDPState, nc, nc);
- pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
- s->ifindex = ifindex;
- s->n_queues = queues;
- if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
- || af_xdp_socket_create(s, opts, errp)) {
- /* Make sure the XDP program will be removed. */
- s->n_queues = i;
- error_propagate(errp, err);
- goto err;
- }
- }
- if (nc0) {
- s = DO_UPCAST(AFXDPState, nc, nc0);
- if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
- error_setg_errno(errp, errno,
- "no XDP program loaded on '%s', ifindex: %d",
- s->ifname, s->ifindex);
- goto err;
- }
- }
- af_xdp_read_poll(s, true); /* Initially only poll for reads. */
- return 0;
- err:
- if (nc0) {
- qemu_del_net_client(nc0);
- }
- return -1;
- }
|