On Sun, Dec 28, 2014 at 05:33:08PM +0100, Alexander Bluhm wrote:
> Jasper tested and found that it only worked on loopback. I have
> forgotten to check for EINPROGRESS after connect. So here is a new
> diff.
>
> bluhm
Successfully tested now with a remote logstash host.
> Index: privsep.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/privsep.c,v
> retrieving revision 1.48
> diff -u -p -r1.48 privsep.c
> --- privsep.c 5 Oct 2014 18:14:01 -0000 1.48
> +++ privsep.c 28 Dec 2014 15:48:51 -0000
> @@ -317,17 +317,34 @@ priv_init(char *conf, int numeric, int l
> servname[servname_len - 1] = '\0';
>
> memset(&hints, 0, sizeof(hints));
> - if (strcmp(protoname, "udp") == 0) {
> + switch (strlen(protoname)) {
> + case 3:
> hints.ai_family = AF_UNSPEC;
> - } else if (strcmp(protoname, "udp4") == 0) {
> - hints.ai_family = AF_INET;
> - } else if (strcmp(protoname, "udp6") == 0) {
> - hints.ai_family = AF_INET6;
> + break;
> + case 4:
> + switch (protoname[3]) {
> + case '4':
> + hints.ai_family = AF_INET;
> + break;
> + case '6':
> + hints.ai_family = AF_INET6;
> + break;
> + default:
> + errx(1, "bad ip version %s", protoname);
> + }
> + break;
> + default:
> + errx(1, "bad protocol length %s", protoname);
> + }
> + if (strncmp(protoname, "udp", 3) == 0) {
> + hints.ai_socktype = SOCK_DGRAM;
> + hints.ai_protocol = IPPROTO_UDP;
> + } else if (strncmp(protoname, "tcp", 3) == 0) {
> + hints.ai_socktype = SOCK_STREAM;
> + hints.ai_protocol = IPPROTO_TCP;
> } else {
> errx(1, "unknown protocol %s", protoname);
> }
> - hints.ai_socktype = SOCK_DGRAM;
> - hints.ai_protocol = IPPROTO_UDP;
> i = getaddrinfo(hostname, servname, &hints, &res0);
> if (i != 0 || res0 == NULL) {
> addr_len = 0;
> Index: syslogd.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/usr.sbin/syslogd/syslogd.c,v
> retrieving revision 1.136
> diff -u -p -r1.136 syslogd.c
> --- syslogd.c 10 Dec 2014 19:42:14 -0000 1.136
> +++ syslogd.c 28 Dec 2014 16:25:55 -0000
> @@ -50,13 +50,14 @@
> * extensive changes by Ralph Campbell
> * more extensive changes by Eric Allman (again)
> * memory buffer logging by Damien Miller
> - * IPv6, libevent by Alexander Bluhm
> + * IPv6, libevent, sending via TCP by Alexander Bluhm
> */
>
> #define MAXLINE 1024 /* maximum line length */
> #define MIN_MEMBUF (MAXLINE * 4) /* Minimum memory buffer size */
> #define MAX_MEMBUF (256 * 1024) /* Maximum memory buffer size */
> #define MAX_MEMBUF_NAME 64 /* Max length of membuf log
> name */
> +#define MAX_TCPBUF (256 * 1024) /* Maximum tcp event buffer size */
> #define MAXSVLINE 120 /* maximum saved line length */
> #define DEFUPRI (LOG_USER|LOG_NOTICE)
> #define DEFSPRI (LOG_KERN|LOG_CRIT)
> @@ -132,6 +133,8 @@ struct filed {
> char f_loghost[1+4+3+1+MAXHOSTNAMELEN+1+NI_MAXSERV];
> /* @proto46://[hostname]:servname\0 */
> struct sockaddr_storage f_addr;
> + struct bufferevent *f_bufev;
> + int f_fd;
> } f_forw; /* forwarding address */
> char f_fname[MAXPATHLEN];
> struct {
> @@ -170,16 +173,17 @@ int repeatinterval[] = { 30, 120, 600 };
> #define F_FILE 1 /* regular file */
> #define F_TTY 2 /* terminal */
> #define F_CONSOLE 3 /* console terminal */
> -#define F_FORW 4 /* remote machine */
> +#define F_FORWUDP 4 /* remote machine via UDP */
> #define F_USERS 5 /* list of users */
> #define F_WALL 6 /* everyone logged on */
> #define F_MEMBUF 7 /* memory buffer */
> #define F_PIPE 8 /* pipe to external program */
> +#define F_FORWTCP 9 /* remote machine via TCP */
>
> -char *TypeNames[9] = {
> +char *TypeNames[] = {
> "UNUSED", "FILE", "TTY", "CONSOLE",
> - "FORW", "USERS", "WALL", "MEMBUF",
> - "PIPE"
> + "FORWUDP", "USERS", "WALL", "MEMBUF",
> + "PIPE", "FORWTCP",
> };
>
> struct filed *Files;
> @@ -259,6 +263,9 @@ struct event ev_ctlaccept, ev_ctlread,
> void klog_readcb(int, short, void *);
> void udp_readcb(int, short, void *);
> void unix_readcb(int, short, void *);
> +int tcp_socket(struct filed *);
> +void tcp_readcb(struct bufferevent *, void *);
> +void tcp_errorcb(struct bufferevent *, short, void *);
> void die_signalcb(int, short, void *);
> void mark_timercb(int, short, void *);
> void init_signalcb(int, short, void *);
> @@ -661,6 +668,82 @@ unix_readcb(int fd, short event, void *a
> logerror("recvfrom unix");
> }
>
> +int
> +tcp_socket(struct filed *f)
> +{
> + int s, flags;
> + char ebuf[100];
> +
> + if ((s = socket(f->f_un.f_forw.f_addr.ss_family, SOCK_STREAM,
> + IPPROTO_TCP)) == -1) {
> + snprintf(ebuf, sizeof(ebuf), "socket \"%s\"",
> + f->f_un.f_forw.f_loghost);
> + logerror(ebuf);
> + return (-1);
> + }
> + /* Connect must not block the process. */
> + if ((flags = fcntl(s, F_GETFL)) == -1 ||
> + fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
> + snprintf(ebuf, sizeof(ebuf), "fcntl \"%s\" O_NONBLOCK",
> + f->f_un.f_forw.f_loghost);
> + logerror(ebuf);
> + close(s);
> + return (-1);
> + }
> + if (connect(s, (struct sockaddr *)&f->f_un.f_forw.f_addr,
> + f->f_un.f_forw.f_addr.ss_len) == -1 && errno != EINPROGRESS) {
> + snprintf(ebuf, sizeof(ebuf), "connect \"%s\"",
> + f->f_un.f_forw.f_loghost);
> + logerror(ebuf);
> + close(s);
> + return (-1);
> + }
> + return (s);
> +}
> +
> +void
> +tcp_readcb(struct bufferevent *bufev, void *arg)
> +{
> + struct filed *f = arg;
> +
> + /*
> + * Drop data received from the forward log server.
> + */
> + dprintf("loghost \"%s\" did send %zu bytes back\n",
> + f->f_un.f_forw.f_loghost,
> + EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->input));
> + evbuffer_drain(bufev->input, -1);
> +}
> +
> +void
> +tcp_errorcb(struct bufferevent *bufev, short event, void *arg)
> +{
> + struct filed *f = arg;
> + char ebuf[100];
> +
> + if (event & EVBUFFER_EOF)
> + snprintf(ebuf, sizeof(ebuf),
> + "syslogd: loghost \"%s\" connection close",
> + f->f_un.f_forw.f_loghost);
> + else
> + snprintf(ebuf, sizeof(ebuf),
> + "syslogd: loghost \"%s\" connection error: %s",
> + f->f_un.f_forw.f_loghost, strerror(errno));
> + dprintf("%s\n", ebuf);
> +
> + close(f->f_un.f_forw.f_fd);
> + if ((f->f_un.f_forw.f_fd = tcp_socket(f)) == -1) {
> + /* XXX reconnect later */
> + bufferevent_free(bufev);
> + f->f_type = F_UNUSED;
> + } else {
> + /* XXX The messages in the output buffer may be out of sync. */
> + bufferevent_setfd(bufev, f->f_un.f_forw.f_fd);
> + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ);
> + }
> + logmsg(LOG_SYSLOG|LOG_WARNING, ebuf, LocalHostName, ADDDATE);
> +}
> +
> void
> usage(void)
> {
> @@ -883,7 +966,7 @@ fprintlog(struct filed *f, int flags, ch
> {
> struct iovec iov[6];
> struct iovec *v;
> - int fd, l, retryonce;
> + int l, retryonce;
> char line[MAXLINE + 1], repbuf[80], greetings[500];
>
> v = iov;
> @@ -938,19 +1021,8 @@ fprintlog(struct filed *f, int flags, ch
> dprintf("\n");
> break;
>
> - case F_FORW:
> + case F_FORWUDP:
> dprintf(" %s\n", f->f_un.f_forw.f_loghost);
> - switch (f->f_un.f_forw.f_addr.ss_family) {
> - case AF_INET:
> - fd = fd_udp;
> - break;
> - case AF_INET6:
> - fd = fd_udp6;
> - break;
> - default:
> - fd = -1;
> - break;
> - }
> l = snprintf(line, sizeof(line), "<%d>%.15s %s%s%s",
> f->f_prevpri, (char *)iov[0].iov_base,
> IncludeHostname ? LocalHostName : "",
> @@ -958,7 +1030,7 @@ fprintlog(struct filed *f, int flags, ch
> (char *)iov[4].iov_base);
> if (l < 0 || (size_t)l >= sizeof(line))
> l = strlen(line);
> - if (sendto(fd, line, l, 0,
> + if (sendto(f->f_un.f_forw.f_fd, line, l, 0,
> (struct sockaddr *)&f->f_un.f_forw.f_addr,
> f->f_un.f_forw.f_addr.ss_len) != l) {
> switch (errno) {
> @@ -977,6 +1049,26 @@ fprintlog(struct filed *f, int flags, ch
> }
> break;
>
> + case F_FORWTCP:
> + dprintf(" %s\n", f->f_un.f_forw.f_loghost);
> + if (EVBUFFER_LENGTH(f->f_un.f_forw.f_bufev->output) >=
> + MAX_TCPBUF)
> + break; /* XXX log error message */
> + /*
> + * RFC 6587 3.4.2. Non-Transparent-Framing
> + * Use \n to split messages for now.
> + * 3.4.1. Octet Counting might be implemented later.
> + */
> + l = evbuffer_add_printf(f->f_un.f_forw.f_bufev->output,
> + "<%d>%.15s %s%s%s\n", f->f_prevpri, (char *)iov[0].iov_base,
> + IncludeHostname ? LocalHostName : "",
> + IncludeHostname ? " " : "",
> + (char *)iov[4].iov_base);
> + if (l < 0)
> + break; /* XXX log error message */
> + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_WRITE);
> + break;
> +
> case F_CONSOLE:
> if (flags & IGN_CONS) {
> dprintf(" (ignored)\n");
> @@ -1249,7 +1341,8 @@ init(void)
> case F_PIPE:
> (void)close(f->f_file);
> break;
> - case F_FORW:
> + case F_FORWUDP:
> + case F_FORWTCP: /* XXX close and reconnect? */
> break;
> }
> next = f->f_next;
> @@ -1383,7 +1476,8 @@ init(void)
> printf("%s", f->f_un.f_fname);
> break;
>
> - case F_FORW:
> + case F_FORWUDP:
> + case F_FORWTCP:
> printf("%s", f->f_un.f_forw.f_loghost);
> break;
>
> @@ -1575,6 +1669,9 @@ cfline(char *line, char *prog)
> logerror(ebuf);
> break;
> }
> + } else if (strcmp(proto, "tcp") == 0 ||
> + strcmp(proto, "tcp4") == 0 || strcmp(proto, "tcp6") == 0) {
> + ;
> } else {
> snprintf(ebuf, sizeof(ebuf), "bad protocol \"%s\"",
> f->f_un.f_forw.f_loghost);
> @@ -1603,7 +1700,35 @@ cfline(char *line, char *prog)
> logerror(ebuf);
> break;
> }
> - f->f_type = F_FORW;
> + f->f_un.f_forw.f_fd = -1;
> + if (strncmp(proto, "udp", 3) == 0) {
> + switch (f->f_un.f_forw.f_addr.ss_family) {
> + case AF_INET:
> + f->f_un.f_forw.f_fd = fd_udp;
> + break;
> + case AF_INET6:
> + f->f_un.f_forw.f_fd = fd_udp6;
> + break;
> + }
> + f->f_type = F_FORWUDP;
> + } else if (strncmp(proto, "tcp", 3) == 0) {
> + int s;
> +
> + if ((s = tcp_socket(f)) == -1)
> + break;
> + if ((f->f_un.f_forw.f_bufev = bufferevent_new(s,
> + tcp_readcb, NULL, tcp_errorcb, f)) == NULL) {
> + snprintf(ebuf, sizeof(ebuf),
> + "bufferevent \"%s\"",
> + f->f_un.f_forw.f_loghost);
> + logerror(ebuf);
> + close(s);
> + break;
> + }
> + bufferevent_enable(f->f_un.f_forw.f_bufev, EV_READ);
> + f->f_un.f_forw.f_fd = s;
> + f->f_type = F_FORWTCP;
> + }
> break;
>
> case '/':
> @@ -1914,7 +2039,7 @@ ctlsock_acceptcb(int fd, short event, vo
>
> if ((flags = fcntl(fd, F_GETFL)) == -1 ||
> fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> - logerror("fcntl ctlconn");
> + logerror("fcntl ctlconn O_NONBLOCK");
> close(fd);
> return;
> }
>
--
jasper