On Mon, Mar 24, 2014 at 11:56:16AM +0000, anton.iva...@kot-begemot.co.uk wrote:
> 1. Correct buffering and correct poll FSM
> Current qemu queue logic assumes single-packet inputs, not multiple-packet
> RX. If the receiver cannot take more packets, it will enqueue the new
> arrivals and incur a malloc, memcpy and free to do that.
> This is unnecessary if you use recvmmsg and treat the receive message array as
> a ring buffer. If the peer returns 0, you leave the message(s) in the
> receive array and use the remainder of the message array to continue
> reading. It is a natural queue; there is no need to double-buffer and memcpy
> on top of that.
> The poll logic and the send logic have been updated to account for that.
> End result: 10%+ better performance compared to using qemu_send_packet_async.

Does this mean you regularly see qemu_send_packet_async() return 0
because l2tp is reading more packets using recvmmsg() than the NIC can
fit into its rx ring?  I'd like to understand this case better.
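Here is a standalone sketch of the ring-buffer bookkeeping described above. It assumes a hypothetical struct msg_ring and RING_SLOTS constant that stand in for the queue_head/queue_tail/queue_depth fields and MAX_L2TPV3_MSGCNT. It omits the recvmmsg() call and models only the index arithmetic. A hypothetical mock_deliver() peer, which returns 0 when it is full, replaces qemu_deliver_packet().

```c
#include <assert.h>

#define RING_SLOTS 64  /* hypothetical stand-in for MAX_L2TPV3_MSGCNT */

/* Hypothetical stand-in for the ring fields of NetL2TPV3State. */
struct msg_ring {
    unsigned head;   /* next slot recvmmsg() fills */
    unsigned tail;   /* oldest slot not yet accepted by the peer */
    unsigned depth;  /* filled, undelivered slots */
};

/* How many slots one recvmmsg() call may fill starting at head.
 * recvmmsg() writes into one contiguous slice of the array, so the
 * budget is the smaller of the free space and the distance to the end. */
static unsigned ring_rx_budget(struct msg_ring *r)
{
    unsigned budget;

    if (r->depth == 0) {
        /* Empty ring: restart at slot 0 and use the whole array. */
        r->head = 0;
        r->tail = 0;
        return RING_SLOTS;
    }
    budget = RING_SLOTS - r->depth;
    if (r->head + budget > RING_SLOTS) {
        budget = RING_SLOTS - r->head;
    }
    return budget;
}

/* Record that recvmmsg() filled 'count' slots starting at head. */
static void ring_rx_commit(struct msg_ring *r, unsigned count)
{
    assert(count <= RING_SLOTS - r->depth);
    r->head = (r->head + count) % RING_SLOTS;
    r->depth += count;
}

/* Record that the peer accepted the packet in the tail slot. */
static void ring_tx_consume(struct msg_ring *r)
{
    assert(r->depth > 0);
    r->tail = (r->tail + 1) % RING_SLOTS;
    r->depth--;
}

/* Hypothetical peer: accepts packets while it has room, then returns 0. */
static unsigned peer_room;
static int mock_deliver(unsigned slot)
{
    (void)slot;
    if (peer_room == 0) {
        return 0;
    }
    peer_room--;
    return 1;
}

/* Offer queued packets in order. When the peer returns 0, the packet
 * stays in its slot for a later retry, so no extra copy is made. */
static unsigned ring_drain(struct msg_ring *r, int (*deliver)(unsigned slot))
{
    unsigned delivered = 0;

    while (r->depth > 0 && deliver(r->tail) > 0) {
        ring_tx_consume(r);
        delivered++;
    }
    return delivered;
}
```

Because each recvmmsg() call fills one contiguous slice, the budget is clamped at the end of the array instead of wrapping within a single call. The next call then resumes at slot 0.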

> +static void l2tpv3_writable(void *opaque)
> +{
> +    NetL2TPV3State *s = opaque;
> +
> +    l2tpv3_write_poll(s, false);
> +
> +    net_l2tpv3_send(s);

This is the wrong function because net_l2tpv3_send() *reads* new packets
from the socket.  We must tell the peer to resume transmitting packets
so we can *write* them to the socket.
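Below is a minimal model of the transmit-side poll FSM. It assumes that when the backend's receive callback returns 0, the peer holds the packet until it is flushed. The struct and functions are hypothetical. flush_peer_queue() stands in for QEMU's qemu_flush_queued_packets(), and the socket-full condition is simulated rather than reported by sendmsg().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the guest->socket (transmit) direction. */
struct tx_fsm {
    bool socket_full;       /* simulated: would sendmsg() hit EAGAIN? */
    bool write_poll;        /* waiting for POLLOUT on the socket */
    unsigned held_by_peer;  /* packets the peer queued after we returned 0 */
    unsigned written;       /* packets written to the socket */
};

/* Receive callback for guest packets. Returning 0 tells the peer to
 * queue the packet and stop sending until it is flushed. */
static int tx_packet(struct tx_fsm *s)
{
    if (s->socket_full) {
        s->write_poll = true;  /* ask to be woken when the socket drains */
        return 0;
    }
    s->written++;
    return 1;
}

/* Hypothetical stand-in for qemu_flush_queued_packets(): re-offer the
 * packets the peer is holding until one is refused again. */
static void flush_peer_queue(struct tx_fsm *s)
{
    while (s->held_by_peer > 0 && tx_packet(s) > 0) {
        s->held_by_peer--;
    }
}

/* Writable handler: stop polling for POLLOUT and resume the transmit
 * direction. Calling the socket-read path here instead would leave
 * held_by_peer stuck. */
static void on_writable(struct tx_fsm *s)
{
    s->socket_full = false;  /* simulated: POLLOUT means room again */
    s->write_poll = false;
    flush_peer_queue(s);
}
```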

This line should be:

> +static int l2tpv3_verify_header(NetL2TPV3State *s, uint8_t *buf)
> +{
> +
> +    uint32_t *session;
> +    uint64_t cookie;
> +
> +    if ((!s->udp) && (!s->ipv6)) {
> +        buf += sizeof(struct iphdr) /* fix for ipv4 raw */;
> +    }
> +
> +    /* we do not do a strict check for "data" packets as per
> +    * the RFC spec because the pure IP spec does not have
> +    * that anyway.
> +    */
> +
> +    if (s->cookie) {
> +        if (s->cookie_is_64) {
> +            cookie = ldq_be_p(buf + s->cookie_offset);
> +        } else {
> +            cookie = ldl_be_p(buf + s->cookie_offset);
> +        }
> +        if (cookie != s->rx_cookie) {
> +            if (!s->header_mismatch) {
> +                error_report("unknown cookie id\n");
> +            }

Do we need to set s->header_mismatch = true to prevent flooding logs?

> +            return -1;
> +        }
> +    }
> +    session = (uint32_t *) (buf + s->session_offset);
> +    if (ldl_be_p(session) != s->rx_session) {
> +        if (!s->header_mismatch) {
> +            error_report("session mismatch");
> +        }

Same here.
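This is a minimal sketch of the report-once pattern in question. It uses a hypothetical struct hdr_state and a report() function that stands in for error_report(). The flag is latched on the first mismatch, so later bad packets are still rejected but produce no further log lines.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical subset of the backend state. */
struct hdr_state {
    uint32_t rx_session;
    bool header_mismatch;  /* set once the first mismatch is logged */
    unsigned reports;      /* log lines emitted, for illustration only */
};

/* Stand-in for error_report(). */
static void report(struct hdr_state *s, const char *msg)
{
    s->reports++;
    fprintf(stderr, "%s\n", msg);
}

static int verify_session(struct hdr_state *s, uint32_t session)
{
    if (session != s->rx_session) {
        if (!s->header_mismatch) {
            report(s, "session mismatch");
            /* Latch the flag so a flood of bad packets logs only once. */
            s->header_mismatch = true;
        }
        return -1;
    }
    return 0;
}
```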

> +static void net_l2tpv3_send(void *opaque)
> +{
> +    NetL2TPV3State *s = opaque;
> +
> +    int count, target_count, offset, size = 0;
> +    struct mmsghdr *msgvec;
> +    struct iovec *vec;
> +    bool bad_read;
> +    int data_size;
> +
> +    /* go into ring mode only if there is a "pending" tail */
> +
> +    if (s->queue_depth) {
> +
> +        /* The ring buffer we use has variable intake
> +         * count of how much we can read varies - adjust accordingly
> +         */
> +
> +        target_count = MAX_L2TPV3_MSGCNT - s->queue_depth;
> +
> +        /* Ensure we do not overrun the ring when we have
> +         * a lot of enqueued packets
> +         */
> +
> +        if (s->queue_head + target_count > MAX_L2TPV3_MSGCNT) {
> +            target_count = MAX_L2TPV3_MSGCNT - s->queue_head;
> +        }
> +    } else {
> +
> +        /* we do not have any pending packets - we can use
> +        * the whole message vector linearly instead of using
> +        * it as a ring
> +        */
> +
> +        s->queue_head = 0;
> +        s->queue_tail = 0;
> +        target_count = MAX_L2TPV3_MSGCNT;
> +    }
> +
> +    msgvec = s->msgvec + s->queue_head;
> +    if (target_count > 0) {
> +        count = recvmmsg(
> +                s->fd,
> +                msgvec,
> +                target_count, MSG_DONTWAIT, NULL);
> +        s->queue_head = (s->queue_head + count) % MAX_L2TPV3_MSGCNT;
> +        s->queue_depth += count;

These calculations break if count == -1 because there was an error (e.g.
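This is a minimal sketch of guarding the ring update against a failed recvmmsg(). It assumes Linux, where recvmmsg() is declared when _GNU_SOURCE is defined. The struct msg_ring, RING_SLOTS and SLOT_BYTES names are hypothetical stand-ins for the NetL2TPV3State fields and MAX_L2TPV3_MSGCNT. With MSG_DONTWAIT, recvmmsg() returns -1 with errno set to EAGAIN or EWOULDBLOCK when no datagram is waiting. The indices should therefore move only when the count is non-negative.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define RING_SLOTS 64    /* hypothetical stand-in for MAX_L2TPV3_MSGCNT */
#define SLOT_BYTES 2048  /* hypothetical per-slot buffer size */

struct msg_ring {
    unsigned head;   /* next slot recvmmsg() fills */
    unsigned tail;   /* oldest undelivered slot */
    unsigned depth;  /* filled, undelivered slots */
};

/* Give every slot its own single-iovec buffer. */
static void ring_init(struct mmsghdr *msgvec, struct iovec *iov,
                      char (*bufs)[SLOT_BYTES], unsigned n)
{
    memset(msgvec, 0, n * sizeof(*msgvec));
    for (unsigned i = 0; i < n; i++) {
        iov[i].iov_base = bufs[i];
        iov[i].iov_len = SLOT_BYTES;
        msgvec[i].msg_hdr.msg_iov = &iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
    }
}

/* Read up to 'budget' datagrams into the slots starting at head.
 * Returns the count read, or -1 with errno set by recvmmsg(). The
 * ring advances only on success, so -1 never corrupts head/depth. */
static int ring_read(int fd, struct mmsghdr *msgvec, struct msg_ring *r,
                     unsigned budget)
{
    int count;

    if (budget == 0) {
        return 0;
    }
    count = recvmmsg(fd, msgvec + r->head, budget, MSG_DONTWAIT, NULL);
    if (count < 0) {
        /* EAGAIN/EWOULDBLOCK: nothing waiting; other errors: caller decides. */
        return -1;
    }
    r->head = (r->head + (unsigned)count) % RING_SLOTS;
    r->depth += (unsigned)count;
    return count;
}
```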

> +    }
> +    if (s->queue_depth > 0 && (!s->delivering)) {
> +        do {
> +            msgvec = s->msgvec + s->queue_tail;
> +            if (msgvec->msg_len > 0) {
> +                data_size = msgvec->msg_len - s->header_size;
> +                vec = msgvec->msg_hdr.msg_iov;
> +                if ((data_size > 0) &&
> +                    (l2tpv3_verify_header(s, vec->iov_base) == 0)) {
> +                    vec++;
> +                    /* we do not use the "normal" send and send async
> +                     * functions here because we have our own buffer -
> +                     * the message vector. If we use the "usual" ones
> +                     * we will end up double-buffering.
> +                     */
> +                    s->delivering = true;
> +                    /* deliver directly to peer bypassing queueing and
> +                     * buffering
> +                     */
> +                    size = qemu_deliver_packet(
> +                            &s->nc,
> +                            QEMU_NET_PACKET_FLAG_NONE,
> +                            vec->iov_base,
> +                            data_size,
> +                            s->nc.peer
> +                        );
> +                    s->delivering = false;
> +                    bad_read = false;
> +                } else {
> +                    bad_read = true;
> +                    if (!s->header_mismatch) {
> +                        /* report error only once */
> +                        error_report("l2tpv3 header verification failed");
> +                        s->header_mismatch = true;
> +                    }
> +                }
> +            } else {
> +                bad_read = true;
> +            }
> +            if ((bad_read) || (size > 0)) {
> +                s->queue_tail = (s->queue_tail + 1) % MAX_L2TPV3_MSGCNT;
> +                s->queue_depth--;
> +            }
> +        } while (
> +                (s->queue_depth > 0) &&
> +                qemu_can_send_packet(&s->nc) &&
> +                ((size > 0) || bad_read)
> +            );
> +    }
> +    if (s->queue_depth >= (MAX_L2TPV3_MSGCNT - 1)) {
> +        /* We change the read poll flag only if our internal queue
> +         * (the message vector) is full. We do not change it on
> +         * size == 0 as other transports because we do our own buffering
> +         */
> +        l2tpv3_read_poll(s, false);

I don't see where l2tpv3_read_poll() is set to true again.  How do we
resume reading the socket?
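One way to close the loop is shown in the hypothetical model below. Whatever path learns that the peer is accepting packets again (here a hypothetical peer_ready() hook) re-enables reading once the ring has room. set_read_poll() stands in for l2tpv3_read_poll(), and the "full" threshold mirrors the patch's MAX_L2TPV3_MSGCNT - 1 check.

```c
#include <assert.h>
#include <stdbool.h>

#define RING_SLOTS 64  /* hypothetical stand-in for MAX_L2TPV3_MSGCNT */

/* Hypothetical model of the backend's read-poll state. */
struct rx_fsm {
    unsigned depth;  /* filled, undelivered slots */
    bool read_poll;  /* is the socket fd registered for reading? */
};

/* Stand-in for l2tpv3_read_poll(); in QEMU this (un)registers the fd. */
static void set_read_poll(struct rx_fsm *s, bool enable)
{
    s->read_poll = enable;
}

/* After a receive pass: stop reading once the ring is full. */
static void after_receive(struct rx_fsm *s)
{
    if (s->depth >= RING_SLOTS - 1) {
        set_read_poll(s, false);
    }
}

/* Hypothetical hook run when the peer takes 'delivered' packets. This
 * is the missing half of the FSM: once there is room again, reading
 * must be re-enabled or the backend stalls for good. */
static void peer_ready(struct rx_fsm *s, unsigned delivered)
{
    assert(delivered <= s->depth);
    s->depth -= delivered;
    if (!s->read_poll && s->depth < RING_SLOTS - 1) {
        set_read_poll(s, true);
    }
}
```

The hook must be driven from the peer side. While read polling is disabled, the socket read handler never runs, so it cannot notice that the ring has drained.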
