On Tue, Nov 17, 2015 at 12:04:06PM +0200, Victor Kaplansky wrote: > During migration devices continue writing to the guest's memory. > The writes has to be reported to QEMU. This change implements > minimal support in vhost-user-bridge required for successful > migration of a guest with virtio-net device. > > Signed-off-by: Victor Kaplansky <vict...@redhat.com> > --- > v4: > - implement set_vring_enable() > - stop the device on vubr_get_vring_base_exec() and > start on setting kick_fd as a work-around to QEMU bug > enabling vrings too early.
You do want to address that FIXME but at least it's a documented bug. > v3: > - Get rid of vhost_log_chunk_t. Just use uint8_t. > - Implement vubr_set_log_fd_exec(). > - Kick the log if log_call_fd has been set up. > - Mark bits in log table atomically to enable more then one > simultaneous vhost-user backend. > - Fix the calculations of required log table size in an > assert. > - Fix the coding style: only single space before assignment > operator. > - Add a comment on the hack to determine that queues are > ready for processing. > - Other minor cosmetic fixes. > v2: > - use log_guest_addr for used ring reported by qemu instead of > translating. > - use mmap_size and mmap_offset defined in new > VHOST_USER_SET_LOG_BASE interface. See the patch > "vhost-user: modify SET_LOG_BASE to pass mmap size and > offset". > - start logging dirty pages only after the appropriate feature > is set by a VHOST_USER_GET_PROTOCOL_FEATURES request. > - updated TODO list. > > tests/vhost-user-bridge.c | 220 > ++++++++++++++++++++++++++++++++++++++++------ > 1 file changed, 195 insertions(+), 25 deletions(-) > > diff --git a/tests/vhost-user-bridge.c b/tests/vhost-user-bridge.c > index 864f69e..7bdfc98 100644 > --- a/tests/vhost-user-bridge.c > +++ b/tests/vhost-user-bridge.c > @@ -13,16 +13,22 @@ > /* > * TODO: > * - main should get parameters from the command line. > - * - implement all request handlers. > + * - implement all request handlers. Still not implemented: > + * vubr_get_queue_num_exec() > + * vubr_send_rarp_exec() > * - test for broken requests and virtqueue. > * - implement features defined by Virtio 1.0 spec. > * - support mergeable buffers and indirect descriptors. > - * - implement RESET_DEVICE request. > * - implement clean shutdown. > * - implement non-blocking writes to UDP backend. > * - implement polling strategy. > + * - implement clean starting/stopping of vq processing > + * - implement clean starting/stopping of used and buffers > + * dirty page logging. > */ > > +#define _FILE_OFFSET_BITS 64 > + > #include <stddef.h> > #include <assert.h> > #include <stdio.h> > @@ -166,6 +172,8 @@ typedef struct VubrVirtq { > struct vring_desc *desc; > struct vring_avail *avail; > struct vring_used *used; > + uint64_t log_guest_addr; > + int enable; > } VubrVirtq; > > /* Based on qemu/hw/virtio/vhost-user.c */ > @@ -173,6 +181,8 @@ typedef struct VubrVirtq { > #define VHOST_MEMORY_MAX_NREGIONS 8 > #define VHOST_USER_F_PROTOCOL_FEATURES 30 > > +#define VHOST_LOG_PAGE 4096 > + > enum VhostUserProtocolFeature { > VHOST_USER_PROTOCOL_F_MQ = 0, > VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1, > @@ -220,6 +230,11 @@ typedef struct VhostUserMemory { > VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS]; > } VhostUserMemory; > > +typedef struct VhostUserLog { > + uint64_t mmap_size; > + uint64_t mmap_offset; > +} VhostUserLog; > + > typedef struct VhostUserMsg { > VhostUserRequest request; > > @@ -234,6 +249,7 @@ typedef struct VhostUserMsg { > struct vhost_vring_state state; > struct vhost_vring_addr addr; > VhostUserMemory memory; > + VhostUserLog log; > } payload; > int fds[VHOST_MEMORY_MAX_NREGIONS]; > int fd_num; > @@ -265,8 +281,13 @@ typedef struct VubrDev { > uint32_t nregions; > VubrDevRegion regions[VHOST_MEMORY_MAX_NREGIONS]; > VubrVirtq vq[MAX_NR_VIRTQUEUE]; > + int log_call_fd; > + uint64_t log_size; > + uint8_t *log_table; > int backend_udp_sock; > struct sockaddr_in backend_udp_dest; > + int ready; > + uint64_t features; > } VubrDev; > > static const char *vubr_request_str[] = { > @@ -368,7 +389,12 @@ vubr_message_read(int conn_fd, VhostUserMsg *vmsg) > > rc = recvmsg(conn_fd, &msg, 0); > > - if (rc <= 0) { > + if (rc == 0) { > + vubr_die("recvmsg"); > + fprintf(stderr, "Peer disconnected.\n"); > + exit(1); > + } > + if (rc < 0) { > vubr_die("recvmsg"); > } > > @@ -395,7 +421,12 @@ vubr_message_read(int conn_fd, VhostUserMsg *vmsg) > > if (vmsg->size) { > rc = read(conn_fd, &vmsg->payload, vmsg->size); > - if (rc <= 0) { > + if (rc == 0) { > + vubr_die("recvmsg"); > + fprintf(stderr, "Peer disconnected.\n"); > + exit(1); > + } > + if (rc < 0) { > vubr_die("recvmsg"); > } > > @@ -455,6 +486,16 @@ vubr_consume_raw_packet(VubrDev *dev, uint8_t *buf, > uint32_t len) > vubr_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen); > } > > +/* Kick the log_call_fd if required. */ > +static void > +vubr_log_kick(VubrDev *dev) > +{ > + if (dev->log_call_fd != -1) { > + DPRINT("Kicking the QEMU's log...\n"); > + eventfd_write(dev->log_call_fd, 1); > + } > +} > + > /* Kick the guest if necessary. */ > static void > vubr_virtqueue_kick(VubrVirtq *vq) > @@ -466,11 +507,39 @@ vubr_virtqueue_kick(VubrVirtq *vq) > } > > static void > +vubr_log_page(uint8_t *log_table, uint64_t page) > +{ > + DPRINT("Logged dirty guest page: %"PRId64"\n", page); > + atomic_or(&log_table[page / 8], 1 << (page % 8)); One thing you definitely want here is a check against log size. Otherwise it's a security hole: remote can corrupt your memory by making you set random bits outside the mapped buffer. What to do on error? Probably abort. > +} > + > +static void > +vubr_log_write(VubrDev *dev, uint64_t address, uint64_t length) > +{ > + uint64_t page; > + > + if (!(dev->features & (1ULL << VHOST_F_LOG_ALL)) || > + !dev->log_table || !length) { > + return; > + } > + > + assert(dev->log_size > ((address + length - 1) / VHOST_LOG_PAGE / 8)); > + > + page = address / VHOST_LOG_PAGE; > + while (page * VHOST_LOG_PAGE < address + length) { > + vubr_log_page(dev->log_table, page); > + page += VHOST_LOG_PAGE; > + } > + vubr_log_kick(dev); > +} > + > +static void > vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t *buf, int32_t len) > { > - struct vring_desc *desc = vq->desc; > + struct vring_desc *desc = vq->desc; > struct vring_avail *avail = vq->avail; > - struct vring_used *used = vq->used; > + struct vring_used *used = vq->used; > + uint64_t log_guest_addr = vq->log_guest_addr; > > unsigned int size = vq->size; > > @@ -510,6 +579,7 @@ vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t > *buf, int32_t len) > > if (len <= chunk_len) { > memcpy(chunk_start, buf, len); > + vubr_log_write(dev, desc[i].addr, len); > } else { > fprintf(stderr, > "Received too long packet from the backend. Dropping...\n"); > @@ -519,11 +589,17 @@ vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t > *buf, int32_t len) > /* Add descriptor to the used ring. */ > used->ring[u_index].id = d_index; > used->ring[u_index].len = len; > + vubr_log_write(dev, > + log_guest_addr + offsetof(struct vring_used, > ring[u_index]), > + sizeof(used->ring[u_index])); > > vq->last_avail_index++; > vq->last_used_index++; > > atomic_mb_set(&used->idx, vq->last_used_index); > + vubr_log_write(dev, > + log_guest_addr + offsetof(struct vring_used, idx), > + sizeof(used->idx)); > > /* Kick the guest if necessary. */ > vubr_virtqueue_kick(vq); > @@ -532,9 +608,10 @@ vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t > *buf, int32_t len) > static int > vubr_process_desc(VubrDev *dev, VubrVirtq *vq) > { > - struct vring_desc *desc = vq->desc; > + struct vring_desc *desc = vq->desc; > struct vring_avail *avail = vq->avail; > - struct vring_used *used = vq->used; > + struct vring_used *used = vq->used; > + uint64_t log_guest_addr = vq->log_guest_addr; > > unsigned int size = vq->size; > > @@ -552,6 +629,8 @@ vubr_process_desc(VubrDev *dev, VubrVirtq *vq) > void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr); > uint32_t chunk_len = desc[i].len; > > + assert(!(desc[i].flags & VRING_DESC_F_WRITE)); > + > if (len + chunk_len < buf_size) { > memcpy(buf + len, chunk_start, chunk_len); > DPRINT("%d ", chunk_len); > @@ -577,6 +656,9 @@ vubr_process_desc(VubrDev *dev, VubrVirtq *vq) > /* Add descriptor to the used ring. */ > used->ring[u_index].id = d_index; > used->ring[u_index].len = len; > + vubr_log_write(dev, > + log_guest_addr + offsetof(struct vring_used, > ring[u_index]), > + sizeof(used->ring[u_index])); > > vubr_consume_raw_packet(dev, buf, len); > > @@ -588,6 +670,7 @@ vubr_process_avail(VubrDev *dev, VubrVirtq *vq) > { > struct vring_avail *avail = vq->avail; > struct vring_used *used = vq->used; > + uint64_t log_guest_addr = vq->log_guest_addr; > > while (vq->last_avail_index != atomic_mb_read(&avail->idx)) { > vubr_process_desc(dev, vq); > @@ -596,6 +679,9 @@ vubr_process_avail(VubrDev *dev, VubrVirtq *vq) > } > > atomic_mb_set(&used->idx, vq->last_used_index); > + vubr_log_write(dev, > + log_guest_addr + offsetof(struct vring_used, idx), > + sizeof(used->idx)); > } > > static void > @@ -609,6 +695,10 @@ vubr_backend_recv_cb(int sock, void *ctx) > int buflen = sizeof(buf); > int len; > > + if (!dev->ready) { > + return; > + } > + > DPRINT("\n\n *** IN UDP RECEIVE CALLBACK ***\n\n"); > > uint16_t avail_index = atomic_mb_read(&rx_vq->avail->idx); > @@ -656,14 +746,14 @@ vubr_get_features_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > vmsg->payload.u64 = > ((1ULL << VIRTIO_NET_F_MRG_RXBUF) | > - (1ULL << VIRTIO_NET_F_CTRL_VQ) | > - (1ULL << VIRTIO_NET_F_CTRL_RX) | > - (1ULL << VHOST_F_LOG_ALL)); > + (1ULL << VHOST_F_LOG_ALL) | > + (1ULL << VHOST_USER_F_PROTOCOL_FEATURES)); > + > vmsg->size = sizeof(vmsg->payload.u64); > > DPRINT("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64); > > - /* reply */ > + /* Reply */ > return 1; > } > > @@ -671,6 +761,7 @@ static int > vubr_set_features_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); > + dev->features = vmsg->payload.u64; > return 0; > } > > @@ -680,10 +771,28 @@ vubr_set_owner_exec(VubrDev *dev, VhostUserMsg *vmsg) > return 0; > } > > +static void > +vubr_close_log(VubrDev *dev) > +{ > + if (dev->log_table) { > + if (munmap(dev->log_table, dev->log_size) != 0) { > + vubr_die("munmap()"); > + } > + > + dev->log_table = 0; > + } > + if (dev->log_call_fd != -1) { > + close(dev->log_call_fd); > + dev->log_call_fd = -1; > + } > +} > + > static int > vubr_reset_device_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > - DPRINT("Function %s() not implemented yet.\n", __func__); > + vubr_close_log(dev); > + dev->ready = 0; > + dev->features = 0; > return 0; > } > > @@ -710,9 +819,9 @@ vubr_set_mem_table_exec(VubrDev *dev, VhostUserMsg *vmsg) > DPRINT(" mmap_offset 0x%016"PRIx64"\n", > msg_region->mmap_offset); > > - dev_region->gpa = msg_region->guest_phys_addr; > - dev_region->size = msg_region->memory_size; > - dev_region->qva = msg_region->userspace_addr; > + dev_region->gpa = msg_region->guest_phys_addr; > + dev_region->size = msg_region->memory_size; > + dev_region->qva = msg_region->userspace_addr; > dev_region->mmap_offset = msg_region->mmap_offset; > > /* We don't use offset argument of mmap() since the > @@ -736,14 +845,38 @@ vubr_set_mem_table_exec(VubrDev *dev, VhostUserMsg > *vmsg) > static int > vubr_set_log_base_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > - DPRINT("Function %s() not implemented yet.\n", __func__); > - return 0; > + int fd; > + uint64_t log_mmap_size, log_mmap_offset; > + void *rc; > + > + assert(vmsg->fd_num == 1); > + fd = vmsg->fds[0]; > + > + assert(vmsg->size == sizeof(vmsg->payload.log)); > + log_mmap_offset = vmsg->payload.log.mmap_offset; > + log_mmap_size = vmsg->payload.log.mmap_size; > + DPRINT("Log mmap_offset: %"PRId64"\n", log_mmap_offset); > + DPRINT("Log mmap_size: %"PRId64"\n", log_mmap_size); > + > + rc = mmap(0, log_mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, > + log_mmap_offset); > + if (rc == MAP_FAILED) { > + vubr_die("mmap"); > + } > + dev->log_table = rc; > + dev->log_size = log_mmap_size; > + > + vmsg->size = sizeof(vmsg->payload.u64); > + /* Reply */ > + return 1; > } > > static int > vubr_set_log_fd_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > - DPRINT("Function %s() not implemented yet.\n", __func__); > + assert(vmsg->fd_num == 1); > + dev->log_call_fd = vmsg->fds[0]; > + DPRINT("Got log_call_fd: %d\n", vmsg->fds[0]); > return 0; > } > > @@ -777,6 +910,7 @@ vubr_set_vring_addr_exec(VubrDev *dev, VhostUserMsg *vmsg) > vq->desc = (struct vring_desc *)qva_to_va(dev, vra->desc_user_addr); > vq->used = (struct vring_used *)qva_to_va(dev, vra->used_user_addr); > vq->avail = (struct vring_avail *)qva_to_va(dev, vra->avail_user_addr); > + vq->log_guest_addr = vra->log_guest_addr; > > DPRINT("Setting virtq addresses:\n"); > DPRINT(" vring_desc at %p\n", vq->desc); > @@ -803,8 +937,18 @@ vubr_set_vring_base_exec(VubrDev *dev, VhostUserMsg > *vmsg) > static int > vubr_get_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > - DPRINT("Function %s() not implemented yet.\n", __func__); > - return 0; > + unsigned int index = vmsg->payload.state.index; > + > + DPRINT("State.index: %d\n", index); > + vmsg->payload.state.num = dev->vq[index].last_avail_index; > + vmsg->size = sizeof(vmsg->payload.state); > + /* FIXME: this is a work-around for a bug in QEMU enabling > + * too early vrings. When protocol features are enabled, > + * we have to respect * VHOST_USER_SET_VRING_ENABLE request. */ > + dev->ready = 0; > + > + /* Reply */ > + return 1; > } > > static int > @@ -829,7 +973,17 @@ vubr_set_vring_kick_exec(VubrDev *dev, VhostUserMsg > *vmsg) > DPRINT("Waiting for kicks on fd: %d for vq: %d\n", > dev->vq[index].kick_fd, index); > } > + /* We temporarily use this hack to determine that both TX and RX > + * queues are set up and ready for processing. > + * FIXME: we need to rely in VHOST_USER_SET_VRING_ENABLE and > + * actual kicks. */ And then ready flag will be per vq. Yes. > + if (dev->vq[0].kick_fd != -1 && > + dev->vq[1].kick_fd != -1) { > + dev->ready = 1; > + DPRINT("vhost-user-bridge is ready for processing queues.\n"); > + } > return 0; > + > } > > static int > @@ -858,9 +1012,12 @@ vubr_set_vring_err_exec(VubrDev *dev, VhostUserMsg > *vmsg) > static int > vubr_get_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > - /* FIXME: unimplented */ > + vmsg->payload.u64 = 1ULL << VHOST_USER_PROTOCOL_F_LOG_SHMFD; > DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64); > - return 0; > + vmsg->size = sizeof(vmsg->payload.u64); > + > + /* Reply */ > + return 1; > } > > static int > @@ -881,7 +1038,12 @@ vubr_get_queue_num_exec(VubrDev *dev, VhostUserMsg > *vmsg) > static int > vubr_set_vring_enable_exec(VubrDev *dev, VhostUserMsg *vmsg) > { > - DPRINT("Function %s() not implemented yet.\n", __func__); > + unsigned int index = vmsg->payload.state.index; > + unsigned int enable = vmsg->payload.state.num; > + > + DPRINT("State.index: %d\n", index); > + DPRINT("State.enable: %d\n", enable); > + dev->vq[index].enable = enable; > return 0; > } > > @@ -987,7 +1149,7 @@ vubr_accept_cb(int sock, void *ctx) > socklen_t len = sizeof(un); > > conn_fd = accept(sock, (struct sockaddr *) &un, &len); > - if (conn_fd == -1) { > + if (conn_fd == -1) { > vubr_die("accept()"); > } > DPRINT("Got connection from remote peer on sock %d\n", conn_fd); > @@ -1009,9 +1171,17 @@ vubr_new(const char *path) > .size = 0, > .last_avail_index = 0, .last_used_index = 0, > .desc = 0, .avail = 0, .used = 0, > + .enable = 0, > }; > } > > + /* Init log */ > + dev->log_call_fd = -1; > + dev->log_size = 0; > + dev->log_table = 0; > + dev->ready = 0; > + dev->features = 0; > + > /* Get a UNIX socket. */ > dev->sock = socket(AF_UNIX, SOCK_STREAM, 0); > if (dev->sock == -1) { > -- > --Victor