On Mon, Jun 08, 2009 at 06:14:31PM +0200, Reyk Floeter wrote:
> hi,
>
> the idea sounds ok, but why just 128? tcpbench is for benchmarking
> and testing and it should be possible to run more concurrent
> connections.
>
> it could call getrlimit() to get the actual RLIMIT_NOFILE value which
> is 128 by default but can be much higher. another variant is the way
> spamd/spamd.c is doing it by looking at KERN_MAXFILES.
>
Hmm, I didn't know we had a soft limit; I thought you had to recompile
the kernel and tune max files. I agree, I'll add support for expanding
the limit. Is there any sane limit, or can I push it as far as it goes?
I haven't paid much attention to the kvars monitoring yet; are those
values per process in the kernel?
Another feature I was thinking of would be to dump the output to one
file per host. What do you guys think? Besides correcting the fd limit
and adding udp support, any other ideas?
Best regards.
>
> On Mon, Jun 08, 2009 at 10:44:37AM -0300, Christiano Farina Haesbaert wrote:
> > Hi,
> >
> > The following patch makes tcpbench(1) non-forking and non-blocking,
> > I've changed the output to show the file descriptor instead of the
> > pid.
> >
> > The server will no longer die due to excessive forking as it's all
> > wrapped in a single process; we are, however, limited to 128 fds.
> >
> > There is an error when mapping certain kvars with more than one
> > connection as there is only one single process.
> > I've noticed a small drop in performance when using more than 10
> > connections, but it seems irrelevant.
> >
> > This was suggested by Henning on misc@.
> >
> > Any feedback is welcome, I'll be working on udp support for now.
> >
> > Index: tcpbench.c
> > ===================================================================
> > RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v
> > retrieving revision 1.8
> > diff -u -d -r1.8 tcpbench.c
> > --- tcpbench.c 18 Sep 2008 10:23:33 -0000 1.8
> > +++ tcpbench.c 8 Jun 2009 13:46:48 -0000
> > @@ -50,14 +50,14 @@
> > #define DEFAULT_PORT "12345"
> > #define DEFAULT_STATS_INTERVAL 1000 /* ms */
> > #define DEFAULT_BUF 256 * 1024
> > +#define MAX_FD 128
> >
> > sig_atomic_t done = 0;
> > -sig_atomic_t print_stats = 0;
> >
> > struct statctx {
> > struct timeval t_start, t_last, t_cur;
> > unsigned long long bytes;
> > - pid_t pid;
> > + int fd;
> > u_long tcbaddr;
> > kvm_t *kh;
> > char **kvars;
> > @@ -103,7 +103,6 @@
> > static void
> > alarmhandler(int signo)
> > {
> > - print_stats = 1;
> > signal(signo, alarmhandler);
> > }
> >
> > @@ -309,24 +308,22 @@
> > {
> > struct itimerval itv;
> > int i;
> > -
> > +
> > if (rflag <= 0)
> > return;
> > + sc->fd = fd;
> > sc->kh = kh;
> > sc->kvars = kflag;
> > + sc->bytes = 0;
> > if (kflag)
> > sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd, vflag);
> > gettimeofday(&sc->t_start, NULL);
> > sc->t_last = sc->t_start;
> > - signal(SIGALRM, alarmhandler);
> > itv.it_interval.tv_sec = rflag / 1000;
> > itv.it_interval.tv_usec = (rflag % 1000) * 1000;
> > itv.it_value = itv.it_interval;
> > - setitimer(ITIMER_REAL, &itv, NULL);
> > - sc->bytes = 0;
> > - sc->pid = getpid();
> >
> > - printf("%8s %12s %14s %12s ", "pid", "elapsed_ms", "bytes", "Mbps");
> > + printf("%8s %12s %14s %12s ", "fd", "elapsed_ms", "bytes", "Mbps");
> > if (sc->kvars != NULL) {
> > for (i = 0; sc->kvars[i] != NULL; i++)
> > printf("%s%s", i > 0 ? "," : "", sc->kvars[i]);
> > @@ -342,7 +339,7 @@
> > }
> >
> > static void
> > -stats_display(struct statctx *sc)
> > +stats_display(struct statctx *sc, int rflag)
> > {
> > struct timeval t_diff;
> > unsigned long long total_elapsed, since_last;
> > @@ -356,11 +353,13 @@
> > total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
> > timersub(&sc->t_cur, &sc->t_last, &t_diff);
> > since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
> > - printf("%8ld %12llu %14llu %12.3Lf ", (long)sc->pid,
> > + if (since_last <= rflag)
> > + return;
> > + printf("%8d %12llu %14llu %12.3Lf ", sc->fd,
> > total_elapsed, sc->bytes,
> > (long double)(sc->bytes * 8) / (since_last * 1000.0));
> > sc->t_last = sc->t_cur;
> > - sc->bytes = 0;
> > + sc->bytes = 0;
> >
> > if (sc->kvars != NULL) {
> > kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb, &sockb);
> > @@ -403,91 +402,31 @@
> > fflush(stdout);
> > }
> >
> > -static void
> > -stats_finish(struct statctx *sc)
> > -{
> > - struct itimerval itv;
> > -
> > - signal(SIGALRM, SIG_DFL);
> > - bzero(&itv, sizeof(itv));
> > - setitimer(ITIMER_REAL, &itv, NULL);
> > -}
> > -
> > -static void __dead
> > -handle_connection(kvm_t *kvmh, u_long ktcbtab, int sock, int vflag,
> > - int rflag, char **kflag, int Bflag)
> > -{
> > - char *buf;
> > - struct pollfd pfd;
> > - ssize_t n;
> > - int r;
> > - struct statctx sc;
> > -
> > - if ((buf = malloc(Bflag)) == NULL)
> > - err(1, "malloc");
> > - if ((r = fcntl(sock, F_GETFL, 0)) == -1)
> > - err(1, "fcntl(F_GETFL)");
> > - r |= O_NONBLOCK;
> > - if (fcntl(sock, F_SETFL, r) == -1)
> > - err(1, "fcntl(F_SETFL, O_NONBLOCK)");
> > -
> > - signal(SIGINT, exitsighand);
> > - signal(SIGTERM, exitsighand);
> > - signal(SIGHUP, exitsighand);
> > - signal(SIGPIPE, SIG_IGN);
> > -
> > - bzero(&pfd, sizeof(pfd));
> > - pfd.fd = sock;
> > - pfd.events = POLLIN;
> > -
> > - stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag);
> > -
> > - while (!done) {
> > - if (print_stats) {
> > - stats_display(&sc);
> > - print_stats = 0;
> > - }
> > - if (poll(&pfd, 1, INFTIM) == -1) {
> > - if (errno == EINTR)
> > - continue;
> > - err(1, "poll");
> > - }
> > - if ((n = read(pfd.fd, buf, Bflag)) == -1) {
> > - if (errno == EINTR || errno == EAGAIN)
> > - continue;
> > - err(1, "read");
> > - }
> > - if (n == 0) {
> > - fprintf(stderr, "%8ld closed by remote end\n",
> > - (long)getpid());
> > - done = -1;
> > - break;
> > - }
> > - if (vflag >= 3)
> > - fprintf(stderr, "read: %zd bytes\n", n);
> > - stats_update(&sc, n);
> > - }
> > - stats_finish(&sc);
> > -
> > - free(buf);
> > - close(sock);
> > - exit(1);
> > -}
> > -
> > static void __dead
> > serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop,
> > int vflag, int rflag, char **kflag, int Sflag, int Bflag)
> > {
> > +
> > + char *buf;
> > char tmp[128];
> > - int r, sock, client_id, on = 1;
> > + int i, r, sock, client_id, fdval, nextfree, on = 1;
> > + size_t listenfds, nfds, lastlfd;
> > + socklen_t sslen;
> > + ssize_t n;
> > struct addrinfo *ai;
> > - struct pollfd *pfd;
> > + struct pollfd pfd[MAX_FD];
> > struct sockaddr_storage ss;
> > - socklen_t sslen;
> > - size_t nfds, i, j;
> > + struct statctx psc[MAX_FD];
> >
> > - pfd = NULL;
> > - nfds = 0;
> > + bzero(pfd, sizeof(pfd));
> > + bzero(psc, sizeof(pfd));
> > + nfds = listenfds = 0;
> > + lastlfd = -1;
> > + if ((buf = malloc(Bflag)) == NULL)
> > + err(1, "malloc");
> > + for (i = 0; i < MAX_FD; ++i)
> > + pfd[i].fd = -1;
> > +
> > for (ai = aitop; ai != NULL; ai = ai->ai_next) {
> > saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp));
> > if (vflag)
> > @@ -500,9 +439,16 @@
> > warn("socket");
> > continue;
> > }
> > - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
> > - &on, sizeof(on)) == -1)
> > +
> > + if ((fdval = fcntl(sock, F_GETFL, 0)) == -1)
> > + err(1, "fcntl(F_GETFL)");
> > + fdval |= O_NONBLOCK;
> > + if (fcntl(sock, F_SETFL, fdval) == -1)
> > + err(1, "fcntl(F_SETFL, O_NONBLOCK)");
> > + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on,
> > + sizeof(on)) == -1)
> > warn("reuse port");
> > +
> > if (bind(sock, ai->ai_addr, ai->ai_addrlen) != 0) {
> > if (ai->ai_next == NULL)
> > err(1, "bind");
> > @@ -524,29 +470,23 @@
> > close(sock);
> > continue;
> > }
> > - if (nfds > 128)
> > + if (nfds > MAX_FD - 1)
> > break;
> > - if ((pfd = realloc(pfd, ++nfds * sizeof(*pfd))) == NULL)
> > - errx(1, "realloc(pfd * %zu)", nfds);
> > +
> > + nfds++;
> > + lastlfd = sock;
> > pfd[nfds - 1].fd = sock;
> > pfd[nfds - 1].events = POLLIN;
> > }
> > freeaddrinfo(aitop);
> > if (nfds == 0)
> > errx(1, "No working listen addresses found");
> > -
> > - signal(SIGINT, exitsighand);
> > - signal(SIGTERM, exitsighand);
> > - signal(SIGHUP, exitsighand);
> > - signal(SIGPIPE, SIG_IGN);
> > - signal(SIGCHLD, SIG_IGN);
> > -
> > - if (setpgid(0, 0) == -1)
> > - err(1, "setpgid");
> > -
> > + listenfds = nfds;
> > +
> > client_id = 0;
> > - while (!done) {
> > - if ((r = poll(pfd, nfds, INFTIM)) == -1) {
> > + while (!done) {
> > + fflush(stdout);
> > + if ((r = poll(pfd, MAX_FD, INFTIM)) == -1) {
> > if (errno == EINTR)
> > continue;
> > warn("poll");
> > @@ -554,52 +494,82 @@
> > }
> > if (vflag >= 3)
> > fprintf(stderr, "poll: %d\n", r);
> > - for (i = 0 ; r > 0 && i < nfds; i++) {
> > - if ((pfd[i].revents & POLLIN) == 0)
> > +
> > + for (i = 0 ; i < MAX_FD && r > 0; i++) {
> > + if (pfd[i].fd == -1 || (pfd[i].revents & POLLIN) == 0)
> > continue;
> > + r--;
> > +
> > if (vflag >= 3)
> > fprintf(stderr, "fd %d active\n", pfd[i].fd);
> > - r--;
> > - sslen = sizeof(ss);
> > - if ((sock = accept(pfd[i].fd, (struct sockaddr *)&ss,
> > - &sslen)) == -1) {
> > - if (errno == EINTR)
> > + if (pfd[i].fd <= lastlfd) {
> > + sslen = sizeof(ss);
> > + if ((sock = accept(pfd[i].fd,
> > + (struct sockaddr *)&ss, &sslen)) == -1) {
> > + if (errno == EINTR)
> > + continue;
> > + warn("accept");
> > continue;
> > - warn("accept");
> > - break;
> > + }
> > + saddr_ntop((struct sockaddr *)&ss, sslen,
> > + tmp, sizeof(tmp));
> > +
> > + for (nextfree = 0; nextfree < MAX_FD;
> > nextfree++)
> > + if (pfd[nextfree].fd == -1)
> > + break;
> > + /* We shouldn't fall here as accept will fail
> > first */
> > + if (nextfree >= MAX_FD) {
> > + fprintf(stderr, "maximum number of
> > connection reached\n");
> > + close(sock);
> > + continue;
> > + }
> > +
> > + nfds++;
> > + pfd[nextfree].fd = sock;
> > + pfd[nextfree].events = POLLIN;
> > + stats_prepare(&psc[(nfds - 1) - listenfds],
> > + sock, kvmh, ktcbtab, rflag, vflag, kflag);
> > + if (vflag)
> > + fprintf(stderr, "Accepted connection %d
> > from "
> > + "%s, fd = %d\n", client_id++, tmp,
> > sock);
> > }
> > - saddr_ntop((struct sockaddr *)&ss, sslen,
> > - tmp, sizeof(tmp));
> > - if (vflag)
> > - fprintf(stderr, "Accepted connection %d from "
> > - "%s, fd = %d\n", client_id++, tmp, sock);
> > - switch (fork()) {
> > - case -1:
> > - warn("fork");
> > - done = -1;
> > - break;
> > - case 0:
> > - for (j = 0; j < nfds; j++)
> > - if (j != i)
> > - close(pfd[j].fd);
> > - handle_connection(kvmh, ktcbtab, sock,
> > - vflag, rflag, kflag, Bflag);
> > - /* NOTREACHED */
> > - _exit(1);
> > - default:
> > - close(sock);
> > - break;
> > + else {
> > + n = read(pfd[i].fd, buf, Bflag);
> > + if (n == 0) {
> > + fprintf(stderr,
> > + "fd %d closed by remote end\n",
> > + pfd[i].fd);
> > + close(pfd[i].fd);
> > + nfds--;
> > + pfd[i].fd = -1;
> > + }
> > + else if (n == -1) {
> > + if (errno == EINTR || errno == EAGAIN)
> > + continue;
> > + warn("fd %d, read error\n", pfd[i].fd);
> > + close(pfd[i].fd);
> > + nfds--;
> > + pfd[i].fd = -1;
> > + }
> > + else {
> > + if (vflag >= 3)
> > + fprintf(stderr,
> > + "read: %zd bytes from fd:
> > %d\n",
> > + n, pfd[i].fd);
> > + stats_update(&psc[i - listenfds], n);
> > + stats_display(&psc[i - listenfds],
> > + rflag);
> > + }
> > }
> > if (done == -1)
> > break;
> > }
> > }
> > +
> > for (i = 0; i < nfds; i++)
> > close(pfd[i].fd);
> > if (done > 0)
> > warnx("Terminated by signal %d", done);
> > - signal(SIGTERM, SIG_IGN);
> > - killpg(0, SIGTERM);
> > exit(1);
> > }
> >
> > @@ -632,7 +602,7 @@
> > else
> > errx(1, "c getaddrinfo: %s",
> > gai_strerror(herr));
> > }
> > -
> > +
> > for (sock = -1, ai = aitop; ai != NULL; ai = ai->ai_next) {
> > saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp,
> > sizeof(tmp));
> > @@ -681,18 +651,11 @@
> > fprintf(stderr, "%u connections established\n", scnt);
> > arc4random_buf(buf, Bflag);
> >
> > - signal(SIGINT, exitsighand);
> > - signal(SIGTERM, exitsighand);
> > - signal(SIGHUP, exitsighand);
> > - signal(SIGPIPE, SIG_IGN);
> > -
> > stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag);
> >
> > while (!done) {
> > - if (print_stats) {
> > - stats_display(&sc);
> > - print_stats = 0;
> > - }
> > + stats_display(&sc, rflag);
> > +
> > if (poll(pfd, nconn, INFTIM) == -1) {
> > if (errno == EINTR)
> > continue;
> > @@ -717,7 +680,6 @@
> > }
> > }
> > }
> > - stats_finish(&sc);
> >
> > if (done > 0)
> > warnx("Terminated by signal %d", done);
> > @@ -803,6 +765,8 @@
> > if (errstr != NULL)
> > errx(1, "number of connections is %s: %s",
> > errstr, optarg);
> > + if (nconn > MAX_FD)
> > + errx(1, "maximum number of connections is %d",
> > MAX_FD);
> > break;
> > case 'h':
> > default:
> > @@ -842,6 +806,11 @@
> > errx(1, "kvm: no namelist");
> > } else
> > drop_gid();
> > +
> > + signal(SIGINT, exitsighand);
> > + signal(SIGTERM, exitsighand);
> > + signal(SIGHUP, exitsighand);
> > + signal(SIGPIPE, SIG_IGN);
> >
> > if (sflag)
> > serverloop(kvmh, nl[0].n_value, aitop, vflag, rflag, kflag,
> >
> >
> >
> >
> > --
> > Christiano Farina HAESBAERT
> > Do NOT send me html mail.
> >
--
Christiano Farina HAESBAERT
Do NOT send me html mail.