On Mon, Mar 24, 2014 at 11:56:16AM +0000, anton.iva...@kot-begemot.co.uk wrote: > 1. Correct buffering and corect poll FSM > > Current qemu queue logic assumes single packet inputs, not multiple packet > RX. If the receiver cannot take more packets it will enqueue the new > arrivals and incur a malloc, memcpy and free to do that. > > This is unnecessary if you use recvmmsg and treat the receive message array as > a ring buffer. If the peer returns a 0 you leave the message(s) in the > receive > array and use the remainder of the already message array to continue > reading. It is a natural queue, no need to double-buffer and memcpy on > top of that. > > The poll logic and the send logic has been updated to account for that. > > End result - 10%+ performance compared to using qemu_send_packet_async
Does this mean you regularly see qemu_send_packet_async() return 0 because l2tp is reading more packets using recvmmsg() than the NIC can fit into its rx ring? I'd like to understand this case better. > +static void l2tpv3_writable(void *opaque) > +{ > + NetL2TPV3State *s = opaque; > + > + l2tpv3_write_poll(s, false); > + > + net_l2tpv3_send(s); This is the wrong function because net-l2tpv3_send() *reads* new packets from the socket. We must tell the peer to resume transmitting packets so we can *write* them to the socket. This line should be: qemu_flush_queued_packets(&s->nc); > +static int l2tpv3_verify_header(NetL2TPV3State *s, uint8_t *buf) > +{ > + > + uint32_t *session; > + uint64_t cookie; > + > + if ((!s->udp) && (!s->ipv6)) { > + buf += sizeof(struct iphdr) /* fix for ipv4 raw */; > + } > + > + /* we do not do a strict check for "data" packets as per > + * the RFC spec because the pure IP spec does not have > + * that anyway. > + */ > + > + if (s->cookie) { > + if (s->cookie_is_64) { > + cookie = ldq_be_p(buf + s->cookie_offset); > + } else { > + cookie = ldl_be_p(buf + s->cookie_offset); > + } > + if (cookie != s->rx_cookie) { > + if (!s->header_mismatch) { > + error_report("unknown cookie id\n"); > + } Do we need to set s->header_mismatch = true to prevent flooding logs? > + return -1; > + } > + } > + session = (uint32_t *) (buf + s->session_offset); > + if (ldl_be_p(session) != s->rx_session) { > + if (!s->header_mismatch) { > + error_report("session mismatch"); > + } Same here. > +static void net_l2tpv3_send(void *opaque) > +{ > + NetL2TPV3State *s = opaque; > + > + int count, target_count, offset, size = 0; > + struct mmsghdr *msgvec; > + struct iovec *vec; > + bool bad_read; > + int data_size; > + > + /* go into ring mode only if there is a "pending" tail */ > + > + if (s->queue_depth) { > + > + /* The ring buffer we use has variable intake > + * count of how much we can read varies - adjust accordingly > + */ > + > + target_count = MAX_L2TPV3_MSGCNT - s->queue_depth; > + > + /* Ensure we do not overrun the ring when we have > + * a lot of enqueued packets > + */ > + > + if (s->queue_head + target_count > MAX_L2TPV3_MSGCNT) { > + target_count = MAX_L2TPV3_MSGCNT - s->queue_head; > + } > + } else { > + > + /* we do not have any pending packets - we can use > + * the whole message vector linearly instead of using > + * it as a ring > + */ > + > + s->queue_head = 0; > + s->queue_tail = 0; > + target_count = MAX_L2TPV3_MSGCNT; > + } > + > + msgvec = s->msgvec + s->queue_head; > + if (target_count > 0) { > + count = recvmmsg( > + s->fd, > + msgvec, > + target_count, MSG_DONTWAIT, NULL); > + s->queue_head = (s->queue_head + count) % MAX_L2TPV3_MSGCNT; > + s->queue_depth += count; These calculations break if count == -1 because there was an error (e.g. EINTR). > + } > + if (s->queue_depth > 0 && (!s->delivering)) { > + do { > + msgvec = s->msgvec + s->queue_tail; > + if (msgvec->msg_len > 0) { > + data_size = msgvec->msg_len - s->header_size; > + vec = msgvec->msg_hdr.msg_iov; > + if ((data_size > 0) && > + (l2tpv3_verify_header(s, vec->iov_base) == 0)) { > + vec++; > + /* we do not use the "normal" send and send async > + * functions here because we have our own buffer - > + * the message vector. If we use the "usual" ones > + * we will end up double-buffering. > + */ > + s->delivering = true; > + /* deliver directly to peer bypassing queueing and > + * buffering > + */ > + size = qemu_deliver_packet( > + &s->nc, > + QEMU_NET_PACKET_FLAG_NONE, > + vec->iov_base, > + data_size, > + s->nc.peer > + ); > + s->delivering = false; > + bad_read = false; > + } else { > + bad_read = true; > + if (!s->header_mismatch) { > + /* report error only once */ > + error_report("l2tpv3 header verification failed"); > + s->header_mismatch = true; > + } > + } > + } else { > + bad_read = true; > + } > + if ((bad_read) || (size > 0)) { > + s->queue_tail = (s->queue_tail + 1) % MAX_L2TPV3_MSGCNT; > + s->queue_depth--; > + } > + } while ( > + (s->queue_depth > 0) && > + qemu_can_send_packet(&s->nc) && > + ((size > 0) || bad_read) > + ); > + } > + if (s->queue_depth >= (MAX_L2TPV3_MSGCNT - 1)) { > + /* We change the read poll flag only if our internal queue > + * (the message vector) is full. We do not change it on > + * size == 0 as other transports because we do our own buffering > + */ > + l2tpv3_read_poll(s, false); I don't see where l2tpv3_read_poll() is set to true again. How do we resume reading the socket?