Hi there,

This diff adds UDP support with PPS (packets per second) accounting and changes
tcpbench to use libevent instead of poll(2).

The sender (client) is pretty naive: it sends as much as possible until it hits
ENOBUFS, then sleeps for 50ms. I'm not sure this is acceptable; we don't have
sched_yield(2), which would do the job, so I picked this arbitrary value. Please
correct me.

The sender (client) accounts Tx PPS, while the receiver (server) accounts Rx PPS.

There is no UDP control block, so -k should only be used with TCP. Looking
further at the code, we could keep some per-connection UDP stats, though there
is not much to record.

The packet size can be specified with the -B flag: since we write the whole
application buffer (-B), it ends up being the packet size (I will mention this
in the manpage). In UDP client mode it defaults to 1472 bytes (a 1500-byte
Ethernet MTU minus 28 bytes of IPv4 and UDP headers) to avoid fragmentation
(maybe we could fetch the interface MTU someday?). UDP server mode uses the same
buffer size as TCP:
#define DEFAULT_BUF (256 * 1024)

I chose this behaviour so the client can experiment with different packet sizes
without having to reconfigure the server each time.

UDP mode is selected with the -u flag, which must be used on both the server
and the client. There is no per-fd or per-connection struct statctx as in TCP
mode; I use a single structure to account for everything.

Here is a sample, tested on sparc64 and i386:

# Server; note that Rx PPS will be lower than the client's Tx PPS, since
# packets are being dropped
gandalf:tcpbench: ./tcpbench -s -u
Elapsed:        3194 Mbps:      14.526 Peak Mbps:      14.526 Rx PPS:   1233.000
Elapsed:        4194 Mbps:      45.324 Peak Mbps:      45.324 Rx PPS:   3848.000
Elapsed:        5194 Mbps:      46.268 Peak Mbps:      46.268 Rx PPS:   3929.000
Elapsed:        6194 Mbps:      45.550 Peak Mbps:      46.268 Rx PPS:   3868.000
Elapsed:        7194 Mbps:      42.052 Peak Mbps:      46.268 Rx PPS:   3571.000
Elapsed:        8194 Mbps:      46.774 Peak Mbps:      46.774 Rx PPS:   3972.000

# Client
elendil:tcpbench: ./tcpbench -u gandalf
Elapsed:        1044 Mbps:     101.292 Peak Mbps:     101.292 Tx PPS:   8601.000
Elapsed:        2054 Mbps:      96.390 Peak Mbps:     101.292 Tx PPS:   8185.000
Elapsed:        3094 Mbps:      95.238 Peak Mbps:     101.292 Tx PPS:   8087.000
Elapsed:        4144 Mbps:      95.408 Peak Mbps:     101.292 Tx PPS:   8101.000
Elapsed:        5184 Mbps:      96.373 Peak Mbps:     101.292 Tx PPS:   8183.000

As long as Tx PPS > Rx PPS, we know we're stressing the receiver enough; that
50ms sleep is pretty lame, though.

Please keep in mind there are still some things to be fixed, but I'd love to
hear what you think; the things I wasn't sure about are marked with XXX.

Note: I've added myself to the copyright header since I've been working on
tcpbench for some time.

Index: Makefile
===================================================================
RCS file: /cvs/src/usr.bin/tcpbench/Makefile,v
retrieving revision 1.3
diff -d -u -p -w -r1.3 Makefile
--- Makefile    26 Jun 2008 07:05:56 -0000      1.3
+++ Makefile    27 Jan 2011 00:06:41 -0000
@@ -15,7 +15,7 @@ CDIAGFLAGS+=    -Wshadow
 
 PROG=tcpbench
 
-LDADD=-lkvm
+LDADD=-lkvm -levent
 
 #BINGRP= kmem
 #BINMODE=2555
Index: tcpbench.c
===================================================================
RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v
retrieving revision 1.19
diff -d -u -p -w -r1.19 tcpbench.c
--- tcpbench.c  19 Oct 2010 10:03:23 -0000      1.19
+++ tcpbench.c  27 Jan 2011 00:06:41 -0000
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2008 Damien Miller <d...@mindrot.org>
+ * Copyright (c) 2010 Christiano F. Haesbaert <haesba...@haesbaert.org>
  *
  * Permission to use, copy, modify, and distribute this software for any
  * purpose with or without fee is hereby granted, provided that the above
@@ -19,6 +20,7 @@
 #include <sys/socket.h>
 #include <sys/socketvar.h>
 #include <sys/resource.h>
+#include <sys/queue.h>
 
 #include <net/route.h>
 
@@ -39,6 +41,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#include <event.h>
 #include <netdb.h>
 #include <signal.h>
 #include <err.h>
@@ -50,29 +53,64 @@
 
 #define DEFAULT_PORT "12345"
 #define DEFAULT_STATS_INTERVAL 1000 /* ms */
-#define DEFAULT_BUF 256 * 1024
+#define DEFAULT_BUF (256 * 1024)
+#define DEFAULT_UDP_PKT (1500 - 28) /* MTU - IPv4 (20) - UDP (8); TODO */
+#define TCP_MODE (!uflag)
+#define UDP_MODE (uflag)
 #define MAX_FD 1024
 
-sig_atomic_t done = 0;
-sig_atomic_t proc_slice = 0;
-
-static u_int  rtableid;
-static char **kflag;
-static size_t Bflag;
-static int    Sflag;
-static int    rflag;
-static int    sflag;
-static int    vflag;
+static u_int     Vflag;        /* rtableid */
+static int       Sflag;        /* Socket buffer size (tcp mode) */
+static u_int     rflag;        /* Report rate (ms) */
+static int       sflag;        /* True if server */
+static int       vflag;        /* Verbose */
+static int       uflag;        /* UDP mode */
+static kvm_t    *kvmh;         /* Kvm handler */
+static char    **kvars;        /* Kvm enabled vars */
+static u_long    ktcbtab;      /* Ktcb */
+static char     *dummybuf;     /* IO buffer */
+static size_t    dummybuf_len; /* IO buffer len */
 
-/* stats for a single connection */
+/* stats for a single TCP connection; UDP mode uses one shared instance */
 struct statctx {
+       TAILQ_ENTRY(statctx) entry;     /* link in sc_queue */
        struct timeval t_start, t_last;
        unsigned long long bytes;
-       u_long tcbaddr;
-       char **kvars;
-       kvm_t *kh;
+       int fd;                         /* socket */
+       char *buf;                      /* I/O buffer (dummybuf) */
+       size_t buflen;                  /* length of buf */
+       struct event ev;                /* libevent I/O event on fd */
+       /* TCP only */
+       u_long tcp_tcbaddr;             /* kernel PCB address, with -k */
+       /* UDP only */
+       u_long udp_slice_pkts;          /* packets in the current slice */
 };
 
+static void    signal_handler(int, short, void *);
+static void    saddr_ntop(const struct sockaddr *, socklen_t, char *, size_t);
+static void    drop_gid(void);
+static void    set_slice_timer(int);
+static void    print_tcp_header(void);
+static void    kget(u_long, void *, size_t);
+static u_long  kfind_tcb(int);
+static void    kupdate_stats(u_long, struct inpcb *, struct tcpcb *,
+    struct socket *);
+static void    list_kvars(void);
+static void    check_kvar(const char *);
+static char ** check_prepare_kvars(char *);
+static void    stats_prepare(struct statctx *);
+static void    tcp_stats_display(unsigned long long, long double, float,
+    struct statctx *, struct inpcb *, struct tcpcb *, struct socket *);
+static void    tcp_process_slice(int, short, void *);
+static void    tcp_server_handle_sc(int, short, void *);
+static void    tcp_server_accept(int, short, void *);
+static nfds_t  server_init(struct addrinfo *, struct statctx *);
+static void    client_handle_sc(int, short, void *);
+static void    client_init(struct addrinfo *, int, struct statctx *);
+static int     clock_gettime_tv(clockid_t, struct timeval *);
+static void    udp_server_handle_sc(int, short, void *);
+static void    udp_process_slice(int, short, void *);
+
 /*
  * We account the mainstats here, that is the stats
  * for all connections, all variables starting with slice
@@ -82,12 +120,12 @@ struct statctx {
  */
 static struct {
        unsigned long long slice_bytes; /* bytes for last slice */
-       struct timeval t_start;         /* when we started counting */
        long double peak_mbps;          /* peak mbps so far */
        int nconns;                     /* connected clients */
+       struct event timer;             /* process timer */
 } mainstats;
 
-/* When adding variables, also add to stats_display() */
+/* When adding variables, also add to tcp_stats_display() */
 static const char *allowed_kvars[] = {
        "inpcb.inp_flags",
        "sockb.so_rcv.sb_cc",
@@ -124,18 +162,7 @@ static const char *allowed_kvars[] = {
        NULL
 };
 
-static void
-exitsighand(int signo)
-{
-       done = signo;
-}
-
-static void
-alarmhandler(int signo)
-{
-       proc_slice = 1;
-       signal(signo, alarmhandler);
-}
+TAILQ_HEAD(, statctx) sc_queue;
 
 static void __dead
 usage(void)
@@ -143,13 +170,32 @@ usage(void)
        fprintf(stderr,
            "usage: tcpbench -l\n"
            "       tcpbench [-v] [-B buf] [-k kvars] [-n connections] [-p port]\n"
-           "                [-r rate] [-S space] [-V rtable] hostname\n"
+           "                [-r rate] [-S space] [-u] [-V rtable] hostname\n"
            "       tcpbench -s [-v] [-B buf] [-k kvars] [-p port]\n"
-           "                [-r rate] [-S space] [-V rtable]\n");
+           "                [-r rate] [-S space] [-u] [-V rtable]\n");
        exit(1);
 }
 
 static void
+signal_handler(int sig, short event, void *bula)
+{
+       /*
+        * signal handler rules don't apply, libevent decouples for us
+        */
+       switch (sig) {
+       case SIGINT:
+       case SIGTERM:
+       case SIGHUP:
+               warnx("Terminated by signal %d", sig);
+               exit(0);
+               break;          /* NOTREACHED */
+       default:
+               errx(1, "unexpected signal %d", sig);
+               break;          /* NOTREACHED */
+       }
+}
+
+static void
 saddr_ntop(const struct sockaddr *addr, socklen_t alen, char *buf, size_t len)
 {
        char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV];
@@ -166,47 +212,72 @@ saddr_ntop(const struct sockaddr *addr, 
 }
 
 static void
-set_timer(int toggle)
+drop_gid(void)
 {
-       struct itimerval itv;
+       gid_t gid;
 
-       if (rflag <= 0)
+       gid = getgid();
+       if (setresgid(gid, gid, gid) == -1)
+               err(1, "setresgid");
+}
+
+static void
+set_slice_timer(int on)
+{
+       struct timeval tv;
+
+       if (rflag == 0)
                return;
 
-       if (toggle) {
-               itv.it_interval.tv_sec = rflag / 1000;
-               itv.it_interval.tv_usec = (rflag % 1000) * 1000;
-               itv.it_value = itv.it_interval;
+       if (on) {
+               if (evtimer_pending(&mainstats.timer, NULL))
+                       return;
+               timerclear(&tv);
+               /* XXX Is there a better way to do this ? */
+               tv.tv_sec = rflag / 1000;
+               tv.tv_usec = (rflag % 1000) * 1000;
+               
+               evtimer_add(&mainstats.timer, &tv);
+       } else {
+               if (evtimer_pending(&mainstats.timer, NULL))
+                       evtimer_del(&mainstats.timer);
+       }
        }
-       else
-               bzero(&itv, sizeof(itv));
                
-       setitimer(ITIMER_REAL, &itv, NULL);
+static int
+clock_gettime_tv(clockid_t clock_id, struct timeval *tv)
+{
+       struct timespec ts;
+
+       if (clock_gettime(clock_id, &ts) == -1)
+               return (-1);
+       
+       TIMESPEC_TO_TIMEVAL(tv, &ts);
+       
+       return (0);
 }
 
 static void
-print_header(void)
+print_tcp_header(void)
 {
        char **kv;
-       
        printf("%12s %14s %12s %8s ", "elapsed_ms", "bytes", "mbps",
            "bwidth");
-       
-       for (kv = kflag;  kflag != NULL && *kv != NULL; kv++) 
-               printf("%s%s", kv != kflag ? "," : "", *kv);
+       for (kv = kvars;  kvars != NULL && *kv != NULL; kv++) 
+               printf("%s%s", kv != kvars ? "," : "", *kv);
        
        printf("\n");
 }
 
 static void
-kget(kvm_t *kh, u_long addr, void *buf, size_t size)
+kget(u_long addr, void *buf, size_t size)
 {
-       if (kvm_read(kh, addr, buf, size) != (ssize_t)size)
-               errx(1, "kvm_read: %s", kvm_geterr(kh));
+       if (kvm_read(kvmh, addr, buf, size) != (ssize_t)size)
+               errx(1, "kvm_read: %s", kvm_geterr(kvmh));
 }
 
 static u_long
-kfind_tcb(kvm_t *kh, u_long ktcbtab, int sock)
+kfind_tcb(int sock)
 {
        struct inpcbtable tcbtab;
        struct inpcb *head, *next, *prev;
@@ -240,7 +311,7 @@ kfind_tcb(kvm_t *kh, u_long ktcbtab, int
        if (vflag >= 2)
                fprintf(stderr, "Using PCB table at %lu\n", ktcbtab);
 retry:
-       kget(kh, ktcbtab, &tcbtab, sizeof(tcbtab));
+       kget(ktcbtab, &tcbtab, sizeof(tcbtab));
        prev = head = (struct inpcb *)&CIRCLEQ_FIRST(
            &((struct inpcbtable *)ktcbtab)->inpt_queue);
        next = CIRCLEQ_FIRST(&tcbtab.inpt_queue);
@@ -250,7 +321,7 @@ retry:
        while (next != head) {
                if (vflag >= 2)
                        fprintf(stderr, "Checking PCB %p\n", next);
-               kget(kh, (u_long)next, &inpcb, sizeof(inpcb));
+               kget((u_long)next, &inpcb, sizeof(inpcb));
                if (CIRCLEQ_PREV(&inpcb, inp_queue) != prev) {
                        if (nretry--) {
                                warnx("pcb prev pointer insane");
@@ -313,7 +384,7 @@ retry:
                            in6->sin6_port != inpcb.inp_fport)
                                continue;
                }
-               kget(kh, (u_long)inpcb.inp_ppcb, &tcpcb, sizeof(tcpcb));
+               kget((u_long)inpcb.inp_ppcb, &tcpcb, sizeof(tcpcb));
                if (tcpcb.t_state != TCPS_ESTABLISHED) {
                        if (vflag >= 2)
                                fprintf(stderr, "Not established\n");
@@ -321,19 +392,19 @@ retry:
                }
                if (vflag >= 2)
                        fprintf(stderr, "Found PCB at %p\n", prev);
-               return (u_long)prev;
+               return ((u_long)prev);
        }
 
        errx(1, "No matching PCB found");
 }
 
 static void
-kupdate_stats(kvm_t *kh, u_long tcbaddr,
-    struct inpcb *inpcb, struct tcpcb *tcpcb, struct socket *sockb)
+kupdate_stats(u_long tcbaddr, struct inpcb *inpcb,
+    struct tcpcb *tcpcb, struct socket *sockb)
 {
-       kget(kh, tcbaddr, inpcb, sizeof(*inpcb));
-       kget(kh, (u_long)inpcb->inp_ppcb, tcpcb, sizeof(*tcpcb));
-       kget(kh, (u_long)inpcb->inp_socket, sockb, sizeof(*sockb));
+       kget(tcbaddr, inpcb, sizeof(*inpcb));
+       kget((u_long)inpcb->inp_ppcb, tcpcb, sizeof(*tcpcb));
+       kget((u_long)inpcb->inp_socket, sockb, sizeof(*sockb));
 }
 
 static void
@@ -371,39 +442,24 @@ check_prepare_kvars(char *list)
                        errx(1, "strdup");
                ret[n] = NULL;
        }
-       return ret;
+       return (ret);
 }
 
 static void
-stats_prepare(struct statctx *sc, int fd, kvm_t *kh, u_long ktcbtab)
+stats_prepare(struct statctx *sc)
 {
-       if (rflag <= 0)
-               return;
-       sc->kh = kh;
-       sc->kvars = kflag;
-       if (kflag)
-               sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd);
-       if (gettimeofday(&sc->t_start, NULL) == -1)
-               err(1, "gettimeofday");
+       sc->buf = dummybuf;
+       sc->buflen = dummybuf_len;
+       if (kvars)
+               sc->tcp_tcbaddr = kfind_tcb(sc->fd);
+       if (clock_gettime_tv(CLOCK_MONOTONIC, &sc->t_start) == -1)
+               err(1, "clock_gettime_tv");
        sc->t_last = sc->t_start;
-       sc->bytes = 0;
-}
 
-static void
-stats_update(struct statctx *sc, ssize_t n)
-{
-       sc->bytes += n;
-       mainstats.slice_bytes += n;
-}
-
-static void
-stats_cleanslice(void)
-{
-       mainstats.slice_bytes = 0;
 }
 
 static void
-stats_display(unsigned long long total_elapsed, long double mbps,
+tcp_stats_display(unsigned long long total_elapsed, long double mbps,
     float bwperc, struct statctx *sc, struct inpcb *inpcb,
     struct tcpcb *tcpcb, struct socket *sockb)
 {
@@ -412,14 +468,14 @@ stats_display(unsigned long long total_e
        printf("%12llu %14llu %12.3Lf %7.2f%% ", total_elapsed, sc->bytes,
            mbps, bwperc);
        
-       if (sc->kvars != NULL) {
-               kupdate_stats(sc->kh, sc->tcbaddr, inpcb, tcpcb,
+       if (kvars != NULL) {
+               kupdate_stats(sc->tcp_tcbaddr, inpcb, tcpcb,
                    sockb);
 
-               for (j = 0; sc->kvars[j] != NULL; j++) {
+               for (j = 0; kvars[j] != NULL; j++) {
 #define S(a) #a
 #define P(b, v, f)                                                     \
-                       if (strcmp(sc->kvars[j], S(b.v)) == 0) {        \
+                       if (strcmp(kvars[j], S(b.v)) == 0) {    \
                                printf("%s"f, j > 0 ? "," : "", b->v);  \
                                continue;                               \
                        }
@@ -463,30 +519,24 @@ stats_display(unsigned long long total_e
 }
 
 static void
-mainstats_display(long double slice_mbps, long double avg_mbps)
-{
-       printf("Conn: %3d Mbps: %12.3Lf Peak Mbps: %12.3Lf Avg Mbps: %12.3Lf\n",
-           mainstats.nconns, slice_mbps, mainstats.peak_mbps, avg_mbps); 
-}
-
-static void
-process_slice(struct statctx *sc, size_t nsc)
+tcp_process_slice(int fd, short event, void *bula)
 {
        unsigned long long total_elapsed, since_last;
        long double mbps, slice_mbps = 0;
        float bwperc;
-       nfds_t i;
+       struct statctx *sc;
        struct timeval t_cur, t_diff;
        struct inpcb inpcb;
        struct tcpcb tcpcb;
        struct socket sockb;
        
-       for (i = 0; i < nsc; i++, sc++) {
-               if (gettimeofday(&t_cur, NULL) == -1)
-                       err(1, "gettimeofday");
-               if (sc->kvars != NULL) /* process kernel stats */
-                       kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb,
+       TAILQ_FOREACH(sc, &sc_queue, entry) {
+               if (clock_gettime_tv(CLOCK_MONOTONIC, &t_cur) == -1)
+                       err(1, "clock_gettime_tv");
+               if (kvars != NULL) /* process kernel stats */
+                       kupdate_stats(sc->tcp_tcbaddr, &inpcb, &tcpcb,
                            &sockb);
+               
                timersub(&t_cur, &sc->t_start, &t_diff);
                total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
                timersub(&t_cur, &sc->t_last, &t_diff);
@@ -495,66 +545,167 @@ process_slice(struct statctx *sc, size_t
                mbps = (sc->bytes * 8) / (since_last * 1000.0);
                slice_mbps += mbps;
                
-               stats_display(total_elapsed, mbps, bwperc, sc,
+               tcp_stats_display(total_elapsed, mbps, bwperc, sc,
                    &inpcb, &tcpcb, &sockb);
                
                sc->t_last = t_cur;
                sc->bytes = 0;
-
        }
 
        /* process stats for this slice */
        if (slice_mbps > mainstats.peak_mbps)
                mainstats.peak_mbps = slice_mbps;
-       mainstats_display(slice_mbps, slice_mbps / mainstats.nconns);
+       printf("Conn: %3d Mbps: %12.3Lf Peak Mbps: %12.3Lf Avg Mbps: %12.3Lf\n",
+           mainstats.nconns, slice_mbps, mainstats.peak_mbps,
+           slice_mbps / mainstats.nconns); 
+       mainstats.slice_bytes = 0;
+
+       set_slice_timer(mainstats.nconns > 0);
 }
 
-static int
-handle_connection(struct statctx *sc, int fd, char *buf, size_t buflen)
+static void
+udp_process_slice(int fd, short event, void *v_sc)
+{
+       struct statctx *sc = v_sc;
+       unsigned long long total_elapsed, since_last;
+       long double slice_mbps, pps;
+       struct timeval t_cur, t_diff;
+
+       if (clock_gettime_tv(CLOCK_MONOTONIC, &t_cur) == -1)
+               err(1, "clock_gettime_tv");
+       /* Calculate rates in floating point (no SIGFPE on since_last == 0) */
+       timersub(&t_cur, &sc->t_start, &t_diff);
+       total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
+       timersub(&t_cur, &sc->t_last, &t_diff);
+       since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
+       slice_mbps = (sc->bytes * 8) / (since_last * 1000.0);
+       pps = (sc->udp_slice_pkts * 1000.0) / since_last;
+       if (slice_mbps > mainstats.peak_mbps)
+               mainstats.peak_mbps = slice_mbps;
+       printf("Elapsed: %11llu Mbps: %11.3Lf Peak Mbps: %11.3Lf %s PPS: %10.3Lf\n",
+           total_elapsed, slice_mbps, mainstats.peak_mbps, sflag ? "Rx" : "Tx",
+           pps);
+
+       /* Clean up this slice time */
+       sc->t_last = t_cur;
+       sc->bytes = 0;
+       sc->udp_slice_pkts = 0;
+       set_slice_timer(1);
+}
+
+static void
+udp_server_handle_sc(int fd, short event, void *v_sc)
 {
        ssize_t n;
+       struct statctx *sc = v_sc;
 
 again:
-       n = read(fd, buf, buflen);
-       if (n == -1) {
+       n = read(fd, dummybuf, dummybuf_len);
+       if (n == 0)
+               return;
+       else if (n == -1) {
                if (errno == EINTR)
                        goto again;
                else if (errno == EWOULDBLOCK) 
-                       return 0;
+                       return;
                warn("fd %d read error", fd);
+               return;
+       }
                
-               return -1;
+       if (vflag >= 3)
+               fprintf(stderr, "read: %zd bytes\n", n);
+       /* If this was our first packet, start slice timer */
+       if (mainstats.peak_mbps == 0)
+               set_slice_timer(1);
+       /* Account packet */
+       sc->udp_slice_pkts++;
+       sc->bytes += n;
        }
-       else if (n == 0) {
+
+static void
+tcp_server_handle_sc(int fd, short event, void *v_sc)
+{
+       struct statctx *sc = v_sc;
+       ssize_t n;
+
+again:
+       n = read(sc->fd, sc->buf, sc->buflen);
+       if (n == -1) {
+               if (errno == EINTR)
+                       goto again;
+               else if (errno == EWOULDBLOCK)
+                       return;
+               warn("fd %d read error", sc->fd);
+       }
+       if (n <= 0) {   /* EOF or hard error: tear the connection down */
                if (vflag)
-                       fprintf(stderr, "%8d closed by remote end\n", fd);
-               close(fd);
-               return -1;
+                       fprintf(stderr, "%8d closed by remote end\n", sc->fd);
+               event_del(&sc->ev);     /* must precede free(sc) */
+               close(sc->fd);
+               TAILQ_REMOVE(&sc_queue, sc, entry);
+               free(sc);
+               set_slice_timer(--mainstats.nconns > 0);
+               return;
        }
        if (vflag >= 3)
                fprintf(stderr, "read: %zd bytes\n", n);
+       sc->bytes += n;
+       mainstats.slice_bytes += n;
+}
        
-       stats_update(sc, n);
-       return 0;
+static void
+tcp_server_accept(int fd, short event, void *bula)
+{
+       int sock, r;
+       struct statctx *sc;
+       struct sockaddr_storage ss;
+       socklen_t sslen;
+       char tmp[128];
+
+       sslen = sizeof(ss);
+again:
+       if ((sock = accept(fd, (struct sockaddr *)&ss,
+           &sslen)) == -1) {
+               if (errno == EINTR)
+                       goto again;
+               warn("accept");
+               return;
+       }
+       saddr_ntop((struct sockaddr *)&ss, sslen,
+           tmp, sizeof(tmp));
+       if ((r = fcntl(sock, F_GETFL, 0)) == -1)
+               err(1, "fcntl(F_GETFL)");
+       r |= O_NONBLOCK;
+       if (fcntl(sock, F_SETFL, r) == -1)
+               err(1, "fcntl(F_SETFL, O_NONBLOCK)");
+       /* Alloc client structure and register reading callback */
+       if ((sc = calloc(1, sizeof(*sc))) == NULL)
+               err(1, "calloc");
+       sc->fd = sock;
+       stats_prepare(sc);
+       event_set(&sc->ev, sc->fd, EV_READ | EV_PERSIST,
+           tcp_server_handle_sc, sc);
+       event_add(&sc->ev, NULL);
+       TAILQ_INSERT_TAIL(&sc_queue, sc, entry);
+       mainstats.nconns++;
+       set_slice_timer(mainstats.nconns > 0);
+       if (vflag)
+               warnx("Accepted connection from %s, fd = %d", tmp, sc->fd);
 }
 
 static nfds_t
-serverbind(struct pollfd *pfd, nfds_t max_nfds, struct addrinfo *aitop)
+server_init(struct addrinfo *aitop, struct statctx *udp_sc)
 {
        char tmp[128];
        int sock, on = 1;
        struct addrinfo *ai;
+       struct event *ev;
        nfds_t lnfds;
 
        lnfds = 0;
        for (ai = aitop; ai != NULL; ai = ai->ai_next) {
-               if (lnfds == max_nfds) {
-                       fprintf(stderr,
-                           "maximum number of listening fds reached\n");
-                       break;
-               }
                saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp));
-               if (vflag)
+               if (vflag && TCP_MODE)
                        fprintf(stderr, "Try to listen on %s\n", tmp);
                if ((sock = socket(ai->ai_family, ai->ai_socktype,
                    ai->ai_protocol)) == -1) {
@@ -564,11 +715,11 @@ serverbind(struct pollfd *pfd, nfds_t ma
                                warn("socket");
                        continue;
                }
-               if (rtableid && ai->ai_family == AF_INET) {
+               if (Vflag && ai->ai_family == AF_INET) {
                        if (setsockopt(sock, IPPROTO_IP, SO_RTABLE,
-                           &rtableid, sizeof(rtableid)) == -1)
+                           &Vflag, sizeof(Vflag)) == -1)
                                err(1, "setsockopt SO_RTABLE");
-               } else if (rtableid)
+               } else if (Vflag)
                        warnx("rtable only supported on AF_INET");
                if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                    &on, sizeof(on)) == -1)
@@ -584,8 +735,9 @@ serverbind(struct pollfd *pfd, nfds_t ma
                if (Sflag) {
                        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
                            &Sflag, sizeof(Sflag)) == -1)
-                               warn("set TCP receive buffer size");
+                               warn("set receive buffer size");
                }
+               if (TCP_MODE)
                if (listen(sock, 64) == -1) {
                        if (ai->ai_next == NULL)
                                err(1, "listen");
@@ -594,167 +746,73 @@ serverbind(struct pollfd *pfd, nfds_t ma
                        close(sock);
                        continue;
                }
+               if ((ev = calloc(1, sizeof(*ev))) == NULL)
+                       err(1, "calloc");
+               if (UDP_MODE)
+                       event_set(ev, sock, EV_READ | EV_PERSIST,
+                           udp_server_handle_sc, udp_sc);
+               else
+                       event_set(ev, sock, EV_READ | EV_PERSIST,
+                           tcp_server_accept, NULL);
+               event_add(ev, NULL);
                if (vflag >= 3)
-                       fprintf(stderr, "listening on fd %d\n", sock);
+                       fprintf(stderr, "bound to fd %d\n", sock);
                lnfds++;
-               pfd[lnfds - 1].fd = sock;
-               pfd[lnfds - 1].events = POLLIN;
-
        }
        freeaddrinfo(aitop);
        if (lnfds == 0)
                errx(1, "No working listen addresses found");
 
-       return lnfds;
+       return (lnfds);
 }      
 
 static void
-set_listening(struct pollfd *pfd, nfds_t lfds, int toggle) {
-       int i;
-
-       for (i = 0; i < (int)lfds; i++) {
-               if (toggle)
-                       pfd[i].events = POLLIN;
-               else
-                       pfd[i].events = 0;
-       }
-                       
-}
-static void __dead
-serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop)
+client_handle_sc(int fd, short event, void *v_sc)
 {
-       socklen_t sslen;
-       struct pollfd *pfd;
-       char tmp[128], *buf;
-       struct statctx *psc;
-       struct sockaddr_storage ss;
-       nfds_t i, nfds, lfds;
-       size_t nalloc;
-       int r, sock, client_id;
-
-       sslen = sizeof(ss);
-       nalloc = 128;
-       if ((pfd = calloc(sizeof(*pfd), nalloc)) == NULL)
-               err(1, "calloc");
-       if ((psc = calloc(sizeof(*psc), nalloc)) == NULL)
-               err(1, "calloc");
-       if ((buf = malloc(Bflag)) == NULL)
-               err(1, "malloc");
-       lfds = nfds = serverbind(pfd, nalloc - 1, aitop);
-       if (vflag >= 3)
-               fprintf(stderr, "listening on %d fds\n", lfds);
-       if (setpgid(0, 0) == -1)
-               err(1, "setpgid");
-       
-       print_header();
-       
-       client_id = 0;
-       while (!done) {
-               if (proc_slice) { 
-                       process_slice(psc + lfds, nfds - lfds);
-                       stats_cleanslice();
-                       proc_slice = 0;
-               }
-               if (vflag >= 3) 
-                       fprintf(stderr, "mainstats.nconns = %u\n",
-                           mainstats.nconns);
-               if ((r = poll(pfd, nfds, INFTIM)) == -1) {
-                       if (errno == EINTR)
-                               continue;
-                       warn("poll");
-                       break;
-               }
+       struct statctx *sc = v_sc;
+       ssize_t n;

-               if (vflag >= 3)
-                       fprintf(stderr, "poll: %d\n", r);
-               for (i = 0 ; r > 0 && i < nfds; i++) {
-                       if ((pfd[i].revents & POLLIN) == 0)
-                               continue;
-                       if (pfd[i].fd == -1)
-                               errx(1, "pfd insane");
-                       r--;
-                       if (vflag >= 3)
-                               fprintf(stderr, "fd %d active i = %d\n",
-                                   pfd[i].fd, i);
-                       /* new connection */
-                       if (i < lfds) {
-                               if ((sock = accept(pfd[i].fd,
-                                   (struct sockaddr *)&ss,
-                                   &sslen)) == -1) {
-                                       if (errno == EINTR)
-                                               continue;
-                                       else if (errno == EMFILE ||
-                                           errno == ENFILE)
-                                               set_listening(pfd, lfds, 0);
-                                       warn("accept");
-                                       continue;
-                               }
-                               if ((r = fcntl(sock, F_GETFL, 0)) == -1)
-                                       err(1, "fcntl(F_GETFL)");
-                               r |= O_NONBLOCK;
-                               if (fcntl(sock, F_SETFL, r) == -1)
-                                       err(1, "fcntl(F_SETFL, O_NONBLOCK)");
-                               saddr_ntop((struct sockaddr *)&ss, sslen,
-                                   tmp, sizeof(tmp));
+       /* XXX: Can this block if we lack entropy ? */
+       arc4random_buf(dummybuf, dummybuf_len);
+again:
+       if ((n = write(sc->fd, sc->buf, sc->buflen)) == -1) {
+               if (errno == EINTR || errno == EAGAIN)
+                       return; /* libevent calls us again when writable */
+               /* Hammer network */
+               if (UDP_MODE && errno == ENOBUFS) {
+                       /*
+                        * XXX: release cpu, no sched_yield(2), is there a
+                        * better way to do this ?
+                        */
+                       usleep(50000);
                                if (vflag)
-                                       fprintf(stderr,
-                                           "Accepted connection %d from "
-                                           "%s, fd = %d\n", client_id++, tmp,
-                                            sock);
-                               /* alloc more space if we're full */
-                               if (nfds == nalloc) {
-                                       nalloc *= 2;
-                                       if ((pfd = realloc(pfd,
-                                           sizeof(*pfd) * nalloc)) == NULL)
-                                               err(1, "realloc");
-                                       if ((psc = realloc(psc,
-                                           sizeof(*psc) * nalloc)) == NULL)
-                                               err(1, "realloc");
+                               warn("write");
+                       goto again;
                                }
-                               pfd[nfds].fd = sock;
-                               pfd[nfds].events = POLLIN;
-                               stats_prepare(&psc[nfds], sock, kvmh, ktcbtab);
-                               nfds++;
-                               if (!mainstats.nconns++)
-                                       set_timer(1);
-                               continue;
+               err(1, "write");
                        }
-                       /* event in fd */
-                       if (vflag >= 3)
-                               fprintf(stderr,
-                                   "fd %d active", pfd[i].fd);
-                       while (handle_connection(&psc[i], pfd[i].fd,
-                           buf, Bflag) == -1) {
-                               pfd[i] = pfd[nfds - 1];
-                               pfd[nfds - 1].fd = -1;
-                               psc[i] = psc[nfds - 1];
-                               mainstats.nconns--;
-                               nfds--;
-                               /* stop display if no clients */
-                               if (!mainstats.nconns) {
-                                       proc_slice = 1;
-                                       set_timer(0);
+       if (TCP_MODE && n == 0) {
+               warnx("Remote end closed connection");
+               exit(1);
                                }
-                               /* if we were full */
-                               set_listening(pfd, lfds, 1);
+       if (vflag >= 3)
+               warnx("write: %zd bytes", n);
+       sc->bytes += n;
+       mainstats.slice_bytes += n;
+       if (UDP_MODE)
+               sc->udp_slice_pkts++;

-                               /* is there an event pending on the last fd? */
-                               if (pfd[i].fd == -1 ||
-                                   (pfd[i].revents & POLLIN) == 0)
-                                       break;
-                       }
-               }
-       }
-       exit(1);
 }
 
-void
-clientconnect(struct addrinfo *aitop, struct pollfd *pfd, int nconn)
+static void
+client_init(struct addrinfo *aitop, int nconn, struct statctx *udp_sc)
 {
-       char tmp[128];
+       struct statctx *sc;
        struct addrinfo *ai;
+       char tmp[128];
        int i, r, sock;
 
+       sc = udp_sc;
        for (i = 0; i < nconn; i++) {
                for (sock = -1, ai = aitop; ai != NULL; ai = ai->ai_next) {
                        saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp,
@@ -769,11 +827,11 @@ clientconnect(struct addrinfo *aitop, st
                                        warn("socket");
                                continue;
                        }
-                       if (rtableid && ai->ai_family == AF_INET) {
+                       if (Vflag && ai->ai_family == AF_INET) {
                                if (setsockopt(sock, IPPROTO_IP, SO_RTABLE,
-                                   &rtableid, sizeof(rtableid)) == -1)
+                                   &Vflag, sizeof(Vflag)) == -1)
                                        err(1, "setsockopt SO_RTABLE");
-                       } else if (rtableid)
+                       } else if (Vflag)
                                warnx("rtable only supported on AF_INET");
                        if (Sflag) {
                                if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
@@ -793,96 +851,31 @@ clientconnect(struct addrinfo *aitop, st
                }
                if (sock == -1)
                        errx(1, "No host found");
-
                if ((r = fcntl(sock, F_GETFL, 0)) == -1)
                        err(1, "fcntl(F_GETFL)");
                r |= O_NONBLOCK;
                if (fcntl(sock, F_SETFL, r) == -1)
                        err(1, "fcntl(F_SETFL, O_NONBLOCK)");
-
-               pfd[i].fd = sock;
-               pfd[i].events = POLLOUT;
-       }
-       freeaddrinfo(aitop);
-
-       if (vflag && nconn > 1)
-               fprintf(stderr, "%u connections established\n", nconn);
+               /* Alloc and prepare stats */
+               if (TCP_MODE) {
+                       if ((sc = calloc(1, sizeof(*sc))) == NULL)
+                               err(1, "calloc");
 }
-
-static void __dead
-clientloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop, int nconn)
-{
-       struct statctx *psc;
-       struct pollfd *pfd;
-       char *buf;
-       int i;
-       ssize_t n;
-
-       if ((pfd = calloc(nconn, sizeof(*pfd))) == NULL)
-               err(1, "clientloop pfd calloc");
-       if ((psc = calloc(nconn, sizeof(*psc))) == NULL)
-               err(1, "clientloop psc calloc");
-       
-       clientconnect(aitop, pfd, nconn);
-
-       for (i = 0; i < nconn; i++) {
-               stats_prepare(psc + i, pfd[i].fd, kvmh, ktcbtab);
+               sc->fd = sock;
+               stats_prepare(sc);
+               event_set(&sc->ev, sc->fd, EV_WRITE | EV_PERSIST,
+                   client_handle_sc, sc);
+               event_add(&sc->ev, NULL);
+               TAILQ_INSERT_TAIL(&sc_queue, sc, entry);
                mainstats.nconns++;
-       }
-
-       if ((buf = malloc(Bflag)) == NULL)
-               err(1, "malloc");
-       arc4random_buf(buf, Bflag);
-
-       print_header();
-       set_timer(1);
-
-       while (!done) {
-               if (proc_slice) {
-                       process_slice(psc, nconn);
-                       stats_cleanslice();
-                       proc_slice = 0;
-               }
-               if (poll(pfd, nconn, INFTIM) == -1) {
-                       if (errno == EINTR)
-                               continue;
-                       err(1, "poll");
-               }
-               for (i = 0; i < nconn; i++) {
-                       if (pfd[i].revents & POLLOUT) {
-                               if ((n = write(pfd[i].fd, buf, Bflag)) == -1) {
-                                       if (errno == EINTR || errno == EAGAIN)
-                                               continue;
-                                       err(1, "write");
-                               }
-                               if (n == 0) {
-                                       warnx("Remote end closed connection");
-                                       done = -1;
+               set_slice_timer(mainstats.nconns > 0);
+               if (UDP_MODE)
                                        break;
                                }
-                               if (vflag >= 3)
-                                       fprintf(stderr, "write: %zd bytes\n",
-                                           n);
-                               stats_update(psc + i, n);
-                       }
-               }
-       }
-       
-       if (done > 0)
-               warnx("Terminated by signal %d", done);
-
-       free(buf);
-       exit(0);
-}
-
-static void
-drop_gid(void)
-{
-       gid_t gid;
+       freeaddrinfo(aitop);
 
-       gid = getgid();
-       if (setresgid(gid, gid, gid) == -1)
-               err(1, "setresgid");
+       if (vflag && nconn > 1)
+               fprintf(stderr, "%d connections established\n", nconn);
 }
 
 int
@@ -890,23 +883,24 @@ main(int argc, char **argv)
 {
        extern int optind;
        extern char *optarg;
-
        char kerr[_POSIX2_LINE_MAX], *tmp;
        struct addrinfo *aitop, hints;
        const char *errstr;
-       kvm_t *kvmh = NULL;
        struct rlimit rl;
-       int ch, herr;
+       int ch, herr, nconn;
        struct nlist nl[] = { { "_tcbtable" }, { "" } };
        const char *host = NULL, *port = DEFAULT_PORT;
-       int nconn = 1;
+       struct event ev_sigint, ev_sigterm, ev_sighup;
+       struct statctx *udp_sc = NULL;
 
-       Bflag = DEFAULT_BUF;
-       Sflag = sflag = vflag = rtableid = 0;
-       kflag = NULL;
+       dummybuf_len = 0;
+       Sflag = sflag = vflag = Vflag = 0;
+       kvmh  = NULL;
+       kvars = NULL;
        rflag = DEFAULT_STATS_INTERVAL;
+       nconn = 1;
 
-       while ((ch = getopt(argc, argv, "B:hlk:n:p:r:sS:vV:")) != -1) {
+       while ((ch = getopt(argc, argv, "B:hlk:n:p:r:sS:uvV:")) != -1) {
                switch (ch) {
                case 'l':
                        list_kvars();
@@ -914,7 +908,7 @@ main(int argc, char **argv)
                case 'k':
                        if ((tmp = strdup(optarg)) == NULL)
                                errx(1, "strdup");
-                       kflag = check_prepare_kvars(tmp);
+                       kvars = check_prepare_kvars(tmp);
                        free(tmp);
                        break;
                case 'r':
@@ -938,7 +932,7 @@ main(int argc, char **argv)
                                    errstr, optarg);
                        break;
                case 'B':
-                       Bflag = strtonum(optarg, 0, 1024*1024*1024,
+                       dummybuf_len = strtonum(optarg, 0, 1024*1024*1024,
                            &errstr);
                        if (errstr != NULL)
                                errx(1, "read/write buffer size is %s: %s",
@@ -948,7 +942,7 @@ main(int argc, char **argv)
                        vflag++;
                        break;
                case 'V':
-                       rtableid = (unsigned int)strtonum(optarg, 0,
+                       Vflag = (unsigned int)strtonum(optarg, 0,
                            RT_TABLEID_MAX, &errstr);
                        if (errstr)
                                errx(1, "rtable value is %s: %s",
@@ -960,6 +954,9 @@ main(int argc, char **argv)
                                errx(1, "number of connections is %s: %s",
                                    errstr, optarg);
                        break;
+               case 'u':
+                       uflag = 1;
+                       break;
                case 'h':
                default:
                        usage();
@@ -968,13 +965,28 @@ main(int argc, char **argv)
 
        argv += optind;
        argc -= optind;
-       if (argc != (sflag ? 0 : 1))
+       if ((argc != (sflag ? 0 : 1)) ||
+           (UDP_MODE && (kvars || nconn != 1)))
                usage();
 
        if (!sflag)
                host = argv[0];
-
+       /*
+        * Rationale:
+        * If TCP, use a big buffer with big reads/writes.
+        * If UDP, use a big buffer in the server and, in the client,
+        * a buffer the size of one Ethernet packet's UDP payload.
+        */
+       if (!dummybuf_len) {
+               if (sflag || TCP_MODE)
+                       dummybuf_len = DEFAULT_BUF;
+               else
+                       dummybuf_len = DEFAULT_UDP_PKT;
+       }
        bzero(&hints, sizeof(hints));
-       hints.ai_socktype = SOCK_STREAM;
+       if (UDP_MODE)
+               hints.ai_socktype = SOCK_DGRAM;
+       else
+               hints.ai_socktype = SOCK_STREAM;
        if (sflag)
                hints.ai_flags = AI_PASSIVE;
@@ -984,23 +996,17 @@ main(int argc, char **argv)
                else
                        errx(1, "getaddrinfo: %s", gai_strerror(herr));
        }
-
-       if (kflag) {
+       if (kvars) {
                if ((kvmh = kvm_openfiles(NULL, NULL, NULL,
                    O_RDONLY, kerr)) == NULL)
                        errx(1, "kvm_open: %s", kerr);
                drop_gid();
                if (kvm_nlist(kvmh, nl) < 0 || nl[0].n_type == 0)
                        errx(1, "kvm: no namelist");
+               ktcbtab = nl[0].n_value;
        } else
                drop_gid();
 
-       signal(SIGINT, exitsighand);
-       signal(SIGTERM, exitsighand);
-       signal(SIGHUP, exitsighand);
-       signal(SIGPIPE, SIG_IGN);
-       signal(SIGALRM, alarmhandler);
-
        if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
                err(1, "getrlimit");
        if (rl.rlim_cur < MAX_FD)
@@ -1010,10 +1016,44 @@ main(int argc, char **argv)
        if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
                err(1, "getrlimit");
        
-       if (sflag)
-               serverloop(kvmh, nl[0].n_value, aitop);
+       /* Init world */
+       TAILQ_INIT(&sc_queue);
+       if ((dummybuf = malloc(dummybuf_len)) == NULL)
+               err(1, "malloc");
+       if (UDP_MODE) {
+               if ((udp_sc = calloc(1, sizeof(*udp_sc))) == NULL)
+                       err(1, "calloc");
+               udp_sc->fd = -1;
+               stats_prepare(udp_sc);
+       }
+
+       /* Setup libevent and signals */
+       event_init();
+       signal_set(&ev_sigterm, SIGTERM, signal_handler, NULL);
+       signal_set(&ev_sighup, SIGHUP, signal_handler, NULL);
+       signal_set(&ev_sigint, SIGINT, signal_handler, NULL);
+       signal_add(&ev_sigint, NULL);
+       signal_add(&ev_sigterm, NULL);
+       signal_add(&ev_sighup, NULL);
+       signal(SIGPIPE, SIG_IGN);
+       
+       if (TCP_MODE)
+               print_tcp_header();
+       
+       if (UDP_MODE)
+               evtimer_set(&mainstats.timer, udp_process_slice, udp_sc);
        else
-               clientloop(kvmh, nl[0].n_value, aitop, nconn);
+               evtimer_set(&mainstats.timer, tcp_process_slice, NULL);
 
-       return 0;
+       if (sflag) {
+               (void)server_init(aitop, udp_sc);
+               if (setpgid(0, 0) == -1)
+                       err(1, "setpgid");
+       } else
+               client_init(aitop, nconn, udp_sc);
+       
+       /* libevent main loop*/
+       event_dispatch();
+
+       return (0);
 }

Reply via email to