On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:
> See the comment at the top of plugins/curl/pool.c for general
> information about how this works.
> 
> This makes a very large difference to performance over the previous
> implementation.  Note for the tests below I also applied the next
> commit changing the behaviour of the connections parameter.
> 
> Using this test case:
> 
>   $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
>   $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'
> 
> The times are as follows:
> 
>   multi, connections=64           21.5s
>   multi, connections=32           30.2s
>   multi, connections=16           56.0s
>   before this commit             166s

Awesome performance improvements!  As painful as this series has been
for you to write and debug, it is showing its worth.

> ---
>  plugins/curl/curldefs.h |  35 ++--
>  plugins/curl/config.c   | 246 ---------------------------
>  plugins/curl/curl.c     | 366 +++++++++++++++++++++++++++++++++++-----
>  plugins/curl/pool.c     | 346 ++++++++++++++++++++++++++++---------
>  4 files changed, 616 insertions(+), 377 deletions(-)

Finally taking time to review this, even though it is already in-tree.

> @@ -98,8 +88,30 @@ struct curl_handle {
>    const char *read_buf;
>    uint32_t read_count;
>  
> +  /* This field is used by curl_get_size. */
> +  bool accept_range;
> +
>    /* Used by scripts.c */
>    struct curl_slist *headers_copy;
> +
> +  /* Used by pool.c */
> +  struct command *cmd;
> +};
> +
> +/* Asynchronous commands that can be sent to the pool thread. */
> +enum command_type { EASY_HANDLE, STOP };
> +struct command {
> +  /* These fields are set by the caller. */
> +  enum command_type type;       /* command */
> +  struct curl_handle *ch;       /* for EASY_HANDLE, the easy handle */
> +
> +  /* This field is set to a unique value by send_command_and_wait. */
> +  uint64_t id;                  /* serial number */
> +
> +  /* These fields are used to signal back that the command finished. */
> +  pthread_mutex_t mutex;        /* completion mutex */
> +  pthread_cond_t cond;          /* completion condition */
> +  CURLcode status;              /* status code (CURLE_OK = succeeded) */
>  };

Makes sense.  The two types are mutually recursive (curl_handle
includes a struct command *; command includes a struct curl_handle *);
hopefully you have proper locking when altering multiple objects to
adjust how they point to one another.

> +++ b/plugins/curl/config.c

> +++ b/plugins/curl/curl.c
>  
> +/* Get the file size. */
> +static int get_content_length_accept_range (struct curl_handle *ch);
> +static bool try_fallback_GET_method (struct curl_handle *ch);
> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
> +
> +static int64_t
> +curl_get_size (void *handle)
> +{
> +  struct curl_handle *ch;
> +  CURLcode r;
> +  long code;
> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
> +  curl_off_t o;
> +#else
> +  double d;
> +#endif
> +  int64_t exportsize;
> +
> +  /* Get a curl easy handle. */
> +  ch = allocate_handle ();
> +  if (ch == NULL) goto err;
> +
> +  /* Prepare to read the headers. */
> +  if (get_content_length_accept_range (ch) == -1)
> +    goto err;
> +
> +  /* Send the command to the worker thread and wait. */
> +  struct command cmd = {
> +    .type = EASY_HANDLE,
> +    .ch = ch,
> +  };
> +
> +  r = send_command_and_wait (&cmd);
> +  update_times (ch->c);
> +  if (r != CURLE_OK) {
> +    display_curl_error (ch, r,
> +                        "problem doing HEAD request to fetch size of URL [%s]",
> +                        url);
> +
> +    /* Get the HTTP status code, if available. */
> +    r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
> +    if (r == CURLE_OK)
> +      nbdkit_debug ("HTTP status code: %ld", code);
> +    else
> +      code = -1;
> +
> +    /* See comment on try_fallback_GET_method below. */
> +    if (code != 403 || !try_fallback_GET_method (ch))
> +      goto err;
> +  }
> +
> +  /* Get the content length.
> +   *
> +   * Note there is some subtlety here: For web servers using chunked
> +   * encoding, either the Content-Length header will not be present,
> +   * or if present it should be ignored.  (For such servers the only
> +   * way to find out the true length would be to read all of the
> +   * content, which we don't want to do).
> +   *
> +   * Curl itself resolves this for us.  It will ignore the
> +   * Content-Length header if chunked encoding is used, returning the
> +   * length as -1 which we check below (see also
> +   * curl:lib/http.c:Curl_http_size).
> +   */
> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
> +  if (r != CURLE_OK) {
> +    display_curl_error (ch, r,
> +                        "could not get length of remote file [%s]", url);
> +    goto err;
> +  }
> +
> +  if (o == -1) {
> +    nbdkit_error ("could not get length of remote file [%s], "
> +                  "is the URL correct?", url);
> +    goto err;
> +  }
> +
> +  exportsize = o;
> +#else
> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
> +  if (r != CURLE_OK) {
> +    display_curl_error (ch, r,
> +                        "could not get length of remote file [%s]", url);
> +    goto err;
> +  }
> +
> +  if (d == -1) {
> +    nbdkit_error ("could not get length of remote file [%s], "
> +                  "is the URL correct?", url);
> +    goto err;
> +  }
> +
> +  exportsize = d;

Does curl guarantee that the double d will contain a value assignable
to int64_t without overflow/truncation?  For particularly large sizes,
double has insufficient precision for all possible file sizes, but I
doubt someone is exposing such large files over HTTP.

> +#endif
> +  nbdkit_debug ("content length: %" PRIi64, exportsize);
> +
> +  /* If this is HTTP, check that byte ranges are supported. */
> +  if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
> +      ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
> +    if (!ch->accept_range) {
> +      nbdkit_error ("server does not support 'range' (byte range) requests");
> +      goto err;
> +    }
> +
> +    nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
> +  }
> +
> +  free_handle (ch);
> +  return exportsize;
> +
> + err:
> +  if (ch)
> +    free_handle (ch);
> +  return -1;
> +}
> +
> +/* Get the file size and also whether the remote HTTP server
> + * supports byte ranges.
> + */
> +static int
> +get_content_length_accept_range (struct curl_handle *ch)
> +{
> +  /* We must run the scripts if necessary and set headers in the
> +   * handle.
> +   */
> +  if (do_scripts (ch) == -1)
> +    return -1;
> +
> +  /* Set this flag in the handle to false.  The callback should set it
> +   * to true if byte ranges are supported, which we check below.
> +   */
> +  ch->accept_range = false;
> +
> +  /* No Body, not nobody!  This forces a HEAD request. */
> +  curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
> +  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
> +  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
> +  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
> +  curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
> +  return 0;
> +}
> +
> +/* S3 servers can return 403 Forbidden for HEAD but still respond
> + * to GET, so we give it a second chance in that case.
> + * https://github.com/kubevirt/containerized-data-importer/issues/2737
> + *
> + * This function issues a GET request with a writefunction that always
> + * returns an error, thus effectively getting the headers but
> + * abandoning the transfer as soon as possible after.
> + */
> +static bool
> +try_fallback_GET_method (struct curl_handle *ch)
> +{
> +  CURLcode r;
> +
> +  nbdkit_debug ("attempting to fetch headers using GET method");
> +
> +  curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
> +  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
> +  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
> +
> +  struct command cmd = {
> +    .type = EASY_HANDLE,
> +    .ch = ch,
> +  };
> +
> +  r = send_command_and_wait (&cmd);
> +  update_times (ch->c);
> +
> +  /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
> +   * (eg if the remote has zero length).  Other errors might happen
> +   * but we ignore them since it is a fallback path.
> +   */
> +  return r == CURLE_OK || r == CURLE_WRITE_ERROR;
> +}
> +
> +static size_t
> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> +  struct curl_handle *ch = opaque;
> +  size_t realsize = size * nmemb;
> +  const char *header = ptr;
> +  const char *end = header + realsize;
> +  const char *accept_ranges = "accept-ranges:";
> +  const char *bytes = "bytes";
> +
> +  if (realsize >= strlen (accept_ranges) &&
> +      ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
> +    const char *p = strchr (header, ':') + 1;
> +
> +    /* Skip whitespace between the header name and value. */
> +    while (p < end && *p && ascii_isspace (*p))

Technically, '*p && ascii_isspace (*p)' can be shortened to
'ascii_isspace (*p)', since the NUL byte is not ASCII whitespace.  I
don't know if the compiler is smart enough to make that optimization
on your behalf.

> +      p++;
> +
> +    if (end - p >= strlen (bytes)
> +        && strncmp (p, bytes, strlen (bytes)) == 0) {
> +      /* Check that there is nothing but whitespace after the value. */
> +      p += strlen (bytes);
> +      while (p < end && *p && ascii_isspace (*p))

Another spot of the same.

> +        p++;
> +
> +      if (p == end || !*p)
> +        ch->accept_range = true;
> +    }
> +  }
> +
> +  return realsize;
> +}
> +
> +static size_t
> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> +#ifdef CURL_WRITEFUNC_ERROR
> +  return CURL_WRITEFUNC_ERROR;
> +#else
> +  return 0; /* in older curl, any size < requested will also be an error */
> +#endif
> +}
> +
>  /* Read data from the remote server. */
> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
> +
>  static int
>  curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>  {
>    CURLcode r;
> +  struct curl_handle *ch;
>    char range[128];
>  
> -  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
> -  if (ch == NULL)
> -    return -1;
> +  /* Get a curl easy handle. */
> +  ch = allocate_handle ();
> +  if (ch == NULL) goto err;
>  
>    /* Run the scripts if necessary and set headers in the handle. */
> -  if (do_scripts (ch) == -1) return -1;
> +  if (do_scripts (ch) == -1) goto err;
>  
>    /* Tell the write_cb where we want the data to be written.  write_cb
>     * will update this if the data comes in multiple sections.
>     */
> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>    ch->write_buf = buf;
>    ch->write_count = count;
>  
> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>              offset, offset + count);
>    curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>  
> -  /* The assumption here is that curl will look after timeouts. */
> -  r = curl_easy_perform (ch->c);
> +  /* Send the command to the worker thread and wait. */
> +  struct command cmd = {
> +    .type = EASY_HANDLE,
> +    .ch = ch,
> +  };
> +
> +  r = send_command_and_wait (&cmd);
>    if (r != CURLE_OK) {
> -    display_curl_error (ch, r, "pread: curl_easy_perform");
> -    return -1;
> +    display_curl_error (ch, r, "pread");
> +    goto err;
>    }
>    update_times (ch->c);
>  
> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>    /* As far as I understand the cURL API, this should never happen. */
>    assert (ch->write_count == 0);
>  
> +  free_handle (ch);
>    return 0;
> +
> + err:
> +  if (ch)
> +    free_handle (ch);
> +  return -1;
> +}
> +
> +/* NB: The terminology used by libcurl is confusing!
> + *
> + * WRITEFUNCTION / write_cb is used when reading from the remote server
> + * READFUNCTION / read_cb is used when writing to the remote server.
> + *
> + * We use the same terminology as libcurl here.
> + */
> +static size_t
> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> +  struct curl_handle *ch = opaque;
> +  size_t orig_realsize = size * nmemb;
> +  size_t realsize = orig_realsize;

Do we have to worry about overflow when compiling on 32-bit machines?
Asked differently, should we be using off_t instead of size_t in any
of this code?  Thankfully, for now, we know NBD .pread and .pwrite
requests are capped at 64M, so I think you're okay (we aren't ever
going to ask curl for gigabytes in one request), but maybe a comment
or assert() is worth it?

> +
> +  assert (ch->write_buf);
> +
> +  /* Don't read more than the requested amount of data, even if the
> +   * server or libcurl sends more.
> +   */
> +  if (realsize > ch->write_count)
> +    realsize = ch->write_count;
> +
> +  memcpy (ch->write_buf, ptr, realsize);
> +
> +  ch->write_count -= realsize;
> +  ch->write_buf += realsize;
> +
> +  return orig_realsize;

[1]

>  }
>  
>  /* Write data to the remote server. */
> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
> +
>  static int
>  curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>  {
>    CURLcode r;
> +  struct curl_handle *ch;
>    char range[128];
>  
> -  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
> -  if (ch == NULL)
> -    return -1;
> +  /* Get a curl easy handle. */
> +  ch = allocate_handle ();
> +  if (ch == NULL) goto err;
>  
>    /* Run the scripts if necessary and set headers in the handle. */
> -  if (do_scripts (ch) == -1) return -1;
> +  if (do_scripts (ch) == -1) goto err;
>  
>    /* Tell the read_cb where we want the data to be read from.  read_cb
>     * will update this if the data comes in multiple sections.
>     */
> +  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
> +  curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
>    ch->read_buf = buf;
>    ch->read_count = count;
>  
> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>              offset, offset + count);
>    curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>  
> -  /* The assumption here is that curl will look after timeouts. */
> -  r = curl_easy_perform (ch->c);
> +  /* Send the command to the worker thread and wait. */
> +  struct command cmd = {
> +    .type = EASY_HANDLE,
> +    .ch = ch,
> +  };
> +
> +  r = send_command_and_wait (&cmd);
>    if (r != CURLE_OK) {
> -    display_curl_error (ch, r, "pwrite: curl_easy_perform");
> -    return -1;
> +    display_curl_error (ch, r, "pwrite");
> +    goto err;
>    }
>    update_times (ch->c);
>  
> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>    /* As far as I understand the cURL API, this should never happen. */
>    assert (ch->read_count == 0);
>  
> +  free_handle (ch);
>    return 0;
> +
> + err:
> +  if (ch)
> +    free_handle (ch);
> +  return -1;
> +}
> +
> +static size_t
> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
> +{
> +  struct curl_handle *ch = opaque;
> +  size_t realsize = size * nmemb;
> +
> +  assert (ch->read_buf);
> +  if (realsize > ch->read_count)
> +    realsize = ch->read_count;
> +
> +  memcpy (ptr, ch->read_buf, realsize);
> +
> +  ch->read_count -= realsize;
> +  ch->read_buf += realsize;
> +
> +  return realsize;

Why does write_cb in [1] above return orig_realsize, but read_cb
returns the potentially modified realsize?

>  }
>  
>  static struct nbdkit_plugin plugin = {
> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
> index eb2d330e1..2974cda3f 100644
> --- a/plugins/curl/pool.c
> +++ b/plugins/curl/pool.c
> @@ -30,11 +30,29 @@
>   * SUCH DAMAGE.
>   */
>  
> -/* Curl handle pool.
> +/* Worker thread which processes the curl multi interface.
>   *
> - * To get a libcurl handle, call get_handle().  When you hold the
> - * handle, it is yours exclusively to use.  After you have finished
> - * with the handle, put it back into the pool by calling put_handle().
> + * The main nbdkit threads (see curl.c) create curl easy handles
> + * initialized with the work they want to carry out.  Note there is
> + * one easy handle per task (eg. per pread/pwrite request).  The easy
> + * handles are not reused.
> + *
> + * The commands + optional easy handle are submitted to the worker
> + * thread over a self-pipe (it's easy to use a pipe here because the
> + * way curl multi works is it can listen on an extra fd, but not on
> + * anything else like a pthread condition).  The curl multi performs
> + * the work of the outstanding easy handles.
> + *
> + * When an easy handle finishes work or errors, we retire the command
> + * by signalling back to the waiting nbdkit thread using a pthread
> + * condition.
> + *
> + * In my experiments, we're almost always I/O bound so I haven't seen
> + * any strong need to use more than one curl multi / worker thread,
> + * although it would be possible to add more in future.
> + *
> + * See also this extremely useful thread:
> + * https://curl.se/mail/lib-2019-03/0100.html

Very useful comment (and link).

>   */
>  
>  #include <config.h>
> @@ -45,9 +63,19 @@
>  #include <stdint.h>
>  #include <inttypes.h>
>  #include <string.h>
> +#include <unistd.h>
>  #include <assert.h>
>  #include <pthread.h>
>  
> +#ifdef HAVE_STDATOMIC_H
> +#include <stdatomic.h>
> +#else
> +/* Some old platforms lack atomic types, but 32 bit ints are usually
> + * "atomic enough".
> + */
> +#define _Atomic /**/
> +#endif
> +
>  #include <curl/curl.h>
>  
>  #include <nbdkit-plugin.h>
> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
>  
>  unsigned connections = 4;
>  
> -/* This lock protects access to the curl_handles vector below. */
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> +/* Pipe used to notify background thread that a command is pending in
> + * the queue.  A pointer to the 'struct command' is sent over the
> + * pipe.
> + */
> +static int self_pipe[2] = { -1, -1 };
>  
> -/* List of curl handles.  This is allocated dynamically as more
> - * handles are requested.  Currently it does not shrink.  It may grow
> - * up to 'connections' in length.
> +/* The curl multi handle. */
> +static CURLM *multi;
> +
> +/* List of running easy handles.  We only need to maintain this so we
> + * can remove them from the multi handle when cleaning up.
>   */
>  DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
>  static curl_handle_list curl_handles = empty_vector;
>  
> -/* The condition is used when the curl handles vector is full and
> - * we're waiting for a thread to put_handle.
> - */
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static size_t in_use = 0, waiting = 0;
> +static const char *
> +command_type_to_string (enum command_type type)
> +{
> +  switch (type) {
> +  case EASY_HANDLE: return "EASY_HANDLE";
> +  case STOP:        return "STOP";
> +  default:          abort ();
> +  }
> +}
>  
>  int
>  pool_get_ready (void)
>  {
> +  multi = curl_multi_init ();
> +  if (multi == NULL) {
> +    nbdkit_error ("curl_multi_init failed: %m");
> +    return -1;
> +  }
> +
>    return 0;
>  }
>  
> +/* Start and stop the background thread. */
> +static pthread_t thread;
> +static bool thread_running;
> +static void *pool_worker (void *);
> +
>  int
>  pool_after_fork (void)
>  {
> +  int err;
> +
> +  if (pipe (self_pipe) == -1) {
> +    nbdkit_error ("pipe: %m");
> +    return -1;
> +  }
> +
> +  /* Start the pool background thread where all the curl work is done. */
> +  err = pthread_create (&thread, NULL, pool_worker, NULL);
> +  if (err != 0) {
> +    errno = err;
> +    nbdkit_error ("pthread_create: %m");
> +    return -1;
> +  }
> +  thread_running = true;
> +
>    return 0;
>  }
>  
> -/* Close and free all handles in the pool. */
> +/* Unload the background thread. */
>  void
>  pool_unload (void)
>  {
> -  size_t i;
> +  if (thread_running) {
> +    /* Stop the background thread. */
> +    struct command cmd = { .type = STOP };
> +    send_command_and_wait (&cmd);
> +    pthread_join (thread, NULL);
> +    thread_running = false;
> +  }
>  
> -  if (curl_debug_pool)
> -    nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
> -                  curl_handles.len);
> +  if (self_pipe[0] >= 0) {
> +    close (self_pipe[0]);
> +    self_pipe[0] = -1;
> +  }
> +  if (self_pipe[1] >= 0) {
> +    close (self_pipe[1]);
> +    self_pipe[1] = -1;
> +  }
>  
> -  for (i = 0; i < curl_handles.len; ++i)
> -    free_handle (curl_handles.ptr[i]);
> -  curl_handle_list_reset (&curl_handles);
> +  if (multi) {
> +    size_t i;
> +
> +    /* Remove and free any easy handles in the multi. */
> +    for (i = 0; i < curl_handles.len; ++i) {
> +      curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
> +      free_handle (curl_handles.ptr[i]);
> +    }
> +
> +    curl_multi_cleanup (multi);
> +    multi = NULL;
> +  }
>  }
>  
> -/* Get a handle from the pool.
> - *
> - * It is owned exclusively by the caller until they call put_handle.
> +/* Command queue. */
> +static _Atomic uint64_t id;     /* next command ID */
> +
> +/* Send command to the background thread and wait for completion.
> + * This is only called by one of the nbdkit threads.
>   */
> -struct curl_handle *
> -get_handle (void)
> +CURLcode
> +send_command_and_wait (struct command *cmd)
>  {
> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> -  size_t i;
> -  struct curl_handle *ch;
> -
> - again:
> -  /* Look for a handle which is not in_use. */
> -  for (i = 0; i < curl_handles.len; ++i) {
> -    ch = curl_handles.ptr[i];
> -    if (!ch->in_use) {
> -      ch->in_use = true;
> -      in_use++;
> +  cmd->id = id++;
> +
> +  /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
> +   * indicate that the command has not yet been completed and status
> +   * set.
> +   */
> +  cmd->status = -1;
> +
> +  /* This will be used to signal command completion back to us. */
> +  pthread_mutex_init (&cmd->mutex, NULL);
> +  pthread_cond_init (&cmd->cond, NULL);
> +
> +  /* Send the command to the background thread. */
> +  if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
> +    abort ();
> +
> +  /* Wait for the command to be completed by the background thread. */
> +  {
> +    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> +    while (cmd->status == -1) /* for -1, see above */
> +      pthread_cond_wait (&cmd->cond, &cmd->mutex);
> +  }
> +
> +  pthread_mutex_destroy (&cmd->mutex);
> +  pthread_cond_destroy (&cmd->cond);
> +
> +  /* Note the main thread must call nbdkit_error on error! */
> +  return cmd->status;
> +}
> +
> +/* The background thread. */
> +static void check_for_finished_handles (void);
> +static void retire_command (struct command *cmd, CURLcode code);
> +static void do_easy_handle (struct command *cmd);
> +
> +static void *
> +pool_worker (void *vp)
> +{
> +  bool stop = false;
> +
> +  if (curl_debug_pool)
> +    nbdkit_debug ("curl: background thread started");
> +
> +  while (!stop) {
> +    struct command *cmd = NULL;
> +    struct curl_waitfd extra_fds[1] =
> +      { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
> +    CURLMcode mc;
> +    int numfds, running_handles, repeats = 0;
> +
> +    do {
> +      /* Process the multi handle. */
> +      mc = curl_multi_perform (multi, &running_handles);
> +      if (mc != CURLM_OK) {
> +        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));

Since nbdkit_error() stores its string in thread-local storage, is
there anything that ever extracts this error over to the nbdkit thread
that issued the original request to the worker thread?...

> +        abort (); /* XXX We don't expect this to happen */

...Then again, if we abort, it doesn't matter.

> +      }

> +
> +      check_for_finished_handles ();
> +
> +      mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
> +      if (mc != CURLM_OK) {
> +        nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
> +        abort (); /* XXX We don't expect this to happen */
> +      }
> +
>        if (curl_debug_pool)
> -        nbdkit_debug ("get_handle: %zu", ch->i);
> -      return ch;
> -    }
> -  }
> +        nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
> +                      running_handles, numfds);
> +
> +      if (numfds == 0) {
> +        repeats++;
> +        if (repeats > 1)
> +          nbdkit_nanosleep (1, 0);
> +      }
> +      else {
> +        repeats = 0;
> +        if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
> +          /* There's a command waiting. */
> +          if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
> +            abort ();
> +        }
> +      }
> +    } while (!cmd);
>  
> -  /* If more connections are allowed, then allocate a new handle. */
> -  if (curl_handles.len < connections) {
> -    ch = allocate_handle ();
> -    if (ch == NULL)
> -      return NULL;
> -    if (curl_handle_list_append (&curl_handles, ch) == -1) {
> -      free_handle (ch);
> -      return NULL;
> -    }
> -    ch->i = curl_handles.len - 1;
> -    ch->in_use = true;
> -    in_use++;
>      if (curl_debug_pool)
> -      nbdkit_debug ("get_handle: %zu", ch->i);
> -    return ch;
> -  }
> +      nbdkit_debug ("curl: dispatching %s command %" PRIu64,
> +                    command_type_to_string (cmd->type), cmd->id);
> +
> +    switch (cmd->type) {
> +    case STOP:
> +      stop = true;
> +      retire_command (cmd, CURLE_OK);
> +      break;
>  
> -  /* Otherwise we have run out of connections so we must wait until
> -   * another thread calls put_handle.
> -   */
> -  assert (in_use == connections);
> -  waiting++;
> -  while (in_use == connections)
> -    pthread_cond_wait (&cond, &lock);
> -  waiting--;
> +    case EASY_HANDLE:
> +      do_easy_handle (cmd);
> +      break;
> +    }
> +  } /* while (!stop) */
>  
> -  goto again;
> +  if (curl_debug_pool)
> +    nbdkit_debug ("curl: background thread stopped");
> +
> +  return NULL;
>  }
>  
> -/* Return the handle to the pool. */
> -void
> -put_handle (struct curl_handle *ch)
> +/* This checks if any easy handles in the multi have
> + * finished and retires the associated commands.
> + */
> +static void
> +check_for_finished_handles (void)
>  {
> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> +  CURLMsg *msg;
> +  int msgs_in_queue;
> +
> +  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
> +    size_t i;
> +    struct curl_handle *ch = NULL;
> +
> +    if (msg->msg == CURLMSG_DONE) {
> +      /* Find this curl_handle. */
> +      for (i = 0; i < curl_handles.len; ++i) {
> +        if (curl_handles.ptr[i]->c == msg->easy_handle) {
> +          ch = curl_handles.ptr[i];
> +          curl_handle_list_remove (&curl_handles, i);
> +        }
> +      }
> +      if (ch == NULL) abort ();
> +      curl_multi_remove_handle (multi, ch->c);
> +
> +      retire_command (ch->cmd, msg->data.result);
> +    }
> +  }
> +}
>  
> +/* Retire a command.  status is a CURLcode. */
> +static void
> +retire_command (struct command *cmd, CURLcode status)
> +{
>    if (curl_debug_pool)
> -    nbdkit_debug ("put_handle: %zu", ch->i);
> +    nbdkit_debug ("curl: retiring %s command %" PRIu64,
> +                  command_type_to_string (cmd->type), cmd->id);
> +
> +  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> +  cmd->status = status;
> +  pthread_cond_signal (&cmd->cond);
> +}
> +
> +static void
> +do_easy_handle (struct command *cmd)
> +{
> +  CURLMcode mc;
> +
> +  cmd->ch->cmd = cmd;
> +
> +  /* Add the handle to the multi. */
> +  mc = curl_multi_add_handle (multi, cmd->ch->c);
> +  if (mc != CURLM_OK) {
> +    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
> +    goto err;
> +  }
>  
> -  ch->in_use = false;
> -  in_use--;
> +  if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
> +    goto err;
> +  return;
>  
> -  /* Signal the next thread which is waiting. */
> -  if (waiting > 0)
> -    pthread_cond_signal (&cond);
> + err:
> +  retire_command (cmd, CURLE_OUT_OF_MEMORY);
>  }
> -- 
> 2.41.0

Overall looks nice, and I learned more about curl in the process.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization:  qemu.org | libguestfs.org
_______________________________________________
Libguestfs mailing list
Libguestfs@redhat.com
https://listman.redhat.com/mailman/listinfo/libguestfs

Reply via email to