On Mon, Oct 26, 2015 at 07:19:23PM +0200, Victor Kaplansky wrote:
> The test existing in QEMU for the vhost-user feature is good for
> testing the management protocol, but does not allow actual
> traffic. This patch proposes the Vhost-User Bridge application, which
> can serve the QEMU community as a comprehensive test by running
> real internet traffic by means of the vhost-user interface.
>
> Essentially the Vhost-User Bridge is a very basic vhost-user
> backend for QEMU. It runs as a standalone user-level process.
> For packet processing the Vhost-User Bridge uses an additional QEMU
> instance with a backend configured by "-net socket" as a shared
> VLAN. This way another QEMU virtual machine can effectively
> serve as a shared bus by means of UDP communication.
>
> For a simpler setup, the QEMU instance running the
> SLiRP backend can be the same QEMU instance running the
> vhost-user client.
>
> This Vhost-User Bridge implementation is very preliminary. It is
> missing many features. I have been studying vhost-user protocol
> internals, so I've written vhost-user-bridge bit by bit as I
> progressed through the protocol. Most probably its internal
> architecture will change significantly.
>
> To run the Vhost-User Bridge application:
>
> 1. Build vhost-user-bridge with the regular procedure. This will
>    create a vhost-user-bridge executable under the tests directory:
>
>    $ configure; make tests/vhost-user-bridge
>
> 2. Ensure the machine has hugepages enabled in the kernel, with a
>    command line like:
>
>    default_hugepagesz=2M hugepagesz=2M hugepages=2048
>
> 3. Run the Vhost-User Bridge with:
>
>    $ tests/vhost-user-bridge
>
>    The above will run a vhost-user server listening for connections
>    on the UNIX domain socket /tmp/vubr.sock, and will try to connect
>    by UDP to the VLAN bridge at localhost:5555, while listening on
>    localhost:4444.
>
> Run qemu with a virtio-net device backed by vhost-user:
>
>    $ qemu \
>        -enable-kvm -m 512 -smp 2 \
>        -object memory-backend-file,id=mem,size=512M,mem-path=/dev/hugepages,share=on \
>        -numa node,memdev=mem -mem-prealloc \
>        -chardev socket,id=char0,path=/tmp/vubr.sock \
>        -netdev type=vhost-user,id=mynet1,chardev=char0,vhostforce \
>        -device virtio-net-pci,netdev=mynet1 \
>        -net none \
>        -net socket,vlan=0,udp=localhost:4444,localaddr=localhost:5555 \
>        -net user,vlan=0 \
>        disk.img
>
> vhost-user-bridge was tested very lightly: it is able to bring up
> Linux on the client VM with the virtio-net driver, and to transmit
> and receive packets to and from the internet. I tested with
> "wget redhat.com" and "dig redhat.com".
>
> PS. I've consulted DPDK's code for vhost-user during the Vhost-User
> Bridge implementation.
>
> Signed-off-by: Victor Kaplansky <vict...@redhat.com>
> ---
> v2:
> Cosmetic changes:
> - Tabs expanded, trailing spaces removed.
> - Removed use of architecture-specific definitions starting with _
> - Used header files available in qemu/includes.
> - Rearranged source into a single file.
> - checkpatch.pl pacified.
> - Added copyright note.
> - Small spelling corrections.
> - Removed _ prefixes in function names.
> - Makefile incorporated into tests/Makefile.
> - Error handling code changed to use die().
> - Prefix "vubr" replaced by "vhost_user".
> - Structures, enums and function type names renamed to
>   comply with the CODING_STYLE doc.
> - Preprocessor tricks thrown away.
> - Lines are no longer than 80 characters.
>
> Functional changes:
> - Added memory barriers.
> - Implemented SET_OWNER (by doing nothing).
>
> TODO:
> - move all declarations before code inside a block.
> - main should get parameters from the command line.

Not required for merge.

> - cleanup debug printings.
> - implement all request handlers.
> - test for broken requests and virtqueue.

Not required for merge.

> - implement features defined by the Virtio 1.0 spec.

Not required for merge.

> - support mergeable buffers and indirect descriptors.

Not required for merge.

> - implement the RESET_DEVICE request.

No need for this one, I think.

> - implement clean shutdown.
> - implement non-blocking writes to the UDP backend.

Not required for merge.

> - handle blocking correctly if there are no available
>   descriptors on the RX virtqueue.
> - implement a polling strategy.

Not required for merge.

> ---
>  tests/vhost-user-bridge.c | 1138 +++++++++++++++++++++++++++++++++++++++++++++
>  tests/Makefile            |    1 +
>  2 files changed, 1139 insertions(+)
>  create mode 100644 tests/vhost-user-bridge.c
>
> diff --git a/tests/vhost-user-bridge.c b/tests/vhost-user-bridge.c
> new file mode 100644
> index 0000000..89f1e07
> --- /dev/null
> +++ b/tests/vhost-user-bridge.c
> @@ -0,0 +1,1138 @@
> +/*
> + * Vhost-User Bridge
> + *
> + * Copyright (c) 2015 Red Hat, Inc.
> + *
> + * Authors:
> + *  Victor Kaplansky <vict...@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later. See the COPYING file in the top-level directory.
> + */
> +
> +#include <stddef.h>
> +#include <assert.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <string.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <sys/un.h>
> +#include <sys/unistd.h>
> +#include <sys/mman.h>
> +#include <sys/eventfd.h>
> +#include <arpa/inet.h>
> +
> +#include <linux/vhost.h>
> +
> +#include "qemu/atomic.h"
> +#include "standard-headers/linux/virtio_net.h"
> +#include "standard-headers/linux/virtio_ring.h"
> +
> +#define DEBUG_VHOST_USER_BRIDGE
> +
> +typedef void (*CallbackFunc)(int sock, void *ctx);
> +
> +typedef struct Event {
> +    void *ctx;
> +    CallbackFunc callback;
> +} Event;
> +
> +typedef struct Dispatcher {
> +    int max_sock;
> +    fd_set fdset;
> +    Event events[FD_SETSIZE];
> +} Dispatcher;
> +
> +static void
> +vhost_user_die(const char *s)
> +{
> +    perror(s);
> +    exit(1);
> +}
> +
> +static int
> +dispatcher_init(Dispatcher *dispr)
> +{
> +    FD_ZERO(&dispr->fdset);
> +    dispr->max_sock = -1;
> +    return 0;
> +}
> +
> +static int
> +dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb)
> +{
> +    if (sock >= FD_SETSIZE) {
> +        fprintf(stderr,
> +                "Error: Failed to add new event. sock %d should be less than %d\n",
> +                sock, FD_SETSIZE);
> +        return -1;
> +    }
> +
> +    dispr->events[sock].ctx = ctx;
> +    dispr->events[sock].callback = cb;
> +
> +    FD_SET(sock, &dispr->fdset);
> +    if (sock > dispr->max_sock) {
> +        dispr->max_sock = sock;
> +    }
> +    printf("Added sock %d for watching. max_sock: %d\n",
> +           sock, dispr->max_sock);
> +    return 0;
> +}
> +
> +#if 0
> +/* dispatcher_remove() is not currently in use but may be useful
> + * in the future. */
> +static int
> +dispatcher_remove(Dispatcher *dispr, int sock)
> +{
> +    if (sock >= FD_SETSIZE) {
> +        fprintf(stderr,
> +                "Error: Failed to remove event. sock %d should be less than %d\n",
> +                sock, FD_SETSIZE);
> +        return -1;
> +    }
> +
> +    FD_CLR(sock, &dispr->fdset);
> +    return 0;
> +}
> +#endif
> +
> +/* timeout in us */
> +static int
> +dispatcher_wait(Dispatcher *dispr, uint32_t timeout)
> +{
> +    struct timeval tv;
> +    tv.tv_sec = timeout / 1000000;
> +    tv.tv_usec = timeout % 1000000;
> +
> +    fd_set fdset = dispr->fdset;
> +
> +    /* wait until some of the sockets become readable. */
> +    int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv);
> +
> +    if (rc == -1) {
> +        vhost_user_die("select");
> +    }
> +
> +    /* Timeout */
> +    if (rc == 0) {
> +        return 0;
> +    }
> +
> +    /* Now call the callback for every ready socket. */
> +
> +    int sock;
> +    for (sock = 0; sock < dispr->max_sock + 1; sock++)
> +        if (FD_ISSET(sock, &fdset)) {
> +            Event *e = &dispr->events[sock];
> +            e->callback(sock, e->ctx);
> +        }
> +
> +    return 0;
> +}
> +
> +typedef struct VirtQueue {
> +    int call_fd;
> +    int kick_fd;
> +    uint32_t size;
> +    uint16_t last_avail_index;
> +    uint16_t last_used_index;
> +    struct vring_desc *desc;
> +    struct vring_avail *avail;
> +    struct vring_used *used;
> +} VirtQueue;
> +
> +/* Based on qemu/hw/virtio/vhost-user.c */
> +
> +#define VHOST_MEMORY_MAX_NREGIONS 8
> +#define VHOST_USER_F_PROTOCOL_FEATURES 30
> +
> +enum VhostUserProtocolFeature {
> +    VHOST_USER_PROTOCOL_F_MQ = 0,
> +    VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1,
> +    VHOST_USER_PROTOCOL_F_RARP = 2,
> +
> +    VHOST_USER_PROTOCOL_F_MAX
> +};
> +
> +#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)
> +
> +typedef enum VhostUserRequest {
> +    VHOST_USER_NONE = 0,
> +    VHOST_USER_GET_FEATURES = 1,
> +    VHOST_USER_SET_FEATURES = 2,
> +    VHOST_USER_SET_OWNER = 3,
> +    VHOST_USER_RESET_DEVICE = 4,
> +    VHOST_USER_SET_MEM_TABLE = 5,
> +    VHOST_USER_SET_LOG_BASE = 6,
> +    VHOST_USER_SET_LOG_FD = 7,
> +    VHOST_USER_SET_VRING_NUM = 8,
> +    VHOST_USER_SET_VRING_ADDR = 9,
> +    VHOST_USER_SET_VRING_BASE = 10,
> +    VHOST_USER_GET_VRING_BASE = 11,
> +    VHOST_USER_SET_VRING_KICK = 12,
> +    VHOST_USER_SET_VRING_CALL = 13,
> +    VHOST_USER_SET_VRING_ERR = 14,
> +    VHOST_USER_GET_PROTOCOL_FEATURES = 15,
> +    VHOST_USER_SET_PROTOCOL_FEATURES = 16,
> +    VHOST_USER_GET_QUEUE_NUM = 17,
> +    VHOST_USER_SET_VRING_ENABLE = 18,
> +    VHOST_USER_SEND_RARP = 19,
> +    VHOST_USER_MAX
> +} VhostUserRequest;

Maybe we need a common copy under tests/.

> +
> +typedef struct VhostUserMemoryRegion {
> +    uint64_t guest_phys_addr;
> +    uint64_t memory_size;
> +    uint64_t userspace_addr;
> +    uint64_t mmap_offset;
> +} VhostUserMemoryRegion;
> +
> +typedef struct VhostUserMemory {
> +    uint32_t nregions;
> +    uint32_t padding;
> +    VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS];
> +} VhostUserMemory;
> +
> +typedef struct VhostUserMsg {
> +    VhostUserRequest request;
> +
> +#define VHOST_USER_VERSION_MASK    (0x3)
> +#define VHOST_USER_REPLY_MASK      (0x1<<2)
> +    uint32_t flags;
> +    uint32_t size; /* the following payload size */
> +    union {
> +#define VHOST_USER_VRING_IDX_MASK  (0xff)
> +#define VHOST_USER_VRING_NOFD_MASK (0x1<<8)
> +        uint64_t u64;
> +        struct vhost_vring_state state;
> +        struct vhost_vring_addr addr;
> +        VhostUserMemory memory;
> +    } payload;
> +    int fds[VHOST_MEMORY_MAX_NREGIONS];
> +    int fd_num;
> +} QEMU_PACKED VhostUserMsg;
> +
> +#define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64)
> +
> +/* The version of the protocol we support */
> +#define VHOST_USER_VERSION (0x1)
> +
> +#define MAX_NR_VIRTQUEUE (8)
> +
> +typedef struct VhostDevRegion {
> +    /* Guest Physical address. */
> +    uint64_t gpa;
> +    /* Memory region size. */
> +    uint64_t size;
> +    /* QEMU virtual address (userspace). */
> +    uint64_t qva;
> +    /* Starting offset in our mmapped space. */
> +    uint64_t mmap_offset;
> +    /* Start address of mmapped space.
*/ > + uint64_t mmap_addr; > +} VhostDevRegion; > + > +typedef struct VhostDev { > + int sock; > + Dispatcher dispatcher; > + uint32_t nregions; > + VhostDevRegion regions[VHOST_MEMORY_MAX_NREGIONS]; > + VirtQueue virtqueue[MAX_NR_VIRTQUEUE]; > + int backend_udp_sock; > + struct sockaddr_in backend_udp_dest; > +} VhostDev; > + > +static const char *vhost_user_request_str[] = { > + [VHOST_USER_NONE] = "VHOST_USER_NONE", > + [VHOST_USER_GET_FEATURES] = "VHOST_USER_GET_FEATURES", > + [VHOST_USER_SET_FEATURES] = "VHOST_USER_SET_FEATURES", > + [VHOST_USER_SET_OWNER] = "VHOST_USER_SET_OWNER", > + [VHOST_USER_RESET_DEVICE] = "VHOST_USER_RESET_DEVICE", > + [VHOST_USER_SET_MEM_TABLE] = "VHOST_USER_SET_MEM_TABLE", > + [VHOST_USER_SET_LOG_BASE] = "VHOST_USER_SET_LOG_BASE", > + [VHOST_USER_SET_LOG_FD] = "VHOST_USER_SET_LOG_FD", > + [VHOST_USER_SET_VRING_NUM] = "VHOST_USER_SET_VRING_NUM", > + [VHOST_USER_SET_VRING_ADDR] = "VHOST_USER_SET_VRING_ADDR", > + [VHOST_USER_SET_VRING_BASE] = "VHOST_USER_SET_VRING_BASE", > + [VHOST_USER_GET_VRING_BASE] = "VHOST_USER_GET_VRING_BASE", > + [VHOST_USER_SET_VRING_KICK] = "VHOST_USER_SET_VRING_KICK", > + [VHOST_USER_SET_VRING_CALL] = "VHOST_USER_SET_VRING_CALL", > + [VHOST_USER_SET_VRING_ERR] = "VHOST_USER_SET_VRING_ERR", > + [VHOST_USER_GET_PROTOCOL_FEATURES] = > "VHOST_USER_GET_PROTOCOL_FEATURES", > + [VHOST_USER_SET_PROTOCOL_FEATURES] = > "VHOST_USER_SET_PROTOCOL_FEATURES", > + [VHOST_USER_GET_QUEUE_NUM] = "VHOST_USER_GET_QUEUE_NUM", > + [VHOST_USER_SET_VRING_ENABLE] = "VHOST_USER_SET_VRING_ENABLE", > + [VHOST_USER_SEND_RARP] = "VHOST_USER_SEND_RARP", > + [VHOST_USER_MAX] = "VHOST_USER_MAX", > +}; > + > +static void > +print_buffer(uint8_t *buf, size_t len) > +{ > + int i; > + printf("Raw buffer:\n"); > + for (i = 0; i < len; i++) { > + if (i % 16 == 0) { > + printf("\n"); > + } > + if (i % 4 == 0) { > + printf(" "); > + } > + printf("%02x ", buf[i]); > + } > + > printf("\n............................................................\n"); > 
+} > + > +/* Translate guest physical address to our virtual address. */ > +static uint64_t > +gpa_to_va(VhostDev *dev, uint64_t guest_addr) > +{ > + int i; > + > + /* Find matching memory region. */ > + for (i = 0; i < dev->nregions; i++) { > + VhostDevRegion *r = &dev->regions[i]; > + > + if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) { > + return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset; > + } > + } > + > + assert(!"address not found in regions"); > + return 0; > +} > + > +/* Translate qemu virtual address to our virtual address. */ > +static uint64_t > +qva_to_va(VhostDev *dev, uint64_t qemu_addr) > +{ > + int i; > + > + /* Find matching memory region. */ > + for (i = 0; i < dev->nregions; i++) { > + VhostDevRegion *r = &dev->regions[i]; > + > + if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) { > + return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset; > + } > + } > + > + assert(!"address not found in regions"); > + return 0; > +} > + > +static void > +vhost_user_message_read(int conn_fd, VhostUserMsg *vmsg) > +{ > + char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { }; > + struct iovec iov = { > + .iov_base = (char *)vmsg, > + .iov_len = VHOST_USER_HDR_SIZE, > + }; > + struct msghdr msg = { > + .msg_iov = &iov, > + .msg_iovlen = 1, > + .msg_control = control, > + .msg_controllen = sizeof(control), > + }; > + size_t fd_size; > + struct cmsghdr *cmsg; > + int rc; > + > + rc = recvmsg(conn_fd, &msg, 0); > + > + if (rc <= 0) { > + vhost_user_die("recvmsg"); > + } > + > + vmsg->fd_num = 0; > + for (cmsg = CMSG_FIRSTHDR(&msg); > + cmsg != NULL; > + cmsg = CMSG_NXTHDR(&msg, cmsg)) > + { > + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) > { > + fd_size = cmsg->cmsg_len - CMSG_LEN(0); > + vmsg->fd_num = fd_size / sizeof(int); > + memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size); > + break; > + } > + } > + > + if (vmsg->size > sizeof(vmsg->payload)) { > + fprintf(stderr, > + "Error: too 
big message request: %d, size: vmsg->size: %u, " > + "while sizeof(vmsg->payload) = %lu\n", > + vmsg->request, vmsg->size, sizeof(vmsg->payload)); > + exit(1); > + } > + > + if (vmsg->size) { > + rc = read(conn_fd, &vmsg->payload, vmsg->size); > + if (rc <= 0) { > + vhost_user_die("recvmsg"); > + } > + > + assert(rc == vmsg->size); > + } > +} > + > +static void > +vhost_user_message_write(int conn_fd, VhostUserMsg *vmsg) > +{ > + int rc; > + do { > + rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size); > + } while (rc < 0 && errno == EINTR); > + > + if (rc < 0) { > + vhost_user_die("write"); > + } > +} > + > +static void > +vhost_user_backend_udp_sendbuf(VhostDev *dev, > + uint8_t *buf, > + size_t len) > +{ > + int slen = sizeof(struct sockaddr_in); > + > + if (sendto(dev->backend_udp_sock, buf, len, 0, > + (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) { > + vhost_user_die("sendto()"); > + } > +} > + > +static int > +vhost_user_backend_udp_recvbuf(VhostDev *dev, > + uint8_t *buf, > + size_t buflen) > +{ > + int slen = sizeof(struct sockaddr_in); > + int rc; > + > + rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0, > + (struct sockaddr *) &dev->backend_udp_dest, > + (socklen_t *)&slen); > + if (rc == -1) { > + vhost_user_die("recvfrom()"); > + } > + > + return rc; > +} > + > +static void > +vubr_consume_raw_packet(VhostDev *dev, uint8_t *buf, uint32_t len) > +{ > + int hdrlen = sizeof(struct virtio_net_hdr_v1); > + > +#ifdef DEBUG_VHOST_USER_BRIDGE > + print_buffer(buf, len); > +#endif > + vhost_user_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen); > +} > + > +/* Kick the guest if necessary. 
*/ > +static void > +virtqueue_kick(VirtQueue *vq) > +{ > + if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { > + printf("Kicking the guest...\n"); > + eventfd_write(vq->call_fd, 1); > + } > +} > + > +static void > +vubr_post_buffer(VhostDev *dev, > + VirtQueue *vq, > + uint8_t *buf, > + int32_t len) > +{ > + struct vring_desc *desc = vq->desc; > + struct vring_avail *avail = vq->avail; > + struct vring_used *used = vq->used; > + > + unsigned int size = vq->size; > + > + assert(vq->last_avail_index != avail->idx); Why? How do you know there's anything there? > + /* Prevent accessing descriptors, buffers, avail->ring and used before > + * avail->idx */ smp_rmb then? Can be fixed later ... > + smp_mb(); > + > + uint16_t a_index = vq->last_avail_index % size; > + uint16_t u_index = vq->last_used_index % size; > + uint16_t d_index = avail->ring[a_index]; > + > + int i = d_index; > + > + > +#ifdef DEBUG_VHOST_USER_BRIDGE > + printf("Post packet to guest on vq:\n"); > + printf(" size = %d\n", vq->size); > + printf(" last_avail_index = %d\n", vq->last_avail_index); > + printf(" last_used_index = %d\n", vq->last_used_index); > + printf(" a_index = %d\n", a_index); > + printf(" u_index = %d\n", u_index); > + printf(" d_index = %d\n", d_index); > + printf(" desc[%d].addr = 0x%016"PRIx64"\n", i, desc[i].addr); > + printf(" desc[%d].len = %d\n", i, desc[i].len); > + printf(" desc[%d].flags = %d\n", i, desc[i].flags); > + printf(" avail->idx = %d\n", avail->idx); > + printf(" used->idx = %d\n", used->idx); > +#endif > + > + if (!(desc[i].flags & VRING_DESC_F_WRITE)) { > + /* FIXME: we should find writable descriptor. */ > + fprintf(stderr, "Error: descriptor is not writable. Exiting.\n"); > + exit(1); > + } > + > + void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr); > + uint32_t chunk_len = desc[i].len; > + > + if (len <= chunk_len) { > + memcpy(chunk_start, buf, len); > + } else { > + fprintf(stderr, > + "Received too long packet from the backend. 
Dropping...\n"); > + return; > + } > + > + /* Add descriptor to the used ring. */ > + used->ring[u_index].id = d_index; > + used->ring[u_index].len = len; > + > + vq->last_avail_index++; > + vq->last_used_index++; > + > + /* Prevent accessing avail, descriptors, buffers and used->ring after > + * the store to used->idx */ > + smp_mb(); > + used->idx = vq->last_used_index; > + > + /* Kick the guest if necessary. */ > + virtqueue_kick(vq); > +} > + > +static int > +vubr_process_desc(VhostDev *dev, VirtQueue *vq) > +{ > + struct vring_desc *desc = vq->desc; > + struct vring_avail *avail = vq->avail; > + struct vring_used *used = vq->used; > + > + unsigned int size = vq->size; > + > + uint16_t a_index = vq->last_avail_index % size; > + uint16_t u_index = vq->last_used_index % size; > + uint16_t d_index = avail->ring[a_index]; > + > + uint32_t i, len = 0; > + size_t buf_size = 4096; > + uint8_t buf[4096]; > + > +#ifdef DEBUG_VHOST_USER_BRIDGE > + printf("Chunks: "); > +#endif > + > + i = d_index; > + do { > + void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr); > + uint32_t chunk_len = desc[i].len; > + > + if (len + chunk_len < buf_size) { > + memcpy(buf + len, chunk_start, chunk_len); > +#ifdef DEBUG_VHOST_USER_BRIDGE > + printf("%d ", chunk_len); > +#endif Wrap these in a macro so you don't need ifdefs in code. > + } else { > + fprintf(stderr, "Error: too long packet. Dropping...\n"); > + break; > + } > + > + len += chunk_len; > + > + if (!(desc[i].flags & VRING_DESC_F_NEXT)) { > + break; > + } > + > + i = desc[i].next; > + } while (1); > + > + if (!len) { > + return -1; > + } > + > + /* Add descriptor to the used ring. 
*/
> +    used->ring[u_index].id = d_index;
> +    used->ring[u_index].len = len;
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    printf("\n");
> +#endif
> +
> +    vubr_consume_raw_packet(dev, buf, len);
> +
> +    return 0;
> +}
> +
> +static void
> +vubr_process_avail(VhostDev *dev, VirtQueue *vq)
> +{
> +    struct vring_avail *avail = vq->avail;
> +    struct vring_used *used = vq->used;
> +
> +    while (vq->last_avail_index != avail->idx) {
> +        /* Prevent accessing avail->ring, descriptors and buffers before
> +         * avail->idx */
> +        smp_mb();
> +
> +        vubr_process_desc(dev, vq);
> +        vq->last_avail_index++;
> +        vq->last_used_index++;
> +    }
> +
> +    /* Prevent accessing avail->ring, descriptors, buffers and used->ring
> +     * after the store to used->idx */
> +    smp_mb();
> +
> +    used->idx = vq->last_used_index;
> +}
> +
> +static void
> +vubr_backend_recv_cb(int sock, void *ctx)
> +{
> +    VhostDev *dev = (VhostDev *) ctx;
> +    VirtQueue *rx_vq = &dev->virtqueue[0];
> +    uint8_t buf[4096];
> +    struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf;
> +    int hdrlen = sizeof(struct virtio_net_hdr_v1);
> +    int buflen = sizeof(buf);
> +    int len;
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    printf("\n\n *** IN UDP RECEIVE CALLBACK ***\n\n");
> +#endif
> +
> +    *hdr = (struct virtio_net_hdr_v1) { };
> +    hdr->num_buffers = 1;
> +
> +    len = vhost_user_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen);
> +    vubr_post_buffer(dev, rx_vq, buf, len + hdrlen);
> +}
> +
> +static void
> +vubr_kick_cb(int sock, void *ctx)
> +{
> +    VhostDev *dev = (VhostDev *) ctx;
> +    eventfd_t kick_data;
> +    ssize_t rc;
> +
> +    rc = eventfd_read(sock, &kick_data);
> +
> +    if (rc == -1) {
> +        vhost_user_die("eventfd_read()");
> +    } else {
> +        printf("Got kick_data: %016"PRIx64"\n", kick_data);
> +        vubr_process_avail(dev, &dev->virtqueue[1]);
> +    }
> +}
> +
> +static int
> +vhost_user_none_exec(VhostDev *dev,
> +                     VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static
int > +vhost_user_get_features_exec(VhostDev *dev, > + VhostUserMsg *vmsg) Please prefix everything with vubr_ consistently. Same applies to types etc. > +{ > + vmsg->payload.u64 = > + ((1ULL << VIRTIO_NET_F_MRG_RXBUF) | > + (1ULL << VIRTIO_NET_F_CTRL_VQ) | > + (1ULL << VIRTIO_NET_F_CTRL_RX) | > + (1ULL << VHOST_F_LOG_ALL)); > + vmsg->size = sizeof(vmsg->payload.u64); > + > + printf("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64); > + > + /* reply */ > + return 1; > +} > + > +static int > +vhost_user_set_features_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); > + return 0; > +} > + > +static int > +vhost_user_set_owner_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + return 0; > +} > + > +static int > +vhost_user_reset_device_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + printf("Function %s() not implemented yet.\n", __func__); > + return 0; > +} > + > +static int > +vhost_user_set_mem_table_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + printf("Nregions: %d\n", vmsg->payload.memory.nregions); > + > + VhostUserMemory *memory = &vmsg->payload.memory; > + dev->nregions = memory->nregions; > + int i; > + for (i = 0; i < dev->nregions; i++) { > + VhostUserMemoryRegion *msg_region = &memory->regions[i]; > + VhostDevRegion *dev_region = &dev->regions[i]; > + > + printf("Region %d\n", i); > + printf(" guest_phys_addr: 0x%016"PRIx64"\n", > + msg_region->guest_phys_addr); > + printf(" memory_size: 0x%016"PRIx64"\n", > + msg_region->memory_size); > + printf(" userspace_addr 0x%016"PRIx64"\n", > + msg_region->userspace_addr); > + printf(" mmap_offset 0x%016"PRIx64"\n", > + msg_region->mmap_offset); > + > + dev_region->gpa = msg_region->guest_phys_addr; > + dev_region->size = msg_region->memory_size; > + dev_region->qva = msg_region->userspace_addr; > + dev_region->mmap_offset = msg_region->mmap_offset; > + > + void *mmap_addr; > + > + /* We don't use offset argument of mmap() since 
the > + * mapped address has to be page aligned, and we use huge > + * pages. */ > + mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset, > + PROT_READ | PROT_WRITE, MAP_SHARED, > + vmsg->fds[i], 0); > + > + if (mmap_addr == MAP_FAILED) { > + vhost_user_die("mmap"); > + } > + > + dev_region->mmap_addr = (uint64_t) mmap_addr; > + printf(" mmap_addr: 0x%016"PRIx64"\n", > dev_region->mmap_addr); > + } > + > + return 0; > +} > + > +static int > +vhost_user_set_log_base_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + printf("Function %s() not implemented yet.\n", __func__); > + return 0; > +} > + > +static int > +vhost_user_set_log_fd_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + printf("Function %s() not implemented yet.\n", __func__); > + return 0; > +} > + > +static int > +vhost_user_set_vring_num_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + unsigned int index = vmsg->payload.state.index; > + unsigned int num = vmsg->payload.state.num; > + > + printf("State.index: %d\n", index); > + printf("State.num: %d\n", num); > + dev->virtqueue[index].size = num; > + return 0; > +} > + > +static int > +vhost_user_set_vring_addr_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + struct vhost_vring_addr *vra = &vmsg->payload.addr; > + printf("vhost_vring_addr:\n"); > + printf(" index: %d\n", vra->index); > + printf(" flags: %d\n", vra->flags); > + printf(" desc_user_addr: 0x%016llx\n", vra->desc_user_addr); > + printf(" used_user_addr: 0x%016llx\n", vra->used_user_addr); > + printf(" avail_user_addr: 0x%016llx\n", vra->avail_user_addr); > + printf(" log_guest_addr: 0x%016llx\n", vra->log_guest_addr); > + > + unsigned int index = vra->index; > + VirtQueue *vq = &dev->virtqueue[index]; > + > + vq->desc = (struct vring_desc *)qva_to_va(dev, vra->desc_user_addr); > + vq->used = (struct vring_used *)qva_to_va(dev, vra->used_user_addr); > + vq->avail = (struct vring_avail *)qva_to_va(dev, vra->avail_user_addr); > + > + printf("Setting virtq 
addresses:\n"); > + printf(" vring_desc at %p\n", vq->desc); > + printf(" vring_used at %p\n", vq->used); > + printf(" vring_avail at %p\n", vq->avail); > + > + vq->last_used_index = vq->used->idx; > + return 0; > +} > + > +static int > +vhost_user_set_vring_base_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + unsigned int index = vmsg->payload.state.index; > + unsigned int num = vmsg->payload.state.num; > + > + printf("State.index: %d\n", index); > + printf("State.num: %d\n", num); > + dev->virtqueue[index].last_avail_index = num; > + > + return 0; > +} > + > +static int > +vhost_user_get_vring_base_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + printf("Function %s() not implemented yet.\n", __func__); > + return 0; > +} > + > +static int > +vhost_user_set_vring_kick_exec(VhostDev *dev, > + VhostUserMsg *vmsg) > +{ > + uint64_t u64_arg = vmsg->payload.u64; > + int index = u64_arg & VHOST_USER_VRING_IDX_MASK; > + > + printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); > + > + assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0); > + assert(vmsg->fd_num == 1); > + > + dev->virtqueue[index].kick_fd = vmsg->fds[0]; > + printf("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index); > + > + if (index % 2 == 1) { > + /* TX queue. 
*/
> +        dispatcher_add(&dev->dispatcher, dev->virtqueue[index].kick_fd,
> +                       dev, vubr_kick_cb);
> +
> +        printf("Waiting for kicks on fd: %d for vq: %d\n",
> +               dev->virtqueue[index].kick_fd, index);
> +    }
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_call_exec(VhostDev *dev,
> +                               VhostUserMsg *vmsg)
> +{
> +    uint64_t u64_arg = vmsg->payload.u64;
> +    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
> +
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
> +    assert(vmsg->fd_num == 1);
> +
> +    dev->virtqueue[index].call_fd = vmsg->fds[0];
> +    printf("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index);
> +
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_err_exec(VhostDev *dev,
> +                              VhostUserMsg *vmsg)
> +{
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_get_protocol_features_exec(VhostDev *dev,
> +                                      VhostUserMsg *vmsg)
> +{
> +    /* FIXME: unimplemented */
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_protocol_features_exec(VhostDev *dev,
> +                                      VhostUserMsg *vmsg)
> +{
> +    /* FIXME: unimplemented */
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_get_queue_num_exec(VhostDev *dev,
> +                              VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_enable_exec(VhostDev *dev,
> +                                 VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_send_rarp_exec(VhostDev *dev,
> +                          VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_execute_request(VhostDev *dev,
> +                           VhostUserMsg *vmsg)
> +{
> +    /* Print out generic part of the request.
*/ > + printf( > + "================== Vhost user message from QEMU > ==================\n"); > + printf("Request: %s (%d)\n", vhost_user_request_str[vmsg->request], > + vmsg->request); > + printf("Flags: 0x%x\n", vmsg->flags); > + printf("Size: %d\n", vmsg->size); > + > + if (vmsg->fd_num) { > + int i; > + printf("Fds:"); > + for (i = 0; i < vmsg->fd_num; i++) { > + printf(" %d", vmsg->fds[i]); > + } > + printf("\n"); > + } > + > + switch (vmsg->request) { > + case VHOST_USER_NONE: > + return vhost_user_none_exec(dev, vmsg); > + case VHOST_USER_GET_FEATURES: > + return vhost_user_get_features_exec(dev, vmsg); > + case VHOST_USER_SET_FEATURES: > + return vhost_user_set_features_exec(dev, vmsg); > + case VHOST_USER_SET_OWNER: > + return vhost_user_set_owner_exec(dev, vmsg); > + case VHOST_USER_RESET_DEVICE: > + return vhost_user_reset_device_exec(dev, vmsg); > + case VHOST_USER_SET_MEM_TABLE: > + return vhost_user_set_mem_table_exec(dev, vmsg); > + case VHOST_USER_SET_LOG_BASE: > + return vhost_user_set_log_base_exec(dev, vmsg); > + case VHOST_USER_SET_LOG_FD: > + return vhost_user_set_log_fd_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_NUM: > + return vhost_user_set_vring_num_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_ADDR: > + return vhost_user_set_vring_addr_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_BASE: > + return vhost_user_set_vring_base_exec(dev, vmsg); > + case VHOST_USER_GET_VRING_BASE: > + return vhost_user_get_vring_base_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_KICK: > + return vhost_user_set_vring_kick_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_CALL: > + return vhost_user_set_vring_call_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_ERR: > + return vhost_user_set_vring_err_exec(dev, vmsg); > + case VHOST_USER_GET_PROTOCOL_FEATURES: > + return vhost_user_get_protocol_features_exec(dev, vmsg); > + case VHOST_USER_SET_PROTOCOL_FEATURES: > + return vhost_user_set_protocol_features_exec(dev, vmsg); > + case VHOST_USER_GET_QUEUE_NUM: > + 
return vhost_user_get_queue_num_exec(dev, vmsg); > + case VHOST_USER_SET_VRING_ENABLE: > + return vhost_user_set_vring_enable_exec(dev, vmsg); > + case VHOST_USER_SEND_RARP: > + return vhost_user_send_rarp_exec(dev, vmsg); > + > + > + case VHOST_USER_MAX: > + assert(vmsg->request != VHOST_USER_MAX); > + } > + return 0; > +} > + > +static void > +vhost_user_receive_cb(int sock, void *ctx) > +{ > + VhostDev *dev = (VhostDev *) ctx; > + VhostUserMsg vmsg; > + > + vhost_user_message_read(sock, &vmsg); > + > + int reply_requested = vhost_user_execute_request(dev, &vmsg); > + > + if (reply_requested) { > + /* Set the version in the flags when sending the reply */ > + vmsg.flags &= ~VHOST_USER_VERSION_MASK; > + vmsg.flags |= VHOST_USER_VERSION; > + vmsg.flags |= VHOST_USER_REPLY_MASK; > + vhost_user_message_write(sock, &vmsg); > + } > +} > + > +static void > +vhost_user_accept_cb(int sock, void *ctx) > +{ > + VhostDev *dev = (VhostDev *)ctx; > + int conn_fd; > + struct sockaddr_un un; > + socklen_t len = sizeof(un); > + > + conn_fd = accept(sock, (struct sockaddr *) &un, &len); > + if (conn_fd == -1) { > + vhost_user_die("accept()"); > + } > + printf("Got connection from remote peer on sock %d\n", conn_fd); > + dispatcher_add(&dev->dispatcher, conn_fd, ctx, vhost_user_receive_cb); > +} > + > +static VhostDev * > +vhost_user_new(const char *path) > +{ > + VhostDev *dev = > + (VhostDev *) calloc(1, sizeof(VhostDev)); > + > + dev->nregions = 0; > + > + int i; > + for (i = 0; i < MAX_NR_VIRTQUEUE; i++) { > + dev->virtqueue[i] = (VirtQueue) { > + .call_fd = -1, .kick_fd = -1, > + .size = 0, > + .last_avail_index = 0, .last_used_index = 0, > + .desc = 0, .avail = 0, .used = 0, > + }; > + } > + > + /* Get a UNIX socket. 
*/ > + dev->sock = socket(AF_UNIX, SOCK_STREAM, 0); > + if (dev->sock == -1) { > + vhost_user_die("socket"); > + } > + > + struct sockaddr_un un; > + un.sun_family = AF_UNIX; > + strcpy(un.sun_path, path); > + > + size_t len = sizeof(un.sun_family) + strlen(path); > + > + unlink(path); > + > + if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) { > + vhost_user_die("bind"); > + } > + > + if (listen(dev->sock, 1) == -1) { > + vhost_user_die("listen"); > + } > + > + dispatcher_init(&dev->dispatcher); > + dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev, > + vhost_user_accept_cb); > + > + printf("Waiting for connections on UNIX socket %s ...\n", path); > + return dev; > +} > + > +static void > +vhost_user_backend_udp_setup(VhostDev *dev, > + const char *local_host, > + uint16_t local_port, > + const char *dest_host, > + uint16_t dest_port) > +{ > + > + struct sockaddr_in si_local; > + int sock; > + > + sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); > + if (sock == -1) { > + vhost_user_die("socket"); > + } > + > + memset((char *) &si_local, 0, sizeof(struct sockaddr_in)); > + si_local.sin_family = AF_INET; > + si_local.sin_port = htons(local_port); > + if (inet_aton(local_host, &si_local.sin_addr) == 0) { > + fprintf(stderr, "inet_aton() failed.\n"); > + exit(1); > + } > + > + if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) { > + vhost_user_die("bind"); > + } > + > + /* setup destination for sends */ > + struct sockaddr_in *si_remote = &dev->backend_udp_dest; > + memset((char *) si_remote, 0, sizeof(struct sockaddr_in)); > + si_remote->sin_family = AF_INET; > + si_remote->sin_port = htons(dest_port); > + if (inet_aton(dest_host, &si_remote->sin_addr) == 0) { > + fprintf(stderr, "inet_aton() failed.\n"); > + exit(1); > + } > + > + dev->backend_udp_sock = sock; > + dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb); > + printf("Waiting for data from udp backend on %s:%d...\n", > + local_host, local_port); > +} > + > 
+static void > +vhost_user_run(VhostDev *dev) > +{ > + while (1) { > + /* timeout 200ms */ > + dispatcher_wait(&dev->dispatcher, 200000); > + /* Here one can try polling strategy. */ > + } > +} > + > +int > +main(int argc, char *argv[]) > +{ > + VhostDev *dev; > + > + dev = vhost_user_new("/tmp/vubr.sock"); > + if (!dev) { > + return 1; > + } > + > + vhost_user_backend_udp_setup(dev, > + "127.0.0.1", 4444, > + "127.0.0.1", 5555); > + vhost_user_run(dev); > + return 0; > +} > diff --git a/tests/Makefile b/tests/Makefile > index 9341498..0811c68 100644 > --- a/tests/Makefile > +++ b/tests/Makefile > @@ -522,6 +522,7 @@ tests/qemu-iotests/socket_scm_helper$(EXESUF): > tests/qemu-iotests/socket_scm_hel > tests/test-qemu-opts$(EXESUF): tests/test-qemu-opts.o $(test-util-obj-y) > tests/test-write-threshold$(EXESUF): tests/test-write-threshold.o > $(test-block-obj-y) > tests/test-netfilter$(EXESUF): tests/test-netfilter.o $(qtest-obj-y) > +tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o Needs to be limited to when there's actual vhost-user support. Same as existing vhost-user-test I guess. > > ifeq ($(CONFIG_POSIX),y) > LIBS += -lutil > -- > --Victor
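Following up the Makefile comment above, what I have in mind is just wrapping the rule, along these lines. Sketch only: whether CONFIG_POSIX is the right guard, or a vhost-specific CONFIG_* variable as used for vhost-user-test, needs checking:

```make
# Sketch: only build the bridge where vhost-user can actually work,
# e.g. inside the existing POSIX-only section of tests/Makefile.
ifeq ($(CONFIG_POSIX),y)
tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o
endif
```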