ok benno@
Claudio Jeker([email protected]) on 2021.10.23 14:20:19 +0200:
> This diff changes the io read functions to work on ibufs.
> With this the poll loops will consume data with io_buf_read() until a full
> message is received and then that message is processed. Thanks to this
> the processes no longer block while waiting for more data in the io read
> functions.
>
> With this also the blocking/nonblocking dance done in a few places is no
> longer needed. Everything is now nonblocking.
>
> Further cleanup of the various marshaling functions can follow in a later
> step.
> --
> :wq Claudio
>
> Index: cert.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/cert.c,v
> retrieving revision 1.39
> diff -u -p -r1.39 cert.c
> --- cert.c 15 Oct 2021 22:30:33 -0000 1.39
> +++ cert.c 23 Oct 2021 11:58:43 -0000
> @@ -1281,33 +1281,31 @@ cert_buffer(struct ibuf *b, const struct
> }
>
> static void
> -cert_ip_read(int fd, struct cert_ip *p)
> +cert_ip_read(struct ibuf *b, struct cert_ip *p)
> {
> -
> - io_simple_read(fd, &p->afi, sizeof(enum afi));
> - io_simple_read(fd, &p->type, sizeof(enum cert_ip_type));
> + io_read_buf(b, &p->afi, sizeof(enum afi));
> + io_read_buf(b, &p->type, sizeof(enum cert_ip_type));
>
> if (p->type != CERT_IP_INHERIT) {
> - io_simple_read(fd, &p->min, sizeof(p->min));
> - io_simple_read(fd, &p->max, sizeof(p->max));
> + io_read_buf(b, &p->min, sizeof(p->min));
> + io_read_buf(b, &p->max, sizeof(p->max));
> }
>
> if (p->type == CERT_IP_RANGE)
> - ip_addr_range_read(fd, &p->range);
> + ip_addr_range_read(b, &p->range);
> else if (p->type == CERT_IP_ADDR)
> - ip_addr_read(fd, &p->ip);
> + ip_addr_read(b, &p->ip);
> }
>
> static void
> -cert_as_read(int fd, struct cert_as *p)
> +cert_as_read(struct ibuf *b, struct cert_as *p)
> {
> -
> - io_simple_read(fd, &p->type, sizeof(enum cert_as_type));
> + io_read_buf(b, &p->type, sizeof(enum cert_as_type));
> if (p->type == CERT_AS_RANGE) {
> - io_simple_read(fd, &p->range.min, sizeof(uint32_t));
> - io_simple_read(fd, &p->range.max, sizeof(uint32_t));
> + io_read_buf(b, &p->range.min, sizeof(uint32_t));
> + io_read_buf(b, &p->range.max, sizeof(uint32_t));
> } else if (p->type == CERT_AS_ID)
> - io_simple_read(fd, &p->id, sizeof(uint32_t));
> + io_read_buf(b, &p->id, sizeof(uint32_t));
> }
>
> /*
> @@ -1316,7 +1314,7 @@ cert_as_read(int fd, struct cert_as *p)
> * Always returns a valid pointer.
> */
> struct cert *
> -cert_read(int fd)
> +cert_read(struct ibuf *b)
> {
> struct cert *p;
> size_t i;
> @@ -1324,35 +1322,36 @@ cert_read(int fd)
> if ((p = calloc(1, sizeof(struct cert))) == NULL)
> err(1, NULL);
>
> - io_simple_read(fd, &p->valid, sizeof(int));
> - io_simple_read(fd, &p->expires, sizeof(time_t));
> - io_simple_read(fd, &p->purpose, sizeof(enum cert_purpose));
> - io_simple_read(fd, &p->ipsz, sizeof(size_t));
> + io_read_buf(b, &p->valid, sizeof(int));
> + io_read_buf(b, &p->expires, sizeof(time_t));
> + io_read_buf(b, &p->purpose, sizeof(enum cert_purpose));
> + io_read_buf(b, &p->ipsz, sizeof(size_t));
> +
> p->ips = calloc(p->ipsz, sizeof(struct cert_ip));
> if (p->ips == NULL)
> err(1, NULL);
> for (i = 0; i < p->ipsz; i++)
> - cert_ip_read(fd, &p->ips[i]);
> + cert_ip_read(b, &p->ips[i]);
>
> - io_simple_read(fd, &p->asz, sizeof(size_t));
> + io_read_buf(b, &p->asz, sizeof(size_t));
> p->as = calloc(p->asz, sizeof(struct cert_as));
> if (p->as == NULL)
> err(1, NULL);
> for (i = 0; i < p->asz; i++)
> - cert_as_read(fd, &p->as[i]);
> + cert_as_read(b, &p->as[i]);
> +
> + io_read_str(b, &p->mft);
> + io_read_str(b, &p->notify);
> + io_read_str(b, &p->repo);
> + io_read_str(b, &p->crl);
> + io_read_str(b, &p->aia);
> + io_read_str(b, &p->aki);
> + io_read_str(b, &p->ski);
> + io_read_str(b, &p->tal);
> + io_read_str(b, &p->pubkey);
>
> - io_str_read(fd, &p->mft);
> assert(p->mft != NULL || p->purpose == CERT_PURPOSE_BGPSEC_ROUTER);
> - io_str_read(fd, &p->notify);
> - io_str_read(fd, &p->repo);
> - io_str_read(fd, &p->crl);
> - io_str_read(fd, &p->aia);
> - io_str_read(fd, &p->aki);
> - io_str_read(fd, &p->ski);
> assert(p->ski);
> - io_str_read(fd, &p->tal);
> - io_str_read(fd, &p->pubkey);
> -
> return p;
> }
>
> Index: extern.h
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/extern.h,v
> retrieving revision 1.73
> diff -u -p -r1.73 extern.h
> --- extern.h 22 Oct 2021 11:13:06 -0000 1.73
> +++ extern.h 23 Oct 2021 12:03:19 -0000
> @@ -399,25 +399,25 @@ void tal_buffer(struct ibuf *, const s
> void tal_free(struct tal *);
> struct tal *tal_parse(const char *, char *);
> char *tal_read_file(const char *);
> -struct tal *tal_read(int);
> +struct tal *tal_read(struct ibuf *);
>
> void cert_buffer(struct ibuf *, const struct cert *);
> void cert_free(struct cert *);
> struct cert *cert_parse(X509 **, const char *);
> struct cert *ta_parse(X509 **, const char *, const unsigned char *, size_t);
> -struct cert *cert_read(int);
> +struct cert *cert_read(struct ibuf *);
> void cert_insert_brks(struct brk_tree *, struct cert *);
>
> void mft_buffer(struct ibuf *, const struct mft *);
> void mft_free(struct mft *);
> struct mft *mft_parse(X509 **, const char *);
> int mft_check(const char *, struct mft *);
> -struct mft *mft_read(int);
> +struct mft *mft_read(struct ibuf *);
>
> void roa_buffer(struct ibuf *, const struct roa *);
> void roa_free(struct roa *);
> struct roa *roa_parse(X509 **, const char *);
> -struct roa *roa_read(int);
> +struct roa *roa_read(struct ibuf *);
> void roa_insert_vrps(struct vrp_tree *, struct roa *, size_t *,
> size_t *);
>
> @@ -460,8 +460,8 @@ void ip_addr_print(const struct ip_add
> void ip_addr_buffer(struct ibuf *, const struct ip_addr *);
> void ip_addr_range_buffer(struct ibuf *,
> const struct ip_addr_range *);
> -void ip_addr_read(int, struct ip_addr *);
> -void ip_addr_range_read(int, struct ip_addr_range *);
> +void ip_addr_read(struct ibuf *, struct ip_addr *);
> +void ip_addr_range_read(struct ibuf *, struct ip_addr_range *);
> int ip_addr_cmp(const struct ip_addr *, const struct ip_addr *);
> int ip_addr_check_overlap(const struct cert_ip *,
> const char *, const struct cert_ip *, size_t);
> @@ -480,7 +480,7 @@ int as_check_covered(uint32_t, uint32_
>
> /* Parser-specific */
> void entity_free(struct entity *);
> -void entity_read_req(int fd, struct entity *);
> +void entity_read_req(struct ibuf *, struct entity *);
> void entityq_flush(struct entityq *, struct repo *);
> void proc_parser(int) __attribute__((noreturn));
>
> @@ -535,8 +535,6 @@ char *hex_encode(const unsigned char *,
>
> /* Functions for moving data between processes. */
>
> -void io_socket_blocking(int);
> -void io_socket_nonblocking(int);
> struct ibuf *io_buf_new(void);
> void io_simple_buffer(struct ibuf *, const void *, size_t);
> void io_buf_buffer(struct ibuf *, const void *, size_t);
> @@ -545,7 +543,11 @@ void io_buf_close(struct msgbuf *, str
> void io_simple_read(int, void *, size_t);
> void io_buf_read_alloc(int, void **, size_t *);
> void io_str_read(int, char **);
> -int io_recvfd(int, void *, size_t);
> +void io_read_buf(struct ibuf *, void *, size_t);
> +void io_read_str(struct ibuf *, char **);
> +void io_read_buf_alloc(struct ibuf *, void **, size_t *);
> +struct ibuf *io_buf_read(int, struct ibuf **);
> +struct ibuf *io_buf_recvfd(int, struct ibuf **);
>
> /* X509 helpers. */
>
> Index: http.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/http.c,v
> retrieving revision 1.43
> diff -u -p -r1.43 http.c
> --- http.c 22 Oct 2021 11:13:06 -0000 1.43
> +++ http.c 23 Oct 2021 11:59:14 -0000
> @@ -1769,6 +1769,7 @@ proc_http(char *bind_addr, int fd)
> struct pollfd pfds[NPFDS];
> struct http_connection *conn, *nc;
> struct http_request *req, *nr;
> + struct ibuf *b, *inbuf = NULL;
>
> if (bind_addr != NULL) {
> struct addrinfo hints, *res;
> @@ -1859,18 +1860,20 @@ proc_http(char *bind_addr, int fd)
> }
> }
> if (pfds[0].revents & POLLIN) {
> - size_t id, size;
> - int outfd;
> - char *uri;
> - char *mod;
> -
> - outfd = io_recvfd(fd, &size, sizeof(size));
> - io_simple_read(fd, &id, sizeof(id));
> - io_str_read(fd, &uri);
> - io_str_read(fd, &mod);
> -
> - /* queue up new requests */
> - http_req_new(id, uri, mod, outfd);
> + b = io_buf_recvfd(fd, &inbuf);
> + if (b != NULL) {
> + size_t id;
> + char *uri;
> + char *mod;
> +
> + io_read_buf(b, &id, sizeof(id));
> + io_read_str(b, &uri);
> + io_read_str(b, &mod);
> +
> + /* queue up new requests */
> + http_req_new(id, uri, mod, b->fd);
> + ibuf_free(b);
> + }
> }
>
> now = getmonotime();
> Index: io.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/io.c,v
> retrieving revision 1.14
> diff -u -p -r1.14 io.c
> --- io.c 22 Oct 2021 11:13:06 -0000 1.14
> +++ io.c 23 Oct 2021 12:13:41 -0000
> @@ -1,4 +1,4 @@
> -/* $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */
> +/* $OpenBSD: io.c,v 1.13 2021/03/04 13:01:41 claudio Exp $ */
> /*
> * Copyright (c) 2019 Kristaps Dzonsons <[email protected]>
> *
> @@ -30,28 +30,6 @@
>
> #include "extern.h"
>
> -void
> -io_socket_blocking(int fd)
> -{
> - int fl;
> -
> - if ((fl = fcntl(fd, F_GETFL, 0)) == -1)
> - err(1, "fcntl");
> - if (fcntl(fd, F_SETFL, fl & ~O_NONBLOCK) == -1)
> - err(1, "fcntl");
> -}
> -
> -void
> -io_socket_nonblocking(int fd)
> -{
> - int fl;
> -
> - if ((fl = fcntl(fd, F_GETFL, 0)) == -1)
> - err(1, "fcntl");
> - if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1)
> - err(1, "fcntl");
> -}
> -
> /*
> * Create new io buffer, call io_close() when done with it.
> * Function always returns a new buffer.
> @@ -109,80 +87,148 @@ io_buf_close(struct msgbuf *msgbuf, stru
> {
> size_t len;
>
> - len = ibuf_size(b);
> + len = ibuf_size(b) - sizeof(len);
> memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len));
> ibuf_close(msgbuf, b);
> }
>
> /*
> - * Read of a binary buffer that must be on a blocking descriptor.
> + * Read of an ibuf and extract sz byte from there.
> * Does nothing if "sz" is zero.
> - * This will fail and exit on EOF.
> + * Return 1 on success or 0 if there was not enough data.
> */
> void
> -io_simple_read(int fd, void *res, size_t sz)
> +io_read_buf(struct ibuf *b, void *res, size_t sz)
> {
> - ssize_t ssz;
> char *tmp;
>
> - tmp = res; /* arithmetic on a pointer to void is a GNU extension */
> -again:
> if (sz == 0)
> return;
> - if ((ssz = read(fd, tmp, sz)) == -1)
> - err(1, "read");
> - else if (ssz == 0)
> - errx(1, "read: unexpected end of file");
> - else if ((size_t)ssz == sz)
> + tmp = ibuf_seek(b, b->rpos, sz);
> + if (tmp == NULL)
> + errx(1, "bad internal framing, buffer too short");
> + b->rpos += sz;
> + memcpy(res, tmp, sz);
> +}
> +
> +/*
> + * Read a string (returns NULL for zero-length strings), allocating
> + * space for it.
> + * Return 1 on success or 0 if there was not enough data.
> + */
> +void
> +io_read_str(struct ibuf *b, char **res)
> +{
> + size_t sz;
> +
> + io_read_buf(b, &sz, sizeof(sz));
> + if (sz == 0) {
> + *res = NULL;
> return;
> - sz -= ssz;
> - tmp += ssz;
> - goto again;
> + }
> + if ((*res = calloc(sz + 1, 1)) == NULL)
> + err(1, NULL);
> + io_read_buf(b, *res, sz);
> }
>
> /*
> * Read a binary buffer, allocating space for it.
> * If the buffer is zero-sized, this won't allocate "res", but
> * will still initialise it to NULL.
> + * Return 1 on success or 0 if there was not enough data.
> */
> void
> -io_buf_read_alloc(int fd, void **res, size_t *sz)
> +io_read_buf_alloc(struct ibuf *b, void **res, size_t *sz)
> {
> -
> *res = NULL;
> - io_simple_read(fd, sz, sizeof(size_t));
> + io_read_buf(b, sz, sizeof(sz));
> if (*sz == 0)
> return;
> if ((*res = malloc(*sz)) == NULL)
> err(1, NULL);
> - io_simple_read(fd, *res, *sz);
> + io_read_buf(b, *res, *sz);
> +}
> +
> +/* XXX copy from imsg-buffer.c */
> +static int
> +ibuf_realloc(struct ibuf *buf, size_t len)
> +{
> + unsigned char *b;
> +
> + /* on static buffers max is eq size and so the following fails */
> + if (buf->wpos + len > buf->max) {
> + errno = ERANGE;
> + return (-1);
> + }
> +
> + b = recallocarray(buf->buf, buf->size, buf->wpos + len, 1);
> + if (b == NULL)
> + return (-1);
> + buf->buf = b;
> + buf->size = buf->wpos + len;
> +
> + return (0);
> }
>
> /*
> - * Read a string (returns NULL for zero-length strings), allocating
> - * space for it.
> + * Read once and fill a ibuf until it is finished.
> + * Returns NULL if more data is needed, returns a full ibuf once
> + * all data is received.
> */
> -void
> -io_str_read(int fd, char **res)
> +struct ibuf *
> +io_buf_read(int fd, struct ibuf **ib)
> {
> - size_t sz;
> + struct ibuf *b = *ib;
> + ssize_t n;
> + size_t sz;
>
> - io_simple_read(fd, &sz, sizeof(size_t));
> - if (sz == 0) {
> - *res = NULL;
> - return;
> + /* if ibuf == NULL allocate a new buffer */
> + if (b == NULL) {
> + if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL)
> + err(1, NULL);
> + *ib = b;
> }
> - if ((*res = calloc(sz + 1, 1)) == NULL)
> - err(1, NULL);
> - io_simple_read(fd, *res, sz);
> +
> + /* read some data */
> + while ((n = read(fd, b->buf + b->wpos, b->size - b->wpos)) == -1) {
> + if (errno == EINTR)
> + continue;
> + err(1, "read");
> + }
> +
> + if (n == 0)
> + errx(1, "read: unexpected end of file");
> + b->wpos += n;
> +
> + /* got full message */
> + if (b->wpos == b->size) {
> + /* only header received */
> + if (b->wpos == sizeof(sz)) {
> + memcpy(&sz, b->buf, sizeof(sz));
> + if (sz == 0 || sz > INT32_MAX)
> + errx(1, "bad internal framing, bad size");
> + if (ibuf_realloc(b, sz) == -1)
> + err(1, "ibuf_realloc");
> + return NULL;
> + }
> +
> + /* skip over initial size header */
> + b->rpos += sizeof(sz);
> + *ib = NULL;
> + return b;
> + }
> +
> + return NULL;
> }
>
> +
> /*
> * Read data from socket but receive a file descriptor at the same time.
> */
> -int
> -io_recvfd(int fd, void *res, size_t sz)
> +struct ibuf *
> +io_buf_recvfd(int fd, struct ibuf **ib)
> {
> + struct ibuf *b = *ib;
> struct iovec iov;
> struct msghdr msg;
> struct cmsghdr *cmsg;
> @@ -190,15 +236,22 @@ io_recvfd(int fd, void *res, size_t sz)
> struct cmsghdr hdr;
> char buf[CMSG_SPACE(sizeof(int))];
> } cmsgbuf;
> - int outfd = -1;
> - char *b = res;
> ssize_t n;
> + size_t sz;
>
> + /* fd are only passed on the head, just use regular read afterwards */
> + if (b != NULL)
> + return io_buf_read(fd, ib);
> +
> + if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL)
> + err(1, NULL);
> + *ib = b;
> +
> memset(&msg, 0, sizeof(msg));
> memset(&cmsgbuf, 0, sizeof(cmsgbuf));
>
> - iov.iov_base = res;
> - iov.iov_len = sz;
> + iov.iov_base = b->buf;
> + iov.iov_len = b->size;
>
> msg.msg_iov = &iov;
> msg.msg_iovlen = 1;
> @@ -225,29 +278,32 @@ io_recvfd(int fd, void *res, size_t sz)
> for (i = 0; i < j; i++) {
> f = ((int *)CMSG_DATA(cmsg))[i];
> if (i == 0)
> - outfd = f;
> + b->fd = f;
> else
> close(f);
> }
> }
> }
>
> - b += n;
> - sz -= n;
> - while (sz > 0) {
> - /* short receive */
> - n = recv(fd, b, sz, 0);
> - if (n == -1) {
> - if (errno == EINTR)
> - continue;
> - err(1, "recv");
> + b->wpos += n;
> +
> + /* got full message */
> + if (b->wpos == b->size) {
> + /* only header received */
> + if (b->wpos == sizeof(sz)) {
> + memcpy(&sz, b->buf, sizeof(sz));
> + if (sz == 0 || sz > INT32_MAX)
> + errx(1, "read: bad internal framing, %zu", sz);
> + if (ibuf_realloc(b, sz) == -1)
> + err(1, "ibuf_realloc");
> + return NULL;
> }
> - if (n == 0)
> - errx(1, "recv: unexpected end of file");
>
> - b += n;
> - sz -= n;
> + /* skip over initial size header */
> + b->rpos += sizeof(sz);
> + *ib = NULL;
> + return b;
> }
>
> - return outfd;
> + return NULL;
> }
> Index: ip.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/ip.c,v
> retrieving revision 1.17
> diff -u -p -r1.17 ip.c
> --- ip.c 19 Apr 2021 17:04:35 -0000 1.17
> +++ ip.c 23 Oct 2021 11:59:43 -0000
> @@ -314,14 +314,14 @@ ip_addr_range_buffer(struct ibuf *b, con
> * Matched with ip_addr_buffer().
> */
> void
> -ip_addr_read(int fd, struct ip_addr *p)
> +ip_addr_read(struct ibuf *b, struct ip_addr *p)
> {
> size_t sz;
>
> - io_simple_read(fd, &p->prefixlen, sizeof(unsigned char));
> + io_read_buf(b, &p->prefixlen, sizeof(unsigned char));
> sz = PREFIX_SIZE(p->prefixlen);
> assert(sz <= 16);
> - io_simple_read(fd, p->addr, sz);
> + io_read_buf(b, p->addr, sz);
> }
>
> /*
> @@ -329,11 +329,10 @@ ip_addr_read(int fd, struct ip_addr *p)
> * Matched with ip_addr_range_buffer().
> */
> void
> -ip_addr_range_read(int fd, struct ip_addr_range *p)
> +ip_addr_range_read(struct ibuf *b, struct ip_addr_range *p)
> {
> -
> - ip_addr_read(fd, &p->min);
> - ip_addr_read(fd, &p->max);
> + ip_addr_read(b, &p->min);
> + ip_addr_read(b, &p->max);
> }
>
> /*
> Index: main.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v
> retrieving revision 1.150
> diff -u -p -r1.150 main.c
> --- main.c 22 Oct 2021 11:13:06 -0000 1.150
> +++ main.c 23 Oct 2021 12:03:01 -0000
> @@ -99,17 +99,14 @@ entity_free(struct entity *ent)
> * The pointer must be passed entity_free().
> */
> void
> -entity_read_req(int fd, struct entity *ent)
> +entity_read_req(struct ibuf *b, struct entity *ent)
> {
> - size_t size;
> -
> - io_simple_read(fd, &size, sizeof(size));
> - io_simple_read(fd, &ent->type, sizeof(enum rtype));
> - io_str_read(fd, &ent->file);
> - io_simple_read(fd, &ent->has_pkey, sizeof(int));
> + io_read_buf(b, &ent->type, sizeof(ent->type));
> + io_read_str(b, &ent->file);
> + io_read_buf(b, &ent->has_pkey, sizeof(ent->has_pkey));
> if (ent->has_pkey)
> - io_buf_read_alloc(fd, (void **)&ent->pkey, &ent->pkeysz);
> - io_str_read(fd, &ent->descr);
> + io_read_buf_alloc(b, (void **)&ent->pkey, &ent->pkeysz);
> + io_read_str(b, &ent->descr);
> }
>
> /*
> @@ -459,7 +456,7 @@ queue_add_from_cert(const struct cert *c
> * In all cases, we gather statistics.
> */
> static void
> -entity_process(int proc, struct stats *st, struct vrp_tree *tree,
> +entity_process(struct ibuf *b, struct stats *st, struct vrp_tree *tree,
> struct brk_tree *brktree)
> {
> enum rtype type;
> @@ -467,7 +464,6 @@ entity_process(int proc, struct stats *s
> struct cert *cert;
> struct mft *mft;
> struct roa *roa;
> - size_t size;
> int c;
>
> /*
> @@ -476,24 +472,23 @@ entity_process(int proc, struct stats *s
> * certificate, for example).
> * We follow that up with whether the resources didn't parse.
> */
> - io_simple_read(proc, &size, sizeof(size));
> - io_simple_read(proc, &type, sizeof(type));
> + io_read_buf(b, &type, sizeof(type));
>
> switch (type) {
> case RTYPE_TAL:
> st->tals++;
> - tal = tal_read(proc);
> + tal = tal_read(b);
> queue_add_from_tal(tal);
> tal_free(tal);
> break;
> case RTYPE_CER:
> st->certs++;
> - io_simple_read(proc, &c, sizeof(int));
> + io_read_buf(b, &c, sizeof(c));
> if (c == 0) {
> st->certs_fail++;
> break;
> }
> - cert = cert_read(proc);
> + cert = cert_read(b);
> if (cert->purpose == CERT_PURPOSE_CA) {
> if (cert->valid) {
> /*
> @@ -517,12 +512,12 @@ entity_process(int proc, struct stats *s
> break;
> case RTYPE_MFT:
> st->mfts++;
> - io_simple_read(proc, &c, sizeof(int));
> + io_read_buf(b, &c, sizeof(c));
> if (c == 0) {
> st->mfts_fail++;
> break;
> }
> - mft = mft_read(proc);
> + mft = mft_read(b);
> if (mft->stale)
> st->mfts_stale++;
> queue_add_from_mft_set(mft);
> @@ -533,12 +528,12 @@ entity_process(int proc, struct stats *s
> break;
> case RTYPE_ROA:
> st->roas++;
> - io_simple_read(proc, &c, sizeof(int));
> + io_read_buf(b, &c, sizeof(c));
> if (c == 0) {
> st->roas_fail++;
> break;
> }
> - roa = roa_read(proc);
> + roa = roa_read(b);
> if (roa->valid)
> roa_insert_vrps(tree, roa, &st->vrps, &st->uniqs);
> else
> @@ -555,6 +550,57 @@ entity_process(int proc, struct stats *s
> entity_queue--;
> }
>
> +static void
> +rrdp_process(struct ibuf *b)
> +{
> + enum rrdp_msg type;
> + enum publish_type pt;
> + struct rrdp_session s;
> + char *uri, *last_mod, *data;
> + char hash[SHA256_DIGEST_LENGTH];
> + size_t dsz, id;
> + int ok;
> +
> + io_read_buf(b, &type, sizeof(type));
> + io_read_buf(b, &id, sizeof(id));
> +
> + switch (type) {
> + case RRDP_END:
> + io_read_buf(b, &ok, sizeof(ok));
> + rrdp_finish(id, ok);
> + break;
> + case RRDP_HTTP_REQ:
> + io_read_str(b, &uri);
> + io_read_str(b, &last_mod);
> + rrdp_http_fetch(id, uri, last_mod);
> + break;
> + case RRDP_SESSION:
> + io_read_str(b, &s.session_id);
> + io_read_buf(b, &s.serial, sizeof(s.serial));
> + io_read_str(b, &s.last_mod);
> + rrdp_save_state(id, &s);
> + free(s.session_id);
> + free(s.last_mod);
> + break;
> + case RRDP_FILE:
> + io_read_buf(b, &pt, sizeof(pt));
> + if (pt != PUB_ADD)
> + io_read_buf(b, &hash, sizeof(hash));
> + io_read_str(b, &uri);
> + io_read_buf_alloc(b, (void **)&data, &dsz);
> +
> + ok = rrdp_handle_file(id, pt, uri, hash, sizeof(hash),
> + data, dsz);
> + rrdp_file_resp(id, ok);
> +
> + free(uri);
> + free(data);
> + break;
> + default:
> + errx(1, "unexpected rrdp response");
> + }
> +}
> +
> /*
> * Assign filenames ending in ".tal" in "/etc/rpki" into "tals",
> * returning the number of files found and filled-in.
> @@ -623,19 +669,21 @@ suicide(int sig __attribute__((unused)))
> int
> main(int argc, char *argv[])
> {
> - int rc, c, st, proc, rsync, http, rrdp, ok,
> - hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC;
> - size_t i, id, talsz = 0, size;
> + int rc, c, st, proc, rsync, http, rrdp, ok, hangup = 0;
> + int fl = SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK;
> + size_t i, id, talsz = 0;
> pid_t pid, procpid, rsyncpid, httppid, rrdppid;
> int fd[2];
> struct pollfd pfd[NPFD];
> struct msgbuf *queues[NPFD];
> + struct ibuf *b, *httpbuf = NULL, *procbuf = NULL;
> + struct ibuf *rrdpbuf = NULL, *rsyncbuf = NULL;
> char *rsync_prog = "openrsync";
> char *bind_addr = NULL;
> const char *cachedir = NULL, *outputdir = NULL;
> const char *tals[TALSZ_MAX], *errs, *name;
> - struct vrp_tree v = RB_INITIALIZER(&v);
> - struct brk_tree b = RB_INITIALIZER(&b);
> + struct vrp_tree vrps = RB_INITIALIZER(&vrps);
> + struct brk_tree brks = RB_INITIALIZER(&brks);
> struct rusage ru;
> struct timeval start_time, now_time;
>
> @@ -972,12 +1020,6 @@ main(int argc, char *argv[])
> hangup = 1;
> }
> if (pfd[i].revents & POLLOUT) {
> - /*
> - * XXX work around deadlocks because of
> - * blocking read vs non-blocking writes.
> - */
> - if (i > 1)
> - io_socket_nonblocking(pfd[i].fd);
> switch (msgbuf_write(queues[i])) {
> case 0:
> errx(1, "write[%zu]: "
> @@ -985,8 +1027,6 @@ main(int argc, char *argv[])
> case -1:
> err(1, "write[%zu]", i);
> }
> - if (i > 1)
> - io_socket_blocking(pfd[i].fd);
> }
> }
> if (hangup)
> @@ -1000,75 +1040,38 @@ main(int argc, char *argv[])
> */
>
> if ((pfd[1].revents & POLLIN)) {
> - io_simple_read(rsync, &size, sizeof(size));
> - io_simple_read(rsync, &id, sizeof(id));
> - io_simple_read(rsync, &ok, sizeof(ok));
> - rsync_finish(id, ok);
> + b = io_buf_read(rsync, &rsyncbuf);
> + if (b != NULL) {
> + io_read_buf(b, &id, sizeof(id));
> + io_read_buf(b, &ok, sizeof(ok));
> + rsync_finish(id, ok);
> + ibuf_free(b);
> + }
> }
>
> if ((pfd[2].revents & POLLIN)) {
> - enum http_result res;
> - char *last_mod;
> -
> - io_simple_read(http, &size, sizeof(size));
> - io_simple_read(http, &id, sizeof(id));
> - io_simple_read(http, &res, sizeof(res));
> - io_str_read(http, &last_mod);
> - http_finish(id, res, last_mod);
> - free(last_mod);
> + b = io_buf_read(http, &httpbuf);
> + if (b != NULL) {
> + enum http_result res;
> + char *last_mod;
> +
> + io_read_buf(b, &id, sizeof(id));
> + io_read_buf(b, &res, sizeof(res));
> + io_read_str(b, &last_mod);
> + http_finish(id, res, last_mod);
> + free(last_mod);
> + ibuf_free(b);
> + }
> }
>
> /*
> * Handle RRDP requests here.
> */
> if ((pfd[3].revents & POLLIN)) {
> - enum rrdp_msg type;
> - enum publish_type pt;
> - struct rrdp_session s;
> - char *uri, *last_mod, *data;
> - char hash[SHA256_DIGEST_LENGTH];
> - size_t dsz;
> -
> - io_simple_read(rrdp, &size, sizeof(size));
> - io_simple_read(rrdp, &type, sizeof(type));
> - io_simple_read(rrdp, &id, sizeof(id));
> -
> - switch (type) {
> - case RRDP_END:
> - io_simple_read(rrdp, &ok, sizeof(ok));
> - rrdp_finish(id, ok);
> - break;
> - case RRDP_HTTP_REQ:
> - io_str_read(rrdp, &uri);
> - io_str_read(rrdp, &last_mod);
> - rrdp_http_fetch(id, uri, last_mod);
> - break;
> - case RRDP_SESSION:
> - io_str_read(rrdp, &s.session_id);
> - io_simple_read(rrdp, &s.serial,
> - sizeof(s.serial));
> - io_str_read(rrdp, &s.last_mod);
> - rrdp_save_state(id, &s);
> - free(s.session_id);
> - free(s.last_mod);
> - break;
> - case RRDP_FILE:
> - io_simple_read(rrdp, &pt, sizeof(pt));
> - if (pt != PUB_ADD)
> - io_simple_read(rrdp, &hash,
> - sizeof(hash));
> - io_str_read(rrdp, &uri);
> - io_buf_read_alloc(rrdp, (void **)&data, &dsz);
> -
> - ok = rrdp_handle_file(id, pt, uri,
> - hash, sizeof(hash), data, dsz);
> - rrdp_file_resp(id, ok);
> -
> - free(uri);
> - free(data);
> - break;
> - default:
> - errx(1, "unexpected rrdp response");
> + b = io_buf_read(rrdp, &rrdpbuf);
> + if (b != NULL) {
> + rrdp_process(b);
> + ibuf_free(b);
> }
> }
>
> @@ -1078,7 +1081,11 @@ main(int argc, char *argv[])
> */
>
> if ((pfd[0].revents & POLLIN)) {
> - entity_process(proc, &stats, &v, &b);
> + b = io_buf_read(proc, &procbuf);
> + if (b != NULL) {
> + entity_process(b, &stats, &vrps, &brks);
> + ibuf_free(b);
> + }
> }
> }
>
> @@ -1154,7 +1161,7 @@ main(int argc, char *argv[])
> if (fchdir(outdirfd) == -1)
> err(1, "fchdir output dir");
>
> - if (outputfiles(&v, &b, &stats))
> + if (outputfiles(&vrps, &brks, &stats))
> rc = 1;
>
>
> Index: mft.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/mft.c,v
> retrieving revision 1.38
> diff -u -p -r1.38 mft.c
> --- mft.c 9 Sep 2021 14:15:49 -0000 1.38
> +++ mft.c 23 Oct 2021 12:04:20 -0000
> @@ -564,7 +564,7 @@ mft_buffer(struct ibuf *b, const struct
> * Result must be passed to mft_free().
> */
> struct mft *
> -mft_read(int fd)
> +mft_read(struct ibuf *b)
> {
> struct mft *p = NULL;
> size_t i;
> @@ -572,22 +572,22 @@ mft_read(int fd)
> if ((p = calloc(1, sizeof(struct mft))) == NULL)
> err(1, NULL);
>
> - io_simple_read(fd, &p->stale, sizeof(int));
> - io_str_read(fd, &p->file);
> - assert(p->file);
> - io_simple_read(fd, &p->filesz, sizeof(size_t));
> + io_read_buf(b, &p->stale, sizeof(int));
> + io_read_str(b, &p->file);
> + io_read_buf(b, &p->filesz, sizeof(size_t));
>
> + assert(p->file);
> if ((p->files = calloc(p->filesz, sizeof(struct mftfile))) == NULL)
> err(1, NULL);
>
> for (i = 0; i < p->filesz; i++) {
> - io_str_read(fd, &p->files[i].file);
> - io_simple_read(fd, p->files[i].hash, SHA256_DIGEST_LENGTH);
> + io_read_str(b, &p->files[i].file);
> + io_read_buf(b, p->files[i].hash, SHA256_DIGEST_LENGTH);
> }
>
> - io_str_read(fd, &p->aia);
> - io_str_read(fd, &p->aki);
> - io_str_read(fd, &p->ski);
> + io_read_str(b, &p->aia);
> + io_read_str(b, &p->aki);
> + io_read_str(b, &p->ski);
> assert(p->aia && p->aki && p->ski);
>
> return p;
> Index: parser.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/parser.c,v
> retrieving revision 1.14
> diff -u -p -r1.14 parser.c
> --- parser.c 22 Oct 2021 11:13:06 -0000 1.14
> +++ parser.c 23 Oct 2021 12:04:54 -0000
> @@ -525,7 +525,7 @@ proc_parser(int fd)
> struct entityq q;
> struct msgbuf msgq;
> struct pollfd pfd;
> - struct ibuf *b;
> + struct ibuf *b, *inbuf = NULL;
> X509_STORE_CTX *ctx;
> struct auth_tree auths = RB_INITIALIZER(&auths);
> struct crl_tree crlt = RB_INITIALIZER(&crlt);
> @@ -545,8 +545,6 @@ proc_parser(int fd)
>
> pfd.fd = fd;
>
> - io_socket_nonblocking(pfd.fd);
> -
> for (;;) {
> pfd.events = POLLIN;
> if (msgq.queued)
> @@ -571,13 +569,16 @@ proc_parser(int fd)
> */
>
> if ((pfd.revents & POLLIN)) {
> - io_socket_blocking(fd);
> - entp = calloc(1, sizeof(struct entity));
> - if (entp == NULL)
> - err(1, NULL);
> - entity_read_req(fd, entp);
> - TAILQ_INSERT_TAIL(&q, entp, entries);
> - io_socket_nonblocking(fd);
> + b = io_buf_read(fd, &inbuf);
> +
> + if (b != NULL) {
> + entp = calloc(1, sizeof(struct entity));
> + if (entp == NULL)
> + err(1, NULL);
> + entity_read_req(b, entp);
> + TAILQ_INSERT_TAIL(&q, entp, entries);
> + ibuf_free(b);
> + }
> }
>
> if (pfd.revents & POLLOUT) {
> Index: roa.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/roa.c,v
> retrieving revision 1.26
> diff -u -p -r1.26 roa.c
> --- roa.c 7 Oct 2021 08:28:45 -0000 1.26
> +++ roa.c 23 Oct 2021 12:05:47 -0000
> @@ -448,7 +448,7 @@ roa_buffer(struct ibuf *b, const struct
> * Result must be passed to roa_free().
> */
> struct roa *
> -roa_read(int fd)
> +roa_read(struct ibuf *b)
> {
> struct roa *p;
> size_t i;
> @@ -456,26 +456,26 @@ roa_read(int fd)
> if ((p = calloc(1, sizeof(struct roa))) == NULL)
> err(1, NULL);
>
> - io_simple_read(fd, &p->valid, sizeof(int));
> - io_simple_read(fd, &p->asid, sizeof(uint32_t));
> - io_simple_read(fd, &p->ipsz, sizeof(size_t));
> - io_simple_read(fd, &p->expires, sizeof(time_t));
> + io_read_buf(b, &p->valid, sizeof(int));
> + io_read_buf(b, &p->asid, sizeof(uint32_t));
> + io_read_buf(b, &p->ipsz, sizeof(size_t));
> + io_read_buf(b, &p->expires, sizeof(time_t));
>
> if ((p->ips = calloc(p->ipsz, sizeof(struct roa_ip))) == NULL)
> err(1, NULL);
>
> for (i = 0; i < p->ipsz; i++) {
> - io_simple_read(fd, &p->ips[i].afi, sizeof(enum afi));
> - io_simple_read(fd, &p->ips[i].maxlength, sizeof(size_t));
> - io_simple_read(fd, &p->ips[i].min, sizeof(p->ips[i].min));
> - io_simple_read(fd, &p->ips[i].max, sizeof(p->ips[i].max));
> - ip_addr_read(fd, &p->ips[i].addr);
> + io_read_buf(b, &p->ips[i].afi, sizeof(enum afi));
> + io_read_buf(b, &p->ips[i].maxlength, sizeof(size_t));
> + io_read_buf(b, &p->ips[i].min, sizeof(p->ips[i].min));
> + io_read_buf(b, &p->ips[i].max, sizeof(p->ips[i].max));
> + ip_addr_read(b, &p->ips[i].addr);
> }
>
> - io_str_read(fd, &p->aia);
> - io_str_read(fd, &p->aki);
> - io_str_read(fd, &p->ski);
> - io_str_read(fd, &p->tal);
> + io_read_str(b, &p->aia);
> + io_read_str(b, &p->aki);
> + io_read_str(b, &p->ski);
> + io_read_str(b, &p->tal);
> assert(p->aia && p->aki && p->ski && p->tal);
>
> return p;
> Index: rrdp.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/rrdp.c,v
> retrieving revision 1.12
> diff -u -p -r1.12 rrdp.c
> --- rrdp.c 22 Oct 2021 11:13:06 -0000 1.12
> +++ rrdp.c 23 Oct 2021 12:06:53 -0000
> @@ -378,32 +378,37 @@ rrdp_finished(struct rrdp *s)
> static void
> rrdp_input_handler(int fd)
> {
> + static struct ibuf *inbuf;
> char *local, *notify, *session_id, *last_mod;
> + struct ibuf *b;
> struct rrdp *s;
> enum rrdp_msg type;
> enum http_result res;
> long long serial;
> - size_t id, size;
> - int infd, ok;
> + size_t id;
> + int ok;
>
> - infd = io_recvfd(fd, &size, sizeof(size));
> - io_simple_read(fd, &type, sizeof(type));
> - io_simple_read(fd, &id, sizeof(id));
> + b = io_buf_recvfd(fd, &inbuf);
> + if (b == NULL)
> + return;
> +
> + io_read_buf(b, &type, sizeof(type));
> + io_read_buf(b, &id, sizeof(id));
>
> switch (type) {
> case RRDP_START:
> - io_str_read(fd, &local);
> - io_str_read(fd, ¬ify);
> - io_str_read(fd, &session_id);
> - io_simple_read(fd, &serial, sizeof(serial));
> - io_str_read(fd, &last_mod);
> - if (infd != -1)
> - errx(1, "received unexpected fd %d", infd);
> + io_read_str(b, &local);
> + io_read_str(b, ¬ify);
> + io_read_str(b, &session_id);
> + io_read_buf(b, &serial, sizeof(serial));
> + io_read_str(b, &last_mod);
> + if (b->fd != -1)
> + errx(1, "received unexpected fd");
>
> s = rrdp_new(id, local, notify, session_id, serial, last_mod);
> break;
> case RRDP_HTTP_INI:
> - if (infd == -1)
> + if (b->fd == -1)
> errx(1, "expected fd not received");
> s = rrdp_get(id);
> if (s == NULL)
> @@ -411,13 +416,13 @@ rrdp_input_handler(int fd)
> if (s->state != RRDP_STATE_WAIT)
> errx(1, "%s: bad internal state", s->local);
>
> - s->infd = infd;
> + s->infd = b->fd;
> s->state = RRDP_STATE_PARSE;
> break;
> case RRDP_HTTP_FIN:
> - io_simple_read(fd, &res, sizeof(res));
> - io_str_read(fd, &last_mod);
> - if (infd != -1)
> + io_read_buf(b, &res, sizeof(res));
> + io_read_str(b, &last_mod);
> + if (b->fd != -1)
> errx(1, "received unexpected fd");
>
> s = rrdp_get(id);
> @@ -435,12 +440,11 @@ rrdp_input_handler(int fd)
> s = rrdp_get(id);
> if (s == NULL)
> errx(1, "rrdp session %zu does not exist", id);
> - if (infd != -1)
> - errx(1, "received unexpected fd %d", infd);
> - io_simple_read(fd, &ok, sizeof(ok));
> - if (ok != 1) {
> + if (b->fd != -1)
> + errx(1, "received unexpected fd");
> + io_read_buf(b, &ok, sizeof(ok));
> + if (ok != 1)
> s->file_failed++;
> - }
> s->file_pending--;
> if (s->file_pending == 0)
> rrdp_finished(s);
> @@ -448,6 +452,7 @@ rrdp_input_handler(int fd)
> default:
> errx(1, "unexpected message %d", type);
> }
> + ibuf_free(b);
> }
>
> static void
> @@ -558,14 +563,12 @@ proc_rrdp(int fd)
> if (pfds[0].revents & POLLHUP)
> break;
> if (pfds[0].revents & POLLOUT) {
> - io_socket_nonblocking(fd);
> switch (msgbuf_write(&msgq)) {
> case 0:
> errx(1, "write: connection closed");
> case -1:
> err(1, "write");
> }
> - io_socket_blocking(fd);
> }
> if (pfds[0].revents & POLLIN)
> rrdp_input_handler(fd);
> Index: rsync.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/rsync.c,v
> retrieving revision 1.26
> diff -u -p -r1.26 rsync.c
> --- rsync.c 22 Oct 2021 11:13:06 -0000 1.26
> +++ rsync.c 23 Oct 2021 12:07:41 -0000
> @@ -120,6 +120,7 @@ proc_rsync(char *prog, char *bind_addr,
> int rc = 0;
> struct pollfd pfd;
> struct msgbuf msgq;
> + struct ibuf *b, *inbuf = NULL;
> sigset_t mask, oldmask;
> struct rsyncproc *ids = NULL;
>
> @@ -178,7 +179,7 @@ proc_rsync(char *prog, char *bind_addr,
>
> for (;;) {
> char *uri = NULL, *dst = NULL;
> - size_t id, size;
> + size_t id;
> pid_t pid;
> int st;
>
> @@ -198,7 +199,6 @@ proc_rsync(char *prog, char *bind_addr,
> */
>
> while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) {
> - struct ibuf *b;
> int ok = 1;
>
> for (i = 0; i < idsz; i++)
> @@ -247,11 +247,17 @@ proc_rsync(char *prog, char *bind_addr,
> if (!(pfd.revents & POLLIN))
> continue;
>
> + b = io_buf_read(fd, &inbuf);
> + if (b == NULL)
> + continue;
> +
> /* Read host and module. */
> - io_simple_read(fd, &size, sizeof(size));
> - io_simple_read(fd, &id, sizeof(id));
> - io_str_read(fd, &dst);
> - io_str_read(fd, &uri);
> + io_read_buf(b, &id, sizeof(id));
> + io_read_str(b, &dst);
> + io_read_str(b, &uri);
> +
> + ibuf_free(b);
> +
> assert(dst);
> assert(uri);
>
> Index: tal.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/tal.c,v
> retrieving revision 1.30
> diff -u -p -r1.30 tal.c
> --- tal.c 1 Apr 2021 06:43:23 -0000 1.30
> +++ tal.c 23 Oct 2021 12:07:17 -0000
> @@ -284,7 +284,7 @@ tal_buffer(struct ibuf *b, const struct
> * A returned pointer must be freed with tal_free().
> */
> struct tal *
> -tal_read(int fd)
> +tal_read(struct ibuf *b)
> {
> size_t i;
> struct tal *p;
> @@ -292,18 +292,18 @@ tal_read(int fd)
> if ((p = calloc(1, sizeof(struct tal))) == NULL)
> err(1, NULL);
>
> - io_buf_read_alloc(fd, (void **)&p->pkey, &p->pkeysz);
> + io_read_buf_alloc(b, (void **)&p->pkey, &p->pkeysz);
> + io_read_str(b, &p->descr);
> + io_read_buf(b, &p->urisz, sizeof(size_t));
> assert(p->pkeysz > 0);
> - io_str_read(fd, &p->descr);
> assert(p->descr);
> - io_simple_read(fd, &p->urisz, sizeof(size_t));
> assert(p->urisz > 0);
>
> if ((p->uri = calloc(p->urisz, sizeof(char *))) == NULL)
> err(1, NULL);
>
> for (i = 0; i < p->urisz; i++) {
> - io_str_read(fd, &p->uri[i]);
> + io_read_str(b, &p->uri[i]);
> assert(p->uri[i]);
> }
>
>