ok benno@

Claudio Jeker([email protected]) on 2021.10.23 14:20:19 +0200:
> This diff changes the io read functions to work on ibufs.
> With this, the poll loops consume data with io_buf_read() until a full
> message has been received, and only then is that message processed. As a
> result, the processes no longer block while waiting for more data in the
> io read functions.
> 
> With this, the blocking/nonblocking dance done in a few places is also no
> longer needed. Everything is now nonblocking.
> 
> Further cleanup of the various marshaling functions can follow in a later
> step.
> -- 
> :wq Claudio
> 
> Index: cert.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/cert.c,v
> retrieving revision 1.39
> diff -u -p -r1.39 cert.c
> --- cert.c    15 Oct 2021 22:30:33 -0000      1.39
> +++ cert.c    23 Oct 2021 11:58:43 -0000
> @@ -1281,33 +1281,31 @@ cert_buffer(struct ibuf *b, const struct
>  }
>  
>  static void
> -cert_ip_read(int fd, struct cert_ip *p)
> +cert_ip_read(struct ibuf *b, struct cert_ip *p)
>  {
> -
> -     io_simple_read(fd, &p->afi, sizeof(enum afi));
> -     io_simple_read(fd, &p->type, sizeof(enum cert_ip_type));
> +     io_read_buf(b, &p->afi, sizeof(enum afi));
> +     io_read_buf(b, &p->type, sizeof(enum cert_ip_type));
>  
>       if (p->type != CERT_IP_INHERIT) {
> -             io_simple_read(fd, &p->min, sizeof(p->min));
> -             io_simple_read(fd, &p->max, sizeof(p->max));
> +             io_read_buf(b, &p->min, sizeof(p->min));
> +             io_read_buf(b, &p->max, sizeof(p->max));
>       }
>  
>       if (p->type == CERT_IP_RANGE)
> -             ip_addr_range_read(fd, &p->range);
> +             ip_addr_range_read(b, &p->range);
>       else if (p->type == CERT_IP_ADDR)
> -             ip_addr_read(fd, &p->ip);
> +             ip_addr_read(b, &p->ip);
>  }
>  
>  static void
> -cert_as_read(int fd, struct cert_as *p)
> +cert_as_read(struct ibuf *b, struct cert_as *p)
>  {
> -
> -     io_simple_read(fd, &p->type, sizeof(enum cert_as_type));
> +     io_read_buf(b, &p->type, sizeof(enum cert_as_type));
>       if (p->type == CERT_AS_RANGE) {
> -             io_simple_read(fd, &p->range.min, sizeof(uint32_t));
> -             io_simple_read(fd, &p->range.max, sizeof(uint32_t));
> +             io_read_buf(b, &p->range.min, sizeof(uint32_t));
> +             io_read_buf(b, &p->range.max, sizeof(uint32_t));
>       } else if (p->type == CERT_AS_ID)
> -             io_simple_read(fd, &p->id, sizeof(uint32_t));
> +             io_read_buf(b, &p->id, sizeof(uint32_t));
>  }
>  
>  /*
> @@ -1316,7 +1314,7 @@ cert_as_read(int fd, struct cert_as *p)
>   * Always returns a valid pointer.
>   */
>  struct cert *
> -cert_read(int fd)
> +cert_read(struct ibuf *b)
>  {
>       struct cert     *p;
>       size_t           i;
> @@ -1324,35 +1322,36 @@ cert_read(int fd)
>       if ((p = calloc(1, sizeof(struct cert))) == NULL)
>               err(1, NULL);
>  
> -     io_simple_read(fd, &p->valid, sizeof(int));
> -     io_simple_read(fd, &p->expires, sizeof(time_t));
> -     io_simple_read(fd, &p->purpose, sizeof(enum cert_purpose));
> -     io_simple_read(fd, &p->ipsz, sizeof(size_t));
> +     io_read_buf(b, &p->valid, sizeof(int));
> +     io_read_buf(b, &p->expires, sizeof(time_t));
> +     io_read_buf(b, &p->purpose, sizeof(enum cert_purpose));
> +     io_read_buf(b, &p->ipsz, sizeof(size_t));
> +
>       p->ips = calloc(p->ipsz, sizeof(struct cert_ip));
>       if (p->ips == NULL)
>               err(1, NULL);
>       for (i = 0; i < p->ipsz; i++)
> -             cert_ip_read(fd, &p->ips[i]);
> +             cert_ip_read(b, &p->ips[i]);
>  
> -     io_simple_read(fd, &p->asz, sizeof(size_t));
> +     io_read_buf(b, &p->asz, sizeof(size_t));
>       p->as = calloc(p->asz, sizeof(struct cert_as));
>       if (p->as == NULL)
>               err(1, NULL);
>       for (i = 0; i < p->asz; i++)
> -             cert_as_read(fd, &p->as[i]);
> +             cert_as_read(b, &p->as[i]);
> +
> +     io_read_str(b, &p->mft);
> +     io_read_str(b, &p->notify);
> +     io_read_str(b, &p->repo);
> +     io_read_str(b, &p->crl);
> +     io_read_str(b, &p->aia);
> +     io_read_str(b, &p->aki);
> +     io_read_str(b, &p->ski);
> +     io_read_str(b, &p->tal);
> +     io_read_str(b, &p->pubkey);
>  
> -     io_str_read(fd, &p->mft);
>       assert(p->mft != NULL || p->purpose == CERT_PURPOSE_BGPSEC_ROUTER);
> -     io_str_read(fd, &p->notify);
> -     io_str_read(fd, &p->repo);
> -     io_str_read(fd, &p->crl);
> -     io_str_read(fd, &p->aia);
> -     io_str_read(fd, &p->aki);
> -     io_str_read(fd, &p->ski);
>       assert(p->ski);
> -     io_str_read(fd, &p->tal);
> -     io_str_read(fd, &p->pubkey);
> -
>       return p;
>  }
>  
> Index: extern.h
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/extern.h,v
> retrieving revision 1.73
> diff -u -p -r1.73 extern.h
> --- extern.h  22 Oct 2021 11:13:06 -0000      1.73
> +++ extern.h  23 Oct 2021 12:03:19 -0000
> @@ -399,25 +399,25 @@ void             tal_buffer(struct ibuf *, const s
>  void          tal_free(struct tal *);
>  struct tal   *tal_parse(const char *, char *);
>  char         *tal_read_file(const char *);
> -struct tal   *tal_read(int);
> +struct tal   *tal_read(struct ibuf *);
>  
>  void          cert_buffer(struct ibuf *, const struct cert *);
>  void          cert_free(struct cert *);
>  struct cert  *cert_parse(X509 **, const char *);
>  struct cert  *ta_parse(X509 **, const char *, const unsigned char *, size_t);
> -struct cert  *cert_read(int);
> +struct cert  *cert_read(struct ibuf *);
>  void          cert_insert_brks(struct brk_tree *, struct cert *);
>  
>  void          mft_buffer(struct ibuf *, const struct mft *);
>  void          mft_free(struct mft *);
>  struct mft   *mft_parse(X509 **, const char *);
>  int           mft_check(const char *, struct mft *);
> -struct mft   *mft_read(int);
> +struct mft   *mft_read(struct ibuf *);
>  
>  void          roa_buffer(struct ibuf *, const struct roa *);
>  void          roa_free(struct roa *);
>  struct roa   *roa_parse(X509 **, const char *);
> -struct roa   *roa_read(int);
> +struct roa   *roa_read(struct ibuf *);
>  void          roa_insert_vrps(struct vrp_tree *, struct roa *, size_t *,
>                   size_t *);
>  
> @@ -460,8 +460,8 @@ void               ip_addr_print(const struct ip_add
>  void          ip_addr_buffer(struct ibuf *, const struct ip_addr *);
>  void          ip_addr_range_buffer(struct ibuf *,
>                       const struct ip_addr_range *);
> -void          ip_addr_read(int, struct ip_addr *);
> -void          ip_addr_range_read(int, struct ip_addr_range *);
> +void          ip_addr_read(struct ibuf *, struct ip_addr *);
> +void          ip_addr_range_read(struct ibuf *, struct ip_addr_range *);
>  int           ip_addr_cmp(const struct ip_addr *, const struct ip_addr *);
>  int           ip_addr_check_overlap(const struct cert_ip *,
>                       const char *, const struct cert_ip *, size_t);
> @@ -480,7 +480,7 @@ int                as_check_covered(uint32_t, uint32_
>  
>  /* Parser-specific */
>  void          entity_free(struct entity *);
> -void          entity_read_req(int fd, struct entity *);
> +void          entity_read_req(struct ibuf *, struct entity *);
>  void          entityq_flush(struct entityq *, struct repo *);
>  void          proc_parser(int) __attribute__((noreturn));
>  
> @@ -535,8 +535,6 @@ char              *hex_encode(const unsigned char *,
>  
>  /* Functions for moving data between processes. */
>  
> -void          io_socket_blocking(int);
> -void          io_socket_nonblocking(int);
>  struct ibuf  *io_buf_new(void);
>  void          io_simple_buffer(struct ibuf *, const void *, size_t);
>  void          io_buf_buffer(struct ibuf *, const void *, size_t);
> @@ -545,7 +543,11 @@ void              io_buf_close(struct msgbuf *, str
>  void          io_simple_read(int, void *, size_t);
>  void          io_buf_read_alloc(int, void **, size_t *);
>  void          io_str_read(int, char **);
> -int           io_recvfd(int, void *, size_t);
> +void          io_read_buf(struct ibuf *, void *, size_t);
> +void          io_read_str(struct ibuf *, char **);
> +void          io_read_buf_alloc(struct ibuf *, void **, size_t *);
> +struct ibuf  *io_buf_read(int, struct ibuf **);
> +struct ibuf  *io_buf_recvfd(int, struct ibuf **);
>  
>  /* X509 helpers. */
>  
> Index: http.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/http.c,v
> retrieving revision 1.43
> diff -u -p -r1.43 http.c
> --- http.c    22 Oct 2021 11:13:06 -0000      1.43
> +++ http.c    23 Oct 2021 11:59:14 -0000
> @@ -1769,6 +1769,7 @@ proc_http(char *bind_addr, int fd)
>       struct pollfd pfds[NPFDS];
>       struct http_connection *conn, *nc;
>       struct http_request *req, *nr;
> +     struct ibuf *b, *inbuf = NULL;
>  
>       if (bind_addr != NULL) {
>               struct addrinfo hints, *res;
> @@ -1859,18 +1860,20 @@ proc_http(char *bind_addr, int fd)
>                       }
>               }
>               if (pfds[0].revents & POLLIN) {
> -                     size_t id, size;
> -                     int outfd;
> -                     char *uri;
> -                     char *mod;
> -
> -                     outfd = io_recvfd(fd, &size, sizeof(size));
> -                     io_simple_read(fd, &id, sizeof(id));
> -                     io_str_read(fd, &uri);
> -                     io_str_read(fd, &mod);
> -
> -                     /* queue up new requests */
> -                     http_req_new(id, uri, mod, outfd);
> +                     b = io_buf_recvfd(fd, &inbuf);
> +                     if (b != NULL) {
> +                             size_t id;
> +                             char *uri;
> +                             char *mod;
> +
> +                             io_read_buf(b, &id, sizeof(id));
> +                             io_read_str(b, &uri);
> +                             io_read_str(b, &mod);
> +
> +                             /* queue up new requests */
> +                             http_req_new(id, uri, mod, b->fd);
> +                             ibuf_free(b);
> +                     }
>               }
>  
>               now = getmonotime();
> Index: io.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/io.c,v
> retrieving revision 1.14
> diff -u -p -r1.14 io.c
> --- io.c      22 Oct 2021 11:13:06 -0000      1.14
> +++ io.c      23 Oct 2021 12:13:41 -0000
> @@ -1,4 +1,4 @@
> -/*   $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */
> +/*   $OpenBSD: io.c,v 1.13 2021/03/04 13:01:41 claudio Exp $ */
>  /*
>   * Copyright (c) 2019 Kristaps Dzonsons <[email protected]>
>   *
> @@ -30,28 +30,6 @@
>  
>  #include "extern.h"
>  
> -void
> -io_socket_blocking(int fd)
> -{
> -     int      fl;
> -
> -     if ((fl = fcntl(fd, F_GETFL, 0)) == -1)
> -             err(1, "fcntl");
> -     if (fcntl(fd, F_SETFL, fl & ~O_NONBLOCK) == -1)
> -             err(1, "fcntl");
> -}
> -
> -void
> -io_socket_nonblocking(int fd)
> -{
> -     int      fl;
> -
> -     if ((fl = fcntl(fd, F_GETFL, 0)) == -1)
> -             err(1, "fcntl");
> -     if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1)
> -             err(1, "fcntl");
> -}
> -
>  /*
>   * Create new io buffer, call io_close() when done with it.
>   * Function always returns a new buffer.
> @@ -109,80 +87,148 @@ io_buf_close(struct msgbuf *msgbuf, stru
>  {
>       size_t len;
>  
> -     len = ibuf_size(b);
> +     len = ibuf_size(b) - sizeof(len);
>       memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len));
>       ibuf_close(msgbuf, b);
>  }
>  
>  /*
> - * Read of a binary buffer that must be on a blocking descriptor.
> + * Read from an ibuf and extract sz bytes from it.
>   * Does nothing if "sz" is zero.
> - * This will fail and exit on EOF.
> + * Exits with an error if the buffer is too short.
>   */
>  void
> -io_simple_read(int fd, void *res, size_t sz)
> +io_read_buf(struct ibuf *b, void *res, size_t sz)
>  {
> -     ssize_t  ssz;
>       char    *tmp;
>  
> -     tmp = res; /* arithmetic on a pointer to void is a GNU extension */
> -again:
>       if (sz == 0)
>               return;
> -     if ((ssz = read(fd, tmp, sz)) == -1)
> -             err(1, "read");
> -     else if (ssz == 0)
> -             errx(1, "read: unexpected end of file");
> -     else if ((size_t)ssz == sz)
> +     tmp = ibuf_seek(b, b->rpos, sz);
> +     if (tmp == NULL)
> +             errx(1, "bad internal framing, buffer too short");
> +     b->rpos += sz;
> +     memcpy(res, tmp, sz);
> +}
> +
> +/*
> + * Read a string (returns NULL for zero-length strings), allocating
> + * space for it.
> + * Exits with an error if the buffer is too short.
> + */
> +void
> +io_read_str(struct ibuf *b, char **res)
> +{
> +     size_t   sz;
> +
> +     io_read_buf(b, &sz, sizeof(sz));
> +     if (sz == 0) {
> +             *res = NULL;
>               return;
> -     sz -= ssz;
> -     tmp += ssz;
> -     goto again;
> +     }
> +     if ((*res = calloc(sz + 1, 1)) == NULL)
> +             err(1, NULL);
> +     io_read_buf(b, *res, sz);
>  }
>  
>  /*
>   * Read a binary buffer, allocating space for it.
>   * If the buffer is zero-sized, this won't allocate "res", but
>   * will still initialise it to NULL.
> + * Exits with an error if the buffer is too short.
>   */
>  void
> -io_buf_read_alloc(int fd, void **res, size_t *sz)
> +io_read_buf_alloc(struct ibuf *b, void **res, size_t *sz)
>  {
> -
>       *res = NULL;
> -     io_simple_read(fd, sz, sizeof(size_t));
> +     io_read_buf(b, sz, sizeof(sz));
>       if (*sz == 0)
>               return;
>       if ((*res = malloc(*sz)) == NULL)
>               err(1, NULL);
> -     io_simple_read(fd, *res, *sz);
> +     io_read_buf(b, *res, *sz);
> +}
> +
> +/* XXX copy from imsg-buffer.c */
> +static int
> +ibuf_realloc(struct ibuf *buf, size_t len)
> +{
> +     unsigned char   *b;
> +
> +     /* on static buffers max is eq size and so the following fails */
> +     if (buf->wpos + len > buf->max) {
> +             errno = ERANGE;
> +             return (-1);
> +     }
> +
> +     b = recallocarray(buf->buf, buf->size, buf->wpos + len, 1);
> +     if (b == NULL)
> +             return (-1);
> +     buf->buf = b;
> +     buf->size = buf->wpos + len;
> +
> +     return (0);
>  }
>  
>  /*
> - * Read a string (returns NULL for zero-length strings), allocating
> - * space for it.
> + * Read once per call, filling an ibuf until the message is complete.
> + * Returns NULL if more data is needed, returns a full ibuf once
> + * all data is received.
>   */
> -void
> -io_str_read(int fd, char **res)
> +struct ibuf *
> +io_buf_read(int fd, struct ibuf **ib)
>  {
> -     size_t   sz;
> +     struct ibuf *b = *ib;
> +     ssize_t n;
> +     size_t sz;
>  
> -     io_simple_read(fd, &sz, sizeof(size_t));
> -     if (sz == 0) {
> -             *res = NULL;
> -             return;
> +     /* if ibuf == NULL allocate a new buffer */
> +     if (b == NULL) {
> +             if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL)
> +                     err(1, NULL);
> +             *ib = b;
>       }
> -     if ((*res = calloc(sz + 1, 1)) == NULL)
> -             err(1, NULL);
> -     io_simple_read(fd, *res, sz);
> +
> +     /* read some data */
> +     while ((n = read(fd, b->buf + b->wpos, b->size - b->wpos)) == -1) {
> +             if (errno == EINTR)
> +                     continue;
> +             err(1, "read");
> +     }
> +
> +     if (n == 0)
> +             errx(1, "read: unexpected end of file");
> +     b->wpos += n;
> +
> +     /* got full message */
> +     if (b->wpos == b->size) {
> +             /* only header received */
> +             if (b->wpos == sizeof(sz)) {
> +                     memcpy(&sz, b->buf, sizeof(sz));
> +                     if (sz == 0 || sz > INT32_MAX)
> +                             errx(1, "bad internal framing, bad size");
> +                     if (ibuf_realloc(b, sz) == -1)
> +                             err(1, "ibuf_realloc");
> +                     return NULL;
> +             }
> +
> +             /* skip over initial size header */
> +             b->rpos += sizeof(sz);
> +             *ib = NULL;
> +             return b;
> +     }
> +
> +     return NULL;
>  }
>  
> +
>  /*
>   * Read data from socket but receive a file descriptor at the same time.
>   */
> -int
> -io_recvfd(int fd, void *res, size_t sz)
> +struct ibuf *
> +io_buf_recvfd(int fd, struct ibuf **ib)
>  {
> +     struct ibuf *b = *ib;
>       struct iovec iov;
>       struct msghdr msg;
>       struct cmsghdr *cmsg;
> @@ -190,15 +236,22 @@ io_recvfd(int fd, void *res, size_t sz)
>               struct cmsghdr  hdr;
>               char            buf[CMSG_SPACE(sizeof(int))];
>       } cmsgbuf;
> -     int outfd = -1;
> -     char *b = res;
>       ssize_t n;
> +     size_t sz;
>  
> +     /* fds only arrive with the message head; use regular read afterwards */
> +     if (b != NULL)
> +             return io_buf_read(fd, ib);
> +
> +     if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL)
> +             err(1, NULL);
> +     *ib = b;
> +     
>       memset(&msg, 0, sizeof(msg));
>       memset(&cmsgbuf, 0, sizeof(cmsgbuf));
>  
> -     iov.iov_base = res;
> -     iov.iov_len = sz;
> +     iov.iov_base = b->buf;
> +     iov.iov_len = b->size;
>  
>       msg.msg_iov = &iov;
>       msg.msg_iovlen = 1;
> @@ -225,29 +278,32 @@ io_recvfd(int fd, void *res, size_t sz)
>                       for (i = 0; i < j; i++) {
>                               f = ((int *)CMSG_DATA(cmsg))[i];
>                               if (i == 0)
> -                                     outfd = f;
> +                                     b->fd = f;
>                               else
>                                       close(f);
>                       }
>               }
>       }
>  
> -     b += n;
> -     sz -= n;
> -     while (sz > 0) {
> -             /* short receive */
> -             n = recv(fd, b, sz, 0);
> -             if (n == -1) {
> -                     if (errno == EINTR)
> -                             continue;
> -                     err(1, "recv");
> +     b->wpos += n;
> +
> +     /* got full message */
> +     if (b->wpos == b->size) {
> +             /* only header received */
> +             if (b->wpos == sizeof(sz)) {
> +                     memcpy(&sz, b->buf, sizeof(sz));
> +                     if (sz == 0 || sz > INT32_MAX)
> +                             errx(1, "read: bad internal framing, %zu", sz);
> +                     if (ibuf_realloc(b, sz) == -1)
> +                             err(1, "ibuf_realloc");
> +                     return NULL;
>               }
> -             if (n == 0)
> -                     errx(1, "recv: unexpected end of file");
>  
> -             b += n;
> -             sz -= n;
> +             /* skip over initial size header */
> +             b->rpos += sizeof(sz);
> +             *ib = NULL;
> +             return b;
>       }
>  
> -     return outfd;
> +     return NULL;
>  }
> Index: ip.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/ip.c,v
> retrieving revision 1.17
> diff -u -p -r1.17 ip.c
> --- ip.c      19 Apr 2021 17:04:35 -0000      1.17
> +++ ip.c      23 Oct 2021 11:59:43 -0000
> @@ -314,14 +314,14 @@ ip_addr_range_buffer(struct ibuf *b, con
>   * Matched with ip_addr_buffer().
>   */
>  void
> -ip_addr_read(int fd, struct ip_addr *p)
> +ip_addr_read(struct ibuf *b, struct ip_addr *p)
>  {
>       size_t sz;
>  
> -     io_simple_read(fd, &p->prefixlen, sizeof(unsigned char));
> +     io_read_buf(b, &p->prefixlen, sizeof(unsigned char));
>       sz = PREFIX_SIZE(p->prefixlen);
>       assert(sz <= 16);
> -     io_simple_read(fd, p->addr, sz);
> +     io_read_buf(b, p->addr, sz);
>  }
>  
>  /*
> @@ -329,11 +329,10 @@ ip_addr_read(int fd, struct ip_addr *p)
>   * Matched with ip_addr_range_buffer().
>   */
>  void
> -ip_addr_range_read(int fd, struct ip_addr_range *p)
> +ip_addr_range_read(struct ibuf *b, struct ip_addr_range *p)
>  {
> -
> -     ip_addr_read(fd, &p->min);
> -     ip_addr_read(fd, &p->max);
> +     ip_addr_read(b, &p->min);
> +     ip_addr_read(b, &p->max);
>  }
>  
>  /*
> Index: main.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v
> retrieving revision 1.150
> diff -u -p -r1.150 main.c
> --- main.c    22 Oct 2021 11:13:06 -0000      1.150
> +++ main.c    23 Oct 2021 12:03:01 -0000
> @@ -99,17 +99,14 @@ entity_free(struct entity *ent)
>   * The pointer must be passed entity_free().
>   */
>  void
> -entity_read_req(int fd, struct entity *ent)
> +entity_read_req(struct ibuf *b, struct entity *ent)
>  {
> -     size_t size;
> -
> -     io_simple_read(fd, &size, sizeof(size));
> -     io_simple_read(fd, &ent->type, sizeof(enum rtype));
> -     io_str_read(fd, &ent->file);
> -     io_simple_read(fd, &ent->has_pkey, sizeof(int));
> +     io_read_buf(b, &ent->type, sizeof(ent->type));
> +     io_read_str(b, &ent->file);
> +     io_read_buf(b, &ent->has_pkey, sizeof(ent->has_pkey));
>       if (ent->has_pkey)
> -             io_buf_read_alloc(fd, (void **)&ent->pkey, &ent->pkeysz);
> -     io_str_read(fd, &ent->descr);
> +             io_read_buf_alloc(b, (void **)&ent->pkey, &ent->pkeysz);
> +     io_read_str(b, &ent->descr);
>  }
>  
>  /*
> @@ -459,7 +456,7 @@ queue_add_from_cert(const struct cert *c
>   * In all cases, we gather statistics.
>   */
>  static void
> -entity_process(int proc, struct stats *st, struct vrp_tree *tree,
> +entity_process(struct ibuf *b, struct stats *st, struct vrp_tree *tree,
>      struct brk_tree *brktree)
>  {
>       enum rtype       type;
> @@ -467,7 +464,6 @@ entity_process(int proc, struct stats *s
>       struct cert     *cert;
>       struct mft      *mft;
>       struct roa      *roa;
> -     size_t           size;
>       int              c;
>  
>       /*
> @@ -476,24 +472,23 @@ entity_process(int proc, struct stats *s
>        * certificate, for example).
>        * We follow that up with whether the resources didn't parse.
>        */
> -     io_simple_read(proc, &size, sizeof(size));
> -     io_simple_read(proc, &type, sizeof(type));
> +     io_read_buf(b, &type, sizeof(type));
>  
>       switch (type) {
>       case RTYPE_TAL:
>               st->tals++;
> -             tal = tal_read(proc);
> +             tal = tal_read(b);
>               queue_add_from_tal(tal);
>               tal_free(tal);
>               break;
>       case RTYPE_CER:
>               st->certs++;
> -             io_simple_read(proc, &c, sizeof(int));
> +             io_read_buf(b, &c, sizeof(c));
>               if (c == 0) {
>                       st->certs_fail++;
>                       break;
>               }
> -             cert = cert_read(proc);
> +             cert = cert_read(b);
>               if (cert->purpose == CERT_PURPOSE_CA) {
>                       if (cert->valid) {
>                               /*
> @@ -517,12 +512,12 @@ entity_process(int proc, struct stats *s
>               break;
>       case RTYPE_MFT:
>               st->mfts++;
> -             io_simple_read(proc, &c, sizeof(int));
> +             io_read_buf(b, &c, sizeof(c));
>               if (c == 0) {
>                       st->mfts_fail++;
>                       break;
>               }
> -             mft = mft_read(proc);
> +             mft = mft_read(b);
>               if (mft->stale)
>                       st->mfts_stale++;
>               queue_add_from_mft_set(mft);
> @@ -533,12 +528,12 @@ entity_process(int proc, struct stats *s
>               break;
>       case RTYPE_ROA:
>               st->roas++;
> -             io_simple_read(proc, &c, sizeof(int));
> +             io_read_buf(b, &c, sizeof(c));
>               if (c == 0) {
>                       st->roas_fail++;
>                       break;
>               }
> -             roa = roa_read(proc);
> +             roa = roa_read(b);
>               if (roa->valid)
>                       roa_insert_vrps(tree, roa, &st->vrps, &st->uniqs);
>               else
> @@ -555,6 +550,57 @@ entity_process(int proc, struct stats *s
>       entity_queue--;
>  }
>  
> +static void
> +rrdp_process(struct ibuf *b)
> +{
> +     enum rrdp_msg type;
> +     enum publish_type pt;
> +     struct rrdp_session s;
> +     char *uri, *last_mod, *data;
> +     char hash[SHA256_DIGEST_LENGTH];
> +     size_t dsz, id;
> +     int ok;
> +
> +     io_read_buf(b, &type, sizeof(type));
> +     io_read_buf(b, &id, sizeof(id));
> +
> +     switch (type) {
> +     case RRDP_END:
> +             io_read_buf(b, &ok, sizeof(ok));
> +             rrdp_finish(id, ok);
> +             break;
> +     case RRDP_HTTP_REQ:
> +             io_read_str(b, &uri);
> +             io_read_str(b, &last_mod);
> +             rrdp_http_fetch(id, uri, last_mod);
> +             break;
> +     case RRDP_SESSION:
> +             io_read_str(b, &s.session_id);
> +             io_read_buf(b, &s.serial, sizeof(s.serial));
> +             io_read_str(b, &s.last_mod);
> +             rrdp_save_state(id, &s);
> +             free(s.session_id);
> +             free(s.last_mod);
> +             break;
> +     case RRDP_FILE:
> +             io_read_buf(b, &pt, sizeof(pt));
> +             if (pt != PUB_ADD)
> +                     io_read_buf(b, &hash, sizeof(hash));
> +             io_read_str(b, &uri);
> +             io_read_buf_alloc(b, (void **)&data, &dsz);
> +
> +             ok = rrdp_handle_file(id, pt, uri, hash, sizeof(hash),
> +                 data, dsz);
> +             rrdp_file_resp(id, ok);
> +
> +             free(uri);
> +             free(data);
> +             break;
> +     default:
> +             errx(1, "unexpected rrdp response");
> +     }
> +}
> +
>  /*
>   * Assign filenames ending in ".tal" in "/etc/rpki" into "tals",
>   * returning the number of files found and filled-in.
> @@ -623,19 +669,21 @@ suicide(int sig __attribute__((unused)))
>  int
>  main(int argc, char *argv[])
>  {
> -     int              rc, c, st, proc, rsync, http, rrdp, ok,
> -                      hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC;
> -     size_t           i, id, talsz = 0, size;
> +     int              rc, c, st, proc, rsync, http, rrdp, ok, hangup = 0;
> +     int              fl = SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK;
> +     size_t           i, id, talsz = 0;
>       pid_t            pid, procpid, rsyncpid, httppid, rrdppid;
>       int              fd[2];
>       struct pollfd    pfd[NPFD];
>       struct msgbuf   *queues[NPFD];
> +     struct ibuf     *b, *httpbuf = NULL, *procbuf = NULL;
> +     struct ibuf     *rrdpbuf = NULL, *rsyncbuf = NULL;
>       char            *rsync_prog = "openrsync";
>       char            *bind_addr = NULL;
>       const char      *cachedir = NULL, *outputdir = NULL;
>       const char      *tals[TALSZ_MAX], *errs, *name;
> -     struct vrp_tree  v = RB_INITIALIZER(&v);
> -     struct brk_tree  b = RB_INITIALIZER(&b);
> +     struct vrp_tree  vrps = RB_INITIALIZER(&vrps);
> +     struct brk_tree  brks = RB_INITIALIZER(&brks);
>       struct rusage   ru;
>       struct timeval  start_time, now_time;
>  
> @@ -972,12 +1020,6 @@ main(int argc, char *argv[])
>                               hangup = 1;
>                       }
>                       if (pfd[i].revents & POLLOUT) {
> -                             /*
> -                              * XXX work around deadlocks because of
> -                              * blocking read vs non-blocking writes.
> -                              */
> -                             if (i > 1)
> -                                     io_socket_nonblocking(pfd[i].fd);
>                               switch (msgbuf_write(queues[i])) {
>                               case 0:
>                                       errx(1, "write[%zu]: "
> @@ -985,8 +1027,6 @@ main(int argc, char *argv[])
>                               case -1:
>                                       err(1, "write[%zu]", i);
>                               }
> -                             if (i > 1)
> -                                     io_socket_blocking(pfd[i].fd);
>                       }
>               }
>               if (hangup)
> @@ -1000,75 +1040,38 @@ main(int argc, char *argv[])
>                */
>  
>               if ((pfd[1].revents & POLLIN)) {
> -                     io_simple_read(rsync, &size, sizeof(size));
> -                     io_simple_read(rsync, &id, sizeof(id));
> -                     io_simple_read(rsync, &ok, sizeof(ok));
> -                     rsync_finish(id, ok);
> +                     b = io_buf_read(rsync, &rsyncbuf);
> +                     if (b != NULL) {
> +                             io_read_buf(b, &id, sizeof(id));
> +                             io_read_buf(b, &ok, sizeof(ok));
> +                             rsync_finish(id, ok);
> +                             ibuf_free(b);
> +                     }
>               }
>  
>               if ((pfd[2].revents & POLLIN)) {
> -                     enum http_result res;
> -                     char *last_mod;
> -
> -                     io_simple_read(http, &size, sizeof(size));
> -                     io_simple_read(http, &id, sizeof(id));
> -                     io_simple_read(http, &res, sizeof(res));
> -                     io_str_read(http, &last_mod);
> -                     http_finish(id, res, last_mod);
> -                     free(last_mod);
> +                     b = io_buf_read(http, &httpbuf);
> +                     if (b != NULL) {
> +                             enum http_result res;
> +                             char *last_mod;
> +
> +                             io_read_buf(b, &id, sizeof(id));
> +                             io_read_buf(b, &res, sizeof(res));
> +                             io_read_str(b, &last_mod);
> +                             http_finish(id, res, last_mod);
> +                             free(last_mod);
> +                             ibuf_free(b);
> +                     }
>               }
>  
>               /*
>                * Handle RRDP requests here.
>                */
>               if ((pfd[3].revents & POLLIN)) {
> -                     enum rrdp_msg type;
> -                     enum publish_type pt;
> -                     struct rrdp_session s;
> -                     char *uri, *last_mod, *data;
> -                     char hash[SHA256_DIGEST_LENGTH];
> -                     size_t dsz;
> -
> -                     io_simple_read(rrdp, &size, sizeof(size));
> -                     io_simple_read(rrdp, &type, sizeof(type));
> -                     io_simple_read(rrdp, &id, sizeof(id));
> -
> -                     switch (type) {
> -                     case RRDP_END:
> -                             io_simple_read(rrdp, &ok, sizeof(ok));
> -                             rrdp_finish(id, ok);
> -                             break;
> -                     case RRDP_HTTP_REQ:
> -                             io_str_read(rrdp, &uri);
> -                             io_str_read(rrdp, &last_mod);
> -                             rrdp_http_fetch(id, uri, last_mod);
> -                             break;
> -                     case RRDP_SESSION:
> -                             io_str_read(rrdp, &s.session_id);
> -                             io_simple_read(rrdp, &s.serial,
> -                                 sizeof(s.serial));
> -                             io_str_read(rrdp, &s.last_mod);
> -                             rrdp_save_state(id, &s);
> -                             free(s.session_id);
> -                             free(s.last_mod);
> -                             break;
> -                     case RRDP_FILE:
> -                             io_simple_read(rrdp, &pt, sizeof(pt));
> -                             if (pt != PUB_ADD)
> -                                     io_simple_read(rrdp, &hash,
> -                                         sizeof(hash));
> -                             io_str_read(rrdp, &uri);
> -                             io_buf_read_alloc(rrdp, (void **)&data, &dsz);
> -
> -                             ok = rrdp_handle_file(id, pt, uri,
> -                                 hash, sizeof(hash), data, dsz);
> -                             rrdp_file_resp(id, ok);
> -
> -                             free(uri);
> -                             free(data);
> -                             break;
> -                     default:
> -                             errx(1, "unexpected rrdp response");
> +                     b = io_buf_read(rrdp, &rrdpbuf);
> +                     if (b != NULL) {
> +                             rrdp_process(b);
> +                             ibuf_free(b);
>                       }
>               }
>  
> @@ -1078,7 +1081,11 @@ main(int argc, char *argv[])
>                */
>  
>               if ((pfd[0].revents & POLLIN)) {
> -                     entity_process(proc, &stats, &v, &b);
> +                     b = io_buf_read(proc, &procbuf);
> +                     if (b != NULL) {
> +                             entity_process(b, &stats, &vrps, &brks);
> +                             ibuf_free(b);
> +                     }
>               }
>       }
>  
> @@ -1154,7 +1161,7 @@ main(int argc, char *argv[])
>       if (fchdir(outdirfd) == -1)
>               err(1, "fchdir output dir");
>  
> -     if (outputfiles(&v, &b, &stats))
> +     if (outputfiles(&vrps, &brks, &stats))
>               rc = 1;
>  
>  
> Index: mft.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/mft.c,v
> retrieving revision 1.38
> diff -u -p -r1.38 mft.c
> --- mft.c     9 Sep 2021 14:15:49 -0000       1.38
> +++ mft.c     23 Oct 2021 12:04:20 -0000
> @@ -564,7 +564,7 @@ mft_buffer(struct ibuf *b, const struct 
>   * Result must be passed to mft_free().
>   */
>  struct mft *
> -mft_read(int fd)
> +mft_read(struct ibuf *b)
>  {
>       struct mft      *p = NULL;
>       size_t           i;
> @@ -572,22 +572,22 @@ mft_read(int fd)
>       if ((p = calloc(1, sizeof(struct mft))) == NULL)
>               err(1, NULL);
>  
> -     io_simple_read(fd, &p->stale, sizeof(int));
> -     io_str_read(fd, &p->file);
> -     assert(p->file);
> -     io_simple_read(fd, &p->filesz, sizeof(size_t));
> +     io_read_buf(b, &p->stale, sizeof(int));
> +     io_read_str(b, &p->file);
> +     io_read_buf(b, &p->filesz, sizeof(size_t));
>  
> +     assert(p->file);
>       if ((p->files = calloc(p->filesz, sizeof(struct mftfile))) == NULL)
>               err(1, NULL);
>  
>       for (i = 0; i < p->filesz; i++) {
> -             io_str_read(fd, &p->files[i].file);
> -             io_simple_read(fd, p->files[i].hash, SHA256_DIGEST_LENGTH);
> +             io_read_str(b, &p->files[i].file);
> +             io_read_buf(b, p->files[i].hash, SHA256_DIGEST_LENGTH);
>       }
>  
> -     io_str_read(fd, &p->aia);
> -     io_str_read(fd, &p->aki);
> -     io_str_read(fd, &p->ski);
> +     io_read_str(b, &p->aia);
> +     io_read_str(b, &p->aki);
> +     io_read_str(b, &p->ski);
>       assert(p->aia && p->aki && p->ski);
>  
>       return p;
> Index: parser.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/parser.c,v
> retrieving revision 1.14
> diff -u -p -r1.14 parser.c
> --- parser.c  22 Oct 2021 11:13:06 -0000      1.14
> +++ parser.c  23 Oct 2021 12:04:54 -0000
> @@ -525,7 +525,7 @@ proc_parser(int fd)
>       struct entityq   q;
>       struct msgbuf    msgq;
>       struct pollfd    pfd;
> -     struct ibuf     *b;
> +     struct ibuf     *b, *inbuf = NULL;
>       X509_STORE_CTX  *ctx;
>       struct auth_tree auths = RB_INITIALIZER(&auths);
>       struct crl_tree  crlt = RB_INITIALIZER(&crlt);
> @@ -545,8 +545,6 @@ proc_parser(int fd)
>  
>       pfd.fd = fd;
>  
> -     io_socket_nonblocking(pfd.fd);
> -
>       for (;;) {
>               pfd.events = POLLIN;
>               if (msgq.queued)
> @@ -571,13 +569,16 @@ proc_parser(int fd)
>                */
>  
>               if ((pfd.revents & POLLIN)) {
> -                     io_socket_blocking(fd);
> -                     entp = calloc(1, sizeof(struct entity));
> -                     if (entp == NULL)
> -                             err(1, NULL);
> -                     entity_read_req(fd, entp);
> -                     TAILQ_INSERT_TAIL(&q, entp, entries);
> -                     io_socket_nonblocking(fd);
> +                     b = io_buf_read(fd, &inbuf);
> +                     
> +                     if (b != NULL) {
> +                             entp = calloc(1, sizeof(struct entity));
> +                             if (entp == NULL)
> +                                     err(1, NULL);
> +                             entity_read_req(b, entp);
> +                             TAILQ_INSERT_TAIL(&q, entp, entries);
> +                             ibuf_free(b);
> +                     }
>               }
>  
>               if (pfd.revents & POLLOUT) {
> Index: roa.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/roa.c,v
> retrieving revision 1.26
> diff -u -p -r1.26 roa.c
> --- roa.c     7 Oct 2021 08:28:45 -0000       1.26
> +++ roa.c     23 Oct 2021 12:05:47 -0000
> @@ -448,7 +448,7 @@ roa_buffer(struct ibuf *b, const struct 
>   * Result must be passed to roa_free().
>   */
>  struct roa *
> -roa_read(int fd)
> +roa_read(struct ibuf *b)
>  {
>       struct roa      *p;
>       size_t           i;
> @@ -456,26 +456,26 @@ roa_read(int fd)
>       if ((p = calloc(1, sizeof(struct roa))) == NULL)
>               err(1, NULL);
>  
> -     io_simple_read(fd, &p->valid, sizeof(int));
> -     io_simple_read(fd, &p->asid, sizeof(uint32_t));
> -     io_simple_read(fd, &p->ipsz, sizeof(size_t));
> -     io_simple_read(fd, &p->expires, sizeof(time_t));
> +     io_read_buf(b, &p->valid, sizeof(int));
> +     io_read_buf(b, &p->asid, sizeof(uint32_t));
> +     io_read_buf(b, &p->ipsz, sizeof(size_t));
> +     io_read_buf(b, &p->expires, sizeof(time_t));
>  
>       if ((p->ips = calloc(p->ipsz, sizeof(struct roa_ip))) == NULL)
>               err(1, NULL);
>  
>       for (i = 0; i < p->ipsz; i++) {
> -             io_simple_read(fd, &p->ips[i].afi, sizeof(enum afi));
> -             io_simple_read(fd, &p->ips[i].maxlength, sizeof(size_t));
> -             io_simple_read(fd, &p->ips[i].min, sizeof(p->ips[i].min));
> -             io_simple_read(fd, &p->ips[i].max, sizeof(p->ips[i].max));
> -             ip_addr_read(fd, &p->ips[i].addr);
> +             io_read_buf(b, &p->ips[i].afi, sizeof(enum afi));
> +             io_read_buf(b, &p->ips[i].maxlength, sizeof(size_t));
> +             io_read_buf(b, &p->ips[i].min, sizeof(p->ips[i].min));
> +             io_read_buf(b, &p->ips[i].max, sizeof(p->ips[i].max));
> +             ip_addr_read(b, &p->ips[i].addr);
>       }
>  
> -     io_str_read(fd, &p->aia);
> -     io_str_read(fd, &p->aki);
> -     io_str_read(fd, &p->ski);
> -     io_str_read(fd, &p->tal);
> +     io_read_str(b, &p->aia);
> +     io_read_str(b, &p->aki);
> +     io_read_str(b, &p->ski);
> +     io_read_str(b, &p->tal);
>       assert(p->aia && p->aki && p->ski && p->tal);
>  
>       return p;
> Index: rrdp.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/rrdp.c,v
> retrieving revision 1.12
> diff -u -p -r1.12 rrdp.c
> --- rrdp.c    22 Oct 2021 11:13:06 -0000      1.12
> +++ rrdp.c    23 Oct 2021 12:06:53 -0000
> @@ -378,32 +378,37 @@ rrdp_finished(struct rrdp *s)
>  static void
>  rrdp_input_handler(int fd)
>  {
> +     static struct ibuf *inbuf;
>       char *local, *notify, *session_id, *last_mod;
> +     struct ibuf *b;
>       struct rrdp *s;
>       enum rrdp_msg type;
>       enum http_result res;
>       long long serial;
> -     size_t id, size;
> -     int infd, ok;
> +     size_t id;
> +     int ok;
>  
> -     infd = io_recvfd(fd, &size, sizeof(size));
> -     io_simple_read(fd, &type, sizeof(type));
> -     io_simple_read(fd, &id, sizeof(id));
> +     b = io_buf_recvfd(fd, &inbuf);
> +     if (b == NULL)
> +             return;
> +
> +     io_read_buf(b, &type, sizeof(type));
> +     io_read_buf(b, &id, sizeof(id));
>  
>       switch (type) {
>       case RRDP_START:
> -             io_str_read(fd, &local);
> -             io_str_read(fd, &notify);
> -             io_str_read(fd, &session_id);
> -             io_simple_read(fd, &serial, sizeof(serial));
> -             io_str_read(fd, &last_mod);
> -             if (infd != -1)
> -                     errx(1, "received unexpected fd %d", infd);
> +             io_read_str(b, &local);
> +             io_read_str(b, &notify);
> +             io_read_str(b, &session_id);
> +             io_read_buf(b, &serial, sizeof(serial));
> +             io_read_str(b, &last_mod);
> +             if (b->fd != -1)
> +                     errx(1, "received unexpected fd");
>  
>               s = rrdp_new(id, local, notify, session_id, serial, last_mod);
>               break;
>       case RRDP_HTTP_INI:
> -             if (infd == -1)
> +             if (b->fd == -1)
>                       errx(1, "expected fd not received");
>               s = rrdp_get(id);
>               if (s == NULL)
> @@ -411,13 +416,13 @@ rrdp_input_handler(int fd)
>               if (s->state != RRDP_STATE_WAIT)
>                       errx(1, "%s: bad internal state", s->local);
>  
> -             s->infd = infd;
> +             s->infd = b->fd;
>               s->state = RRDP_STATE_PARSE;
>               break;
>       case RRDP_HTTP_FIN:
> -             io_simple_read(fd, &res, sizeof(res));
> -             io_str_read(fd, &last_mod);
> -             if (infd != -1)
> +             io_read_buf(b, &res, sizeof(res));
> +             io_read_str(b, &last_mod);
> +             if (b->fd != -1)
>                       errx(1, "received unexpected fd");
>  
>               s = rrdp_get(id);
> @@ -435,12 +440,11 @@ rrdp_input_handler(int fd)
>               s = rrdp_get(id);
>               if (s == NULL)
>                       errx(1, "rrdp session %zu does not exist", id);
> -             if (infd != -1)
> -                     errx(1, "received unexpected fd %d", infd);
> -             io_simple_read(fd, &ok, sizeof(ok));
> -             if (ok != 1) {
> +             if (b->fd != -1)
> +                     errx(1, "received unexpected fd");
> +             io_read_buf(b, &ok, sizeof(ok));
> +             if (ok != 1)
>                       s->file_failed++;
> -             }
>               s->file_pending--;
>               if (s->file_pending == 0)
>                       rrdp_finished(s);
> @@ -448,6 +452,7 @@ rrdp_input_handler(int fd)
>       default:
>               errx(1, "unexpected message %d", type);
>       }
> +     ibuf_free(b);
>  }
>  
>  static void
> @@ -558,14 +563,12 @@ proc_rrdp(int fd)
>               if (pfds[0].revents & POLLHUP)
>                       break;
>               if (pfds[0].revents & POLLOUT) {
> -                     io_socket_nonblocking(fd);
>                       switch (msgbuf_write(&msgq)) {
>                       case 0:
>                               errx(1, "write: connection closed");
>                       case -1:
>                               err(1, "write");
>                       }
> -                     io_socket_blocking(fd);
>               }
>               if (pfds[0].revents & POLLIN)
>                       rrdp_input_handler(fd);
> Index: rsync.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/rsync.c,v
> retrieving revision 1.26
> diff -u -p -r1.26 rsync.c
> --- rsync.c   22 Oct 2021 11:13:06 -0000      1.26
> +++ rsync.c   23 Oct 2021 12:07:41 -0000
> @@ -120,6 +120,7 @@ proc_rsync(char *prog, char *bind_addr, 
>       int                      rc = 0;
>       struct pollfd            pfd;
>       struct msgbuf            msgq;
> +     struct ibuf             *b, *inbuf = NULL;
>       sigset_t                 mask, oldmask;
>       struct rsyncproc        *ids = NULL;
>  
> @@ -178,7 +179,7 @@ proc_rsync(char *prog, char *bind_addr, 
>  
>       for (;;) {
>               char *uri = NULL, *dst = NULL;
> -             size_t id, size;
> +             size_t id;
>               pid_t pid;
>               int st;
>  
> @@ -198,7 +199,6 @@ proc_rsync(char *prog, char *bind_addr, 
>                        */
>  
>                       while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) {
> -                             struct ibuf *b;
>                               int ok = 1;
>  
>                               for (i = 0; i < idsz; i++)
> @@ -247,11 +247,17 @@ proc_rsync(char *prog, char *bind_addr, 
>               if (!(pfd.revents & POLLIN))
>                       continue;
>  
> +             b = io_buf_read(fd, &inbuf);
> +             if (b == NULL)
> +                     continue;
> +
>               /* Read host and module. */
> -             io_simple_read(fd, &size, sizeof(size));
> -             io_simple_read(fd, &id, sizeof(id));
> -             io_str_read(fd, &dst);
> -             io_str_read(fd, &uri);
> +             io_read_buf(b, &id, sizeof(id));
> +             io_read_str(b, &dst);
> +             io_read_str(b, &uri);
> +
> +             ibuf_free(b);
> +
>               assert(dst);
>               assert(uri);
>  
> Index: tal.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/tal.c,v
> retrieving revision 1.30
> diff -u -p -r1.30 tal.c
> --- tal.c     1 Apr 2021 06:43:23 -0000       1.30
> +++ tal.c     23 Oct 2021 12:07:17 -0000
> @@ -284,7 +284,7 @@ tal_buffer(struct ibuf *b, const struct 
>   * A returned pointer must be freed with tal_free().
>   */
>  struct tal *
> -tal_read(int fd)
> +tal_read(struct ibuf *b)
>  {
>       size_t           i;
>       struct tal      *p;
> @@ -292,18 +292,18 @@ tal_read(int fd)
>       if ((p = calloc(1, sizeof(struct tal))) == NULL)
>               err(1, NULL);
>  
> -     io_buf_read_alloc(fd, (void **)&p->pkey, &p->pkeysz);
> +     io_read_buf_alloc(b, (void **)&p->pkey, &p->pkeysz);
> +     io_read_str(b, &p->descr);
> +     io_read_buf(b, &p->urisz, sizeof(size_t));
>       assert(p->pkeysz > 0);
> -     io_str_read(fd, &p->descr);
>       assert(p->descr);
> -     io_simple_read(fd, &p->urisz, sizeof(size_t));
>       assert(p->urisz > 0);
>  
>       if ((p->uri = calloc(p->urisz, sizeof(char *))) == NULL)
>               err(1, NULL);
>  
>       for (i = 0; i < p->urisz; i++) {
> -             io_str_read(fd, &p->uri[i]);
> +             io_read_str(b, &p->uri[i]);
>               assert(p->uri[i]);
>       }
>  
> 

Reply via email to