On 2015/1/27 17:37, Michael S. Tsirkin wrote: > On Tue, Jan 27, 2015 at 03:57:13PM +0800, Linhaifeng wrote: >> Hi,all >> >> I use vhost-user to send data to VM at first it cant work well but after >> many hours VM can not receive data but can send data. >> >> (gdb)p avail_idx >> $4 = 2668 >> (gdb)p free_entries >> $5 = 0 >> (gdb)l >> /* check that we have enough buffers */ >> if (unlikely(count > free_entries)) >> count = free_entries; >> >> if (count == 0){ >> int b=0; >> if(b) { // when set b=1 to notify guest rx_ring will restart to >> work >> if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { >> >> eventfd_write(vq->callfd, 1); >> } >> } >> return 0; >> } >> >> some info i print in guest: >> >> net eth3:vi->num=199 >> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668 >> net eth3:svq info: num_free=254, used->idx=1644, avail->idx=1644 >> >> net eth3:vi->num=199 >> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668 >> net eth3:svq info: num_free=254, used->idx=1645, avail->idx=1645 >> >> net eth3:vi->num=199 >> net eth3:rvq info: num_free=57, used->idx=2668, avail->idx=2668 >> net eth3:svq info: num_free=254, used->idx=1646, avail->idx=1646 >> >> # free >> total used free shared buffers cached >> Mem: 3924100 337252 3586848 0 95984 138060 >> -/+ buffers/cache: 103208 3820892 >> Swap: 970748 0 970748 >> >> I have two questions: >> 1.Should we need to notify guest when there is no buffer in vq->avail? > > No unless NOTIFY_ON_EMPTY is set (most guests don't set it).
Thank you for this new knowledge :) > >> 2.Why virtio_net stop to fill avail? > > Most likely, it didn't get an interrupt. > > If so, it would be a dpdk vhost user bug. > Which code are you using in dpdk? > Hi, mst. Thank you for your reply. Sorry, my mail filter may have a bug, so I did not see this mail until now. I am using the DPDK code from before commit 2bbb811. I have pasted the code here for you to review. (Note that the vhost_enqueue_burst and vhost_dequeue_burst functions run in poll mode.) My guess is that vhost_enqueue_burst used all the buffers in the rx_ring and then tried to notify the guest to receive, but the vcpu may have been exiting at that moment, so the guest could not receive the notification. /* * Enqueues packets to the guest virtio RX virtqueue for vhost devices. */ static inline uint32_t __attribute__((always_inline)) vhost_enqueue_burst(struct virtio_net *dev, struct rte_mbuf **pkts, unsigned count) { struct vhost_virtqueue *vq; struct vring_desc *desc; struct rte_mbuf *buff; /* The virtio_hdr is initialised to 0. */ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0,0,0,0,0,0},0}; uint64_t buff_addr = 0; uint64_t buff_hdr_addr = 0; uint32_t head[PKT_BURST_SIZE], packet_len = 0; uint32_t head_idx, packet_success = 0; uint32_t mergeable, mrg_count = 0; uint32_t retry = 0; uint16_t avail_idx, res_cur_idx; uint16_t res_base_idx, res_end_idx; uint16_t free_entries; uint8_t success = 0; LOG_DEBUG(APP, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh); vq = dev->virtqueue[VIRTIO_RXQ]; count = (count > PKT_BURST_SIZE) ? PKT_BURST_SIZE : count; /* As many data cores may want access to available buffers, they need to be reserved. */ do { res_base_idx = vq->last_used_idx_res; avail_idx = *((volatile uint16_t *)&vq->avail->idx); free_entries = (avail_idx - res_base_idx); /* If retry is enabled and the queue is full then we wait and retry to avoid packet loss. 
*/ if (unlikely(count > free_entries)) { for (retry = 0; retry < burst_tx_retry_num; retry++) { rte_delay_us(burst_tx_delay_time); avail_idx = *((volatile uint16_t *)&vq->avail->idx); free_entries = (avail_idx - res_base_idx); if (count <= free_entries) break; } } /*check that we have enough buffers*/ if (unlikely(count > free_entries)) count = free_entries; if (count == 0) // !!!!!!!!!!!!!!!!!!! when VM cann't receive always return here return 0; res_end_idx = res_base_idx + count; /* vq->last_used_idx_res is atomically updated. */ success = rte_atomic16_cmpset(&vq->last_used_idx_res, res_base_idx, res_end_idx); } while (unlikely(success == 0)); res_cur_idx = res_base_idx; LOG_DEBUG(APP, "(%"PRIu64") Current Index %d| End Index %d\n", dev->device_fh, res_cur_idx, res_end_idx); /* Prefetch available ring to retrieve indexes. */ rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]); /* Check if the VIRTIO_NET_F_MRG_RXBUF feature is enabled. */ mergeable = dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF); /* Retrieve all of the head indexes first to avoid caching issues. */ for (head_idx = 0; head_idx < count; head_idx++) head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) & (vq->size - 1)]; /*Prefetch descriptor index. */ rte_prefetch0(&vq->desc[head[packet_success]]); while (res_cur_idx != res_end_idx) { /* Get descriptor from available ring */ desc = &vq->desc[head[packet_success]]; buff = pkts[packet_success]; /* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */ buff_addr = gpa_to_vva(dev, desc->addr); /* Prefetch buffer address. */ rte_prefetch0((void*)(uintptr_t)buff_addr); if (mergeable && (mrg_count != 0)) { desc->len = packet_len = rte_pktmbuf_data_len(buff); } else { /* Copy virtio_hdr to packet and increment buffer address */ buff_hdr_addr = buff_addr; packet_len = rte_pktmbuf_data_len(buff) + vq->vhost_hlen; /* * If the descriptors are chained the header and data are placed in * separate buffers. 
*/ if (desc->flags & VRING_DESC_F_NEXT) { desc->len = vq->vhost_hlen; desc = &vq->desc[desc->next]; /* Buffer address translation. */ buff_addr = gpa_to_vva(dev, desc->addr); desc->len = rte_pktmbuf_data_len(buff); } else { buff_addr += vq->vhost_hlen; desc->len = packet_len; } } /* Update used ring with desc information */ vq->used->ring[res_cur_idx & (vq->size - 1)].id = head[packet_success]; vq->used->ring[res_cur_idx & (vq->size - 1)].len = packet_len; /* Copy mbuf data to buffer */ rte_memcpy((void *)(uintptr_t)buff_addr, (const void*)buff->pkt.data, rte_pktmbuf_data_len(buff)); PRINT_PACKET(dev, (uintptr_t)buff_addr, rte_pktmbuf_data_len(buff), 0); res_cur_idx++; packet_success++; /* If mergeable is disabled then a header is required per buffer. */ if (!mergeable) { rte_memcpy((void *)(uintptr_t)buff_hdr_addr, (const void*)&virtio_hdr, vq->vhost_hlen); PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1); } else { mrg_count++; /* Merge buffer can only handle so many buffers at a time. Tell the guest if this limit is reached. */ if ((mrg_count == MAX_MRG_PKT_BURST) || (res_cur_idx == res_end_idx)) { virtio_hdr.num_buffers = mrg_count; LOG_DEBUG(APP, "(%"PRIu64") RX: Num merge buffers %d\n", dev->device_fh, virtio_hdr.num_buffers); rte_memcpy((void *)(uintptr_t)buff_hdr_addr, (const void*)&virtio_hdr, vq->vhost_hlen); PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1); mrg_count = 0; } } if (res_cur_idx < res_end_idx) { /* Prefetch descriptor index. */ rte_prefetch0(&vq->desc[head[packet_success]]); } } rte_compiler_barrier(); /* Wait until it's our turn to add our buffer to the used ring. */ while (unlikely(vq->last_used_idx != res_base_idx)) rte_pause(); *(volatile uint16_t *) &vq->used->idx += count; vq->last_used_idx = res_end_idx; /* Kick the guest if necessary. 
*/ if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) eventfd_write(vq->kickfd,1); return count; } /* * Dequeues packets from the guest virtio TX virtqueue for vhost devices. */ static inline uint16_t __attribute__((always_inline)) vhost_dequeue_burst(struct virtio_net *dev, struct rte_mbuf **pkts, unsigned count) { struct rte_mbuf *mbuf; struct vhost_virtqueue *vq; struct vring_desc *desc; uint64_t buff_addr = 0; uint32_t head[PKT_BURST_SIZE]; uint32_t used_idx, i; uint16_t free_entries, packet_success = 0; uint16_t avail_idx; vq = dev->virtqueue[VIRTIO_TXQ]; avail_idx = *((volatile uint16_t *)&vq->avail->idx); /* If there are no available buffers then return. */ if (vq->last_used_idx == avail_idx) return 0; LOG_DEBUG(APP, "(%"PRIu64") virtio_dev_tx()\n", dev->device_fh); /* Prefetch available ring to retrieve head indexes. */ rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]); /*get the number of free entries in the ring*/ free_entries = (avail_idx - vq->last_used_idx); /* Limit to PKT_BURST_SIZE. */ if (free_entries > count) free_entries = count; /* * Performance is better if cachelines containing descriptors are not accessed by multiple * cores. We try finish with a cacheline before passing it on. */ if (likely(free_entries > DESC_PER_CACHELINE)) free_entries = free_entries - ((vq->last_used_idx + free_entries) % DESC_PER_CACHELINE); LOG_DEBUG(APP, "(%"PRIu64") Buffers available %d\n", dev->device_fh, free_entries); /* Retrieve all of the head indexes first to avoid caching issues. */ for (i = 0; i < free_entries; i++) head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)]; /* Prefetch descriptor index. */ rte_prefetch0(&vq->desc[head[packet_success]]); rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]); while (packet_success < free_entries) { desc = &vq->desc[head[packet_success]]; /* Discard first buffer as it is the virtio header */ desc = &vq->desc[desc->next]; /* Buffer address translation. 
*/ buff_addr = gpa_to_vva(dev, desc->addr); /* Prefetch buffer address. */ rte_prefetch0((void*)(uintptr_t)buff_addr); used_idx = vq->last_used_idx & (vq->size - 1); if (packet_success < (free_entries - 1)) { /* Prefetch descriptor index. */ rte_prefetch0(&vq->desc[head[packet_success+1]]); rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]); } /* Update used index buffer information. */ vq->used->ring[used_idx].id = head[packet_success]; vq->used->ring[used_idx].len = 0; /* Allocate an mbuf and populate the structure. */ mbuf = rte_pktmbuf_alloc(pktmbuf_pool); if (unlikely(mbuf == NULL)) { RTE_LOG(ERR, APP, "Failed to allocate memory for mbuf.\n"); return packet_success; } /* Setup dummy mbuf. */ mbuf->pkt.data_len = desc->len; mbuf->pkt.pkt_len = mbuf->pkt.data_len; rte_memcpy((void*) mbuf->pkt.data, (const void*) buff_addr, mbuf->pkt.data_len); pkts[packet_success]=mbuf; PRINT_PACKET(dev, (uintptr_t)buff_addr, desc->len, 0); vq->last_used_idx++; packet_success++; } rte_compiler_barrier(); vq->used->idx += packet_success; /* Kick guest if required. */ if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) eventfd_write(vq->kickfd,1); return packet_success; } >> >> >> >> >> >> -- >> Regards, >> Haifeng > > . > -- Regards, Haifeng