On Mon, Aug 07, 2023 at 12:57:02PM +0200, Laszlo Ersek wrote:
> On 8/4/23 20:04, Richard W.M. Jones wrote:
> > On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote:
> >> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:
> >>> See the comment at the top of plugins/curl/pool.c for general
> >>> information about how this works.
> >>>
> >>> This makes a very large difference to performance over the previous
> >>> implementation.  Note for the tests below I also applied the next
> >>> commit changing the behaviour of the connections parameter.
> >>>
> >>> Using this test case:
> >>>
> >>>   $ 
> >>> uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
> >>>   $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'
> >>>
> >>> The times are as follows:
> >>>
> >>>   multi, connections=64           21.5s
> >>>   multi, connections=32           30.2s
> >>>   multi, connections=16           56.0s
> >>>   before this commit             166s
> >>
> >> Awesome performance improvements!  As painful as this series has been
> >> for you to write and debug, it is showing its worth.
> >>
> >>> ---
> >>>  plugins/curl/curldefs.h |  35 ++--
> >>>  plugins/curl/config.c   | 246 ---------------------------
> >>>  plugins/curl/curl.c     | 366 +++++++++++++++++++++++++++++++++++-----
> >>>  plugins/curl/pool.c     | 346 ++++++++++++++++++++++++++++---------
> >>>  4 files changed, 616 insertions(+), 377 deletions(-)
> >>
> >> Finally taking time to review this, even though it is already in-tree.
> >>
> >>> @@ -98,8 +88,30 @@ struct curl_handle {
> >>>    const char *read_buf;
> >>>    uint32_t read_count;
> >>>  
> >>> +  /* This field is used by curl_get_size. */
> >>> +  bool accept_range;
> >>> +
> >>>    /* Used by scripts.c */
> >>>    struct curl_slist *headers_copy;
> >>> +
> >>> +  /* Used by pool.c */
> >>> +  struct command *cmd;
> >>> +};
> >>> +
> >>> +/* Asynchronous commands that can be sent to the pool thread. */
> >>> +enum command_type { EASY_HANDLE, STOP };
> >>> +struct command {
> >>> +  /* These fields are set by the caller. */
> >>> +  enum command_type type;       /* command */
> >>> +  struct curl_handle *ch;       /* for EASY_HANDLE, the easy handle */
> >>> +
> >>> +  /* This field is set to a unique value by send_command_and_wait. */
> >>> +  uint64_t id;                  /* serial number */
> >>> +
> >>> +  /* These fields are used to signal back that the command finished. */
> >>> +  pthread_mutex_t mutex;        /* completion mutex */
> >>> +  pthread_cond_t cond;          /* completion condition */
> >>> +  CURLcode status;              /* status code (CURLE_OK = succeeded) */
> >>>  };
> >>
> >> Makes sense.  The two types are mutually recursive (curl_handle
> >> includes a struct command *; command includes a struct curl_handle *);
> >> hopefully you have proper locking when altering multiple objects to
> >> adjust how they point to one another.
> > 
> > Actually locking is not needed.  Let me document it through ...
> > 
> > We create both the curl easy handle and the associated EASY_HANDLE
> > command in the nbdkit thread that gets the request, eg. in the curl
> > .pread method.  That of course requires no locking.
> > 
> > There is a single background worker thread.
> > 
> > A "self pipe" passes pointers to 'struct command *' to this worker
> > thread simple by writing the 8 byte pointer onto the pipe (hopefully
> > atomic ...)  The nbdkit request thread then blocks on the mutex/cond
> > in the command handle.
> 
> Right, that's exactly that I got curious about.
> 
> In my opinion, writing to the self-pipe is mostly safe. Reading from the
> self-pipe could be slightly polished.
> 
> Here are the POSIX pages on write() and read():
> 
> https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
> https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html
> 
> The first one (write) says that if you attempt to write at most PIPE_BUF
> bytes, then writes will not be interleaved with other concurrent writes,
> so in a sense the write will be atomic. (And in this case, O_NONBLOCK
> only changes the behavior for the case when the buffer cannot be written
> in entirety: with O_NONBLOCK clear, the writer thread will block, with
> O_NONBLOCK set, the writer thread will see -1/EAGAIN.)
> 
> Now, PIPE_BUF is "variable" in a sense:
> 
> https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html
> 
> but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our
> pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely
> satisfied.
> 
> ... The only case I see possible for a short write is the delivery of a
> signal after transfer has started. *If* nbdkit catches some signal such
> that the signal handler actually returns, then this could be a
> (theoretical) problem, calling for xwrite() or similar.
> 
> > 
> > The background worker thread picks the 'struct command *' up from the
> > pipe in the same event loop that it uses to process ongoing requests
> > on the multi handle.  It takes the easy handle and adds it to the
> > multi handle.
> 
> The spec on read() does not seem to have the same kind of
> non-interleaving / "atomicity" language that write() does. The rationale
> section says:
> 
> "The standard developers considered adding atomicity requirements to a
> pipe or FIFO, but recognized that due to the nature of pipes and FIFOs
> there could be no guarantee of atomicity of reads of {PIPE_BUF} or any
> other size that would be an aid to applications portability."
> 
> Now given that the writer side is free of interleaving (because, in the
> first place: there is only a single writer!), I think we need not worry
> about data corruption. However, it does feel like read() may return
> fewer than 8 bytes in one go, "just because" (not only because of a
> signal being delivered midway).

So there are actually multiple writer threads, but only a single
reader thread.  Also I should say that I didn't set O_NONBLOCK
(actually I completely forgot), but that might be a benefit in this
case since it makes it less likely for the 8 byte read to be broken up.

> And that may be a problem with readiness reporting via
> curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command
> pointer may not yet be available.
> 
> Now I do think a split read is extremely unlikely, maybe even impossible
> on Linux. If we choose to be pedantic, then the curl_multi_wait() loop
> might want to expect "cmd" to be populated only over multiple iterations
> -- like use "cmd_ptr_bytes" or something similar for tracking the size
> already available, and only consider "cmd" usable when cmd_ptr_bytes
> reaches sizeof cmd.
>
> 
> Yet another topic that comes up is visibility / ordering. Transfering a
> pointer via write()/read() between threads does not seem to guarantee
> ordering / visibility regarding the *pointed-to* area per spec. Pthread
> mutex APIs (and C11 thread and atomics APIs) include the CPU
> instructions for ensuring proper memory visibility.
> 
> *BUT* I think it would be insane for any POSIX implementation to have a
> write()+read() combination that's *weaker* regarding data consistency,
> (i.e., that's more racy) than mutexes. Write() and read() are
> heavy-weight syscalls, so I can't imagine a publish-subscribe pattern
> not working with them (i.e., the pointed-to area not "settling" until
> the pointer is consumed).

Yup, it seems unlikely, but maybe it could happen on architectures
with weaker store ordering like Arm?  I wonder if adding a memory
barrier before the write would be a good idea?

Thanks,

Rich.

> Laszlo
> 
> > 
> > When the easy handle work has finished, the worker thread removes it
> > from the multi handle and signals the nbdkit request thread to wake up
> > (using cmd->mutex + cmd->lock).  At which point possession passes back
> > to the request thread which will usually free up both the command and
> > easy handle.
> > 
> >>> +++ b/plugins/curl/config.c
> >>
> >>> +++ b/plugins/curl/curl.c
> >>>  
> >>> +/* Get the file size. */
> >>> +static int get_content_length_accept_range (struct curl_handle *ch);
> >>> +static bool try_fallback_GET_method (struct curl_handle *ch);
> >>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void 
> >>> *opaque);
> >>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void 
> >>> *opaque);
> >>> +
> >>> +static int64_t
> >>> +curl_get_size (void *handle)
> >>> +{
> >>> +  struct curl_handle *ch;
> >>> +  CURLcode r;
> >>> +  long code;
> >>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
> >>> +  curl_off_t o;
> >>> +#else
> >>> +  double d;
> >>> +#endif
> >>> +  int64_t exportsize;
> >>> +
> >>> +  /* Get a curl easy handle. */
> >>> +  ch = allocate_handle ();
> >>> +  if (ch == NULL) goto err;
> >>> +
> >>> +  /* Prepare to read the headers. */
> >>> +  if (get_content_length_accept_range (ch) == -1)
> >>> +    goto err;
> >>> +
> >>> +  /* Send the command to the worker thread and wait. */
> >>> +  struct command cmd = {
> >>> +    .type = EASY_HANDLE,
> >>> +    .ch = ch,
> >>> +  };
> >>> +
> >>> +  r = send_command_and_wait (&cmd);
> >>> +  update_times (ch->c);
> >>> +  if (r != CURLE_OK) {
> >>> +    display_curl_error (ch, r,
> >>> +                        "problem doing HEAD request to fetch size of URL 
> >>> [%s]",
> >>> +                        url);
> >>> +
> >>> +    /* Get the HTTP status code, if available. */
> >>> +    r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
> >>> +    if (r == CURLE_OK)
> >>> +      nbdkit_debug ("HTTP status code: %ld", code);
> >>> +    else
> >>> +      code = -1;
> >>> +
> >>> +    /* See comment on try_fallback_GET_method below. */
> >>> +    if (code != 403 || !try_fallback_GET_method (ch))
> >>> +      goto err;
> >>> +  }
> >>> +
> >>> +  /* Get the content length.
> >>> +   *
> >>> +   * Note there is some subtlety here: For web servers using chunked
> >>> +   * encoding, either the Content-Length header will not be present,
> >>> +   * or if present it should be ignored.  (For such servers the only
> >>> +   * way to find out the true length would be to read all of the
> >>> +   * content, which we don't want to do).
> >>> +   *
> >>> +   * Curl itself resolves this for us.  It will ignore the
> >>> +   * Content-Length header if chunked encoding is used, returning the
> >>> +   * length as -1 which we check below (see also
> >>> +   * curl:lib/http.c:Curl_http_size).
> >>> +   */
> >>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
> >>> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
> >>> +  if (r != CURLE_OK) {
> >>> +    display_curl_error (ch, r,
> >>> +                        "could not get length of remote file [%s]", url);
> >>> +    goto err;
> >>> +  }
> >>> +
> >>> +  if (o == -1) {
> >>> +    nbdkit_error ("could not get length of remote file [%s], "
> >>> +                  "is the URL correct?", url);
> >>> +    goto err;
> >>> +  }
> >>> +
> >>> +  exportsize = o;
> >>> +#else
> >>> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
> >>> +  if (r != CURLE_OK) {
> >>> +    display_curl_error (ch, r,
> >>> +                        "could not get length of remote file [%s]", url);
> >>> +    goto err;
> >>> +  }
> >>> +
> >>> +  if (d == -1) {
> >>> +    nbdkit_error ("could not get length of remote file [%s], "
> >>> +                  "is the URL correct?", url);
> >>> +    goto err;
> >>> +  }
> >>> +
> >>> +  exportsize = d;
> >>
> >> Does curl guarantee that the double d will contain a value assignable
> >> to int64_t without overflow/truncation?  For particularly large sizes,
> >> double has insufficient precision for all possible file sizes, but I
> >> doubt someone is exposing such large files over HTTP.
> > 
> > No, I don't believe a 'double' is sufficient.  This is why newer
> > versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T.  Note
> > this code is just copied from the old curl plugin.
> > 
> >>> +#endif
> >>> +  nbdkit_debug ("content length: %" PRIi64, exportsize);
> >>> +
> >>> +  /* If this is HTTP, check that byte ranges are supported. */
> >>> +  if (ascii_strncasecmp (url, "http://";, strlen ("http://";)) == 0 ||
> >>> +      ascii_strncasecmp (url, "https://";, strlen ("https://";)) == 0) {
> >>> +    if (!ch->accept_range) {
> >>> +      nbdkit_error ("server does not support 'range' (byte range) 
> >>> requests");
> >>> +      goto err;
> >>> +    }
> >>> +
> >>> +    nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
> >>> +  }
> >>> +
> >>> +  free_handle (ch);
> >>> +  return exportsize;
> >>> +
> >>> + err:
> >>> +  if (ch)
> >>> +    free_handle (ch);
> >>> +  return -1;
> >>> +}
> >>> +
> >>> +/* Get the file size and also whether the remote HTTP server
> >>> + * supports byte ranges.
> >>> + */
> >>> +static int
> >>> +get_content_length_accept_range (struct curl_handle *ch)
> >>> +{
> >>> +  /* We must run the scripts if necessary and set headers in the
> >>> +   * handle.
> >>> +   */
> >>> +  if (do_scripts (ch) == -1)
> >>> +    return -1;
> >>> +
> >>> +  /* Set this flag in the handle to false.  The callback should set it
> >>> +   * to true if byte ranges are supported, which we check below.
> >>> +   */
> >>> +  ch->accept_range = false;
> >>> +
> >>> +  /* No Body, not nobody!  This forces a HEAD request. */
> >>> +  curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
> >>> +  return 0;
> >>> +}
> >>> +
> >>> +/* S3 servers can return 403 Forbidden for HEAD but still respond
> >>> + * to GET, so we give it a second chance in that case.
> >>> + * https://github.com/kubevirt/containerized-data-importer/issues/2737
> >>> + *
> >>> + * This function issues a GET request with a writefunction that always
> >>> + * returns an error, thus effectively getting the headers but
> >>> + * abandoning the transfer as soon as possible after.
> >>> + */
> >>> +static bool
> >>> +try_fallback_GET_method (struct curl_handle *ch)
> >>> +{
> >>> +  CURLcode r;
> >>> +
> >>> +  nbdkit_debug ("attempting to fetch headers using GET method");
> >>> +
> >>> +  curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
> >>> +
> >>> +  struct command cmd = {
> >>> +    .type = EASY_HANDLE,
> >>> +    .ch = ch,
> >>> +  };
> >>> +
> >>> +  r = send_command_and_wait (&cmd);
> >>> +  update_times (ch->c);
> >>> +
> >>> +  /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
> >>> +   * (eg if the remote has zero length).  Other errors might happen
> >>> +   * but we ignore them since it is a fallback path.
> >>> +   */
> >>> +  return r == CURLE_OK || r == CURLE_WRITE_ERROR;
> >>> +}
> >>> +
> >>> +static size_t
> >>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
> >>> +{
> >>> +  struct curl_handle *ch = opaque;
> >>> +  size_t realsize = size * nmemb;
> >>> +  const char *header = ptr;
> >>> +  const char *end = header + realsize;
> >>> +  const char *accept_ranges = "accept-ranges:";
> >>> +  const char *bytes = "bytes";
> >>> +
> >>> +  if (realsize >= strlen (accept_ranges) &&
> >>> +      ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) 
> >>> == 0) {
> >>> +    const char *p = strchr (header, ':') + 1;
> >>> +
> >>> +    /* Skip whitespace between the header name and value. */
> >>> +    while (p < end && *p && ascii_isspace (*p))
> >>
> >> Technically, '*p && ascii_isspace (*p)' can be shortened to
> >> 'ascii_isspace (*p)', since the NUL byte is not ascii space.  I don't
> >> know if the compiler is smart enough to make that optimization on your
> >> behalf.
> > 
> > Ah indeed.
> > 
> >>> +      p++;
> >>> +
> >>> +    if (end - p >= strlen (bytes)
> >>> +        && strncmp (p, bytes, strlen (bytes)) == 0) {
> >>> +      /* Check that there is nothing but whitespace after the value. */
> >>> +      p += strlen (bytes);
> >>> +      while (p < end && *p && ascii_isspace (*p))
> >>
> >> Another spot of the same.
> >>
> >>> +        p++;
> >>> +
> >>> +      if (p == end || !*p)
> >>> +        ch->accept_range = true;
> >>> +    }
> >>> +  }
> >>> +
> >>> +  return realsize;
> >>> +}
> >>> +
> >>> +static size_t
> >>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
> >>> +{
> >>> +#ifdef CURL_WRITEFUNC_ERROR
> >>> +  return CURL_WRITEFUNC_ERROR;
> >>> +#else
> >>> +  return 0; /* in older curl, any size < requested will also be an error 
> >>> */
> >>> +#endif
> >>> +}
> >>> +
> >>>  /* Read data from the remote server. */
> >>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void 
> >>> *opaque);
> >>> +
> >>>  static int
> >>>  curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
> >>>  {
> >>>    CURLcode r;
> >>> +  struct curl_handle *ch;
> >>>    char range[128];
> >>>  
> >>> -  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
> >>> -  if (ch == NULL)
> >>> -    return -1;
> >>> +  /* Get a curl easy handle. */
> >>> +  ch = allocate_handle ();
> >>> +  if (ch == NULL) goto err;
> >>>  
> >>>    /* Run the scripts if necessary and set headers in the handle. */
> >>> -  if (do_scripts (ch) == -1) return -1;
> >>> +  if (do_scripts (ch) == -1) goto err;
> >>>  
> >>>    /* Tell the write_cb where we want the data to be written.  write_cb
> >>>     * will update this if the data comes in multiple sections.
> >>>     */
> >>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
> >>>    ch->write_buf = buf;
> >>>    ch->write_count = count;
> >>>  
> >>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t 
> >>> count, uint64_t offset)
> >>>              offset, offset + count);
> >>>    curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
> >>>  
> >>> -  /* The assumption here is that curl will look after timeouts. */
> >>> -  r = curl_easy_perform (ch->c);
> >>> +  /* Send the command to the worker thread and wait. */
> >>> +  struct command cmd = {
> >>> +    .type = EASY_HANDLE,
> >>> +    .ch = ch,
> >>> +  };
> >>> +
> >>> +  r = send_command_and_wait (&cmd);
> >>>    if (r != CURLE_OK) {
> >>> -    display_curl_error (ch, r, "pread: curl_easy_perform");
> >>> -    return -1;
> >>> +    display_curl_error (ch, r, "pread");
> >>> +    goto err;
> >>>    }
> >>>    update_times (ch->c);
> >>>  
> >>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t 
> >>> count, uint64_t offset)
> >>>    /* As far as I understand the cURL API, this should never happen. */
> >>>    assert (ch->write_count == 0);
> >>>  
> >>> +  free_handle (ch);
> >>>    return 0;
> >>> +
> >>> + err:
> >>> +  if (ch)
> >>> +    free_handle (ch);
> >>> +  return -1;
> >>> +}
> >>> +
> >>> +/* NB: The terminology used by libcurl is confusing!
> >>> + *
> >>> + * WRITEFUNCTION / write_cb is used when reading from the remote server
> >>> + * READFUNCTION / read_cb is used when writing to the remote server.
> >>> + *
> >>> + * We use the same terminology as libcurl here.
> >>> + */
> >>> +static size_t
> >>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
> >>> +{
> >>> +  struct curl_handle *ch = opaque;
> >>> +  size_t orig_realsize = size * nmemb;
> >>> +  size_t realsize = orig_realsize;
> >>
> >> Do we have to worry about overflow when compiling on 32-bit machines?
> >> Asked differently, should we be using off_t instead of size_t in any
> >> of this code?  Thankfully, for now, we know NBD .pread and .pwrite
> >> requests are capped at 64M, so I think you're okay (we aren't ever
> >> going to ask curl for gigabytes in one request),
> > 
> > It's a good question ...  I suspect that even if we requested it, web
> > servers probably wouldn't want to serve gigabytes of data in a range
> > request, but as you point out we shouldn't ever request it right now.
> > 
> >> but maybe a comment
> >> or assert() is worth it?
> > 
> > I'll add a comment, but could we do this with one of the overflow
> > macros?  I'm not sure ...
> > 
> >>> +
> >>> +  assert (ch->write_buf);
> >>> +
> >>> +  /* Don't read more than the requested amount of data, even if the
> >>> +   * server or libcurl sends more.
> >>> +   */
> >>> +  if (realsize > ch->write_count)
> >>> +    realsize = ch->write_count;
> >>> +
> >>> +  memcpy (ch->write_buf, ptr, realsize);
> >>> +
> >>> +  ch->write_count -= realsize;
> >>> +  ch->write_buf += realsize;
> >>> +
> >>> +  return orig_realsize;
> >>
> >> [1]
> >>
> >>>  }
> >>>  
> >>>  /* Write data to the remote server. */
> >>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void 
> >>> *opaque);
> >>> +
> >>>  static int
> >>>  curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t 
> >>> offset)
> >>>  {
> >>>    CURLcode r;
> >>> +  struct curl_handle *ch;
> >>>    char range[128];
> >>>  
> >>> -  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
> >>> -  if (ch == NULL)
> >>> -    return -1;
> >>> +  /* Get a curl easy handle. */
> >>> +  ch = allocate_handle ();
> >>> +  if (ch == NULL) goto err;
> >>>  
> >>>    /* Run the scripts if necessary and set headers in the handle. */
> >>> -  if (do_scripts (ch) == -1) return -1;
> >>> +  if (do_scripts (ch) == -1) goto err;
> >>>  
> >>>    /* Tell the read_cb where we want the data to be read from.  read_cb
> >>>     * will update this if the data comes in multiple sections.
> >>>     */
> >>> +  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
> >>> +  curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
> >>>    ch->read_buf = buf;
> >>>    ch->read_count = count;
> >>>  
> >>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, 
> >>> uint32_t count, uint64_t offset)
> >>>              offset, offset + count);
> >>>    curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
> >>>  
> >>> -  /* The assumption here is that curl will look after timeouts. */
> >>> -  r = curl_easy_perform (ch->c);
> >>> +  /* Send the command to the worker thread and wait. */
> >>> +  struct command cmd = {
> >>> +    .type = EASY_HANDLE,
> >>> +    .ch = ch,
> >>> +  };
> >>> +
> >>> +  r = send_command_and_wait (&cmd);
> >>>    if (r != CURLE_OK) {
> >>> -    display_curl_error (ch, r, "pwrite: curl_easy_perform");
> >>> -    return -1;
> >>> +    display_curl_error (ch, r, "pwrite");
> >>> +    goto err;
> >>>    }
> >>>    update_times (ch->c);
> >>>  
> >>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t 
> >>> count, uint64_t offset)
> >>>    /* As far as I understand the cURL API, this should never happen. */
> >>>    assert (ch->read_count == 0);
> >>>  
> >>> +  free_handle (ch);
> >>>    return 0;
> >>> +
> >>> + err:
> >>> +  if (ch)
> >>> +    free_handle (ch);
> >>> +  return -1;
> >>> +}
> >>> +
> >>> +static size_t
> >>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
> >>> +{
> >>> +  struct curl_handle *ch = opaque;
> >>> +  size_t realsize = size * nmemb;
> >>> +
> >>> +  assert (ch->read_buf);
> >>> +  if (realsize > ch->read_count)
> >>> +    realsize = ch->read_count;
> >>> +
> >>> +  memcpy (ptr, ch->read_buf, realsize);
> >>> +
> >>> +  ch->read_count -= realsize;
> >>> +  ch->read_buf += realsize;
> >>> +
> >>> +  return realsize;
> >>
> >> Why does write_cb in [1] above return orig_realsize, but read_cb
> >> returns the potentially modified realsize?
> > 
> > It's a good question (this code was copied from the old plugin which
> > was working for years).  It definitely works now.  Note that writes in
> > this plugin are probably never used.  They require a web server that
> > supports "Range PUTs" which is, probably, not any server in existence.
> > 
> >>>  }
> >>>  
> >>>  static struct nbdkit_plugin plugin = {
> >>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
> >>> index eb2d330e1..2974cda3f 100644
> >>> --- a/plugins/curl/pool.c
> >>> +++ b/plugins/curl/pool.c
> >>> @@ -30,11 +30,29 @@
> >>>   * SUCH DAMAGE.
> >>>   */
> >>>  
> >>> -/* Curl handle pool.
> >>> +/* Worker thread which processes the curl multi interface.
> >>>   *
> >>> - * To get a libcurl handle, call get_handle().  When you hold the
> >>> - * handle, it is yours exclusively to use.  After you have finished
> >>> - * with the handle, put it back into the pool by calling put_handle().
> >>> + * The main nbdkit threads (see curl.c) create curl easy handles
> >>> + * initialized with the work they want to carry out.  Note there is
> >>> + * one easy handle per task (eg. per pread/pwrite request).  The easy
> >>> + * handles are not reused.
> >>> + *
> >>> + * The commands + optional easy handle are submitted to the worker
> >>> + * thread over a self-pipe (it's easy to use a pipe here because the
> >>> + * way curl multi works is it can listen on an extra fd, but not on
> >>> + * anything else like a pthread condition).  The curl multi performs
> >>> + * the work of the outstanding easy handles.
> >>> + *
> >>> + * When an easy handle finishes work or errors, we retire the command
> >>> + * by signalling back to the waiting nbdkit thread using a pthread
> >>> + * condition.
> >>> + *
> >>> + * In my experiments, we're almost always I/O bound so I haven't seen
> >>> + * any strong need to use more than one curl multi / worker thread,
> >>> + * although it would be possible to add more in future.
> >>> + *
> >>> + * See also this extremely useful thread:
> >>> + * https://curl.se/mail/lib-2019-03/0100.html
> >>
> >> Very useful comment (and link).
> >>
> >>>   */
> >>>  
> >>>  #include <config.h>
> >>> @@ -45,9 +63,19 @@
> >>>  #include <stdint.h>
> >>>  #include <inttypes.h>
> >>>  #include <string.h>
> >>> +#include <unistd.h>
> >>>  #include <assert.h>
> >>>  #include <pthread.h>
> >>>  
> >>> +#ifdef HAVE_STDATOMIC_H
> >>> +#include <stdatomic.h>
> >>> +#else
> >>> +/* Some old platforms lack atomic types, but 32 bit ints are usually
> >>> + * "atomic enough".
> >>> + */
> >>> +#define _Atomic /**/
> >>> +#endif
> >>> +
> >>>  #include <curl/curl.h>
> >>>  
> >>>  #include <nbdkit-plugin.h>
> >>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
> >>>  
> >>>  unsigned connections = 4;
> >>>  
> >>> -/* This lock protects access to the curl_handles vector below. */
> >>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> >>> +/* Pipe used to notify background thread that a command is pending in
> >>> + * the queue.  A pointer to the 'struct command' is sent over the
> >>> + * pipe.
> >>> + */
> >>> +static int self_pipe[2] = { -1, -1 };
> >>>  
> >>> -/* List of curl handles.  This is allocated dynamically as more
> >>> - * handles are requested.  Currently it does not shrink.  It may grow
> >>> - * up to 'connections' in length.
> >>> +/* The curl multi handle. */
> >>> +static CURLM *multi;
> >>> +
> >>> +/* List of running easy handles.  We only need to maintain this so we
> >>> + * can remove them from the multi handle when cleaning up.
> >>>   */
> >>>  DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
> >>>  static curl_handle_list curl_handles = empty_vector;
> >>>  
> >>> -/* The condition is used when the curl handles vector is full and
> >>> - * we're waiting for a thread to put_handle.
> >>> - */
> >>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> >>> -static size_t in_use = 0, waiting = 0;
> >>> +static const char *
> >>> +command_type_to_string (enum command_type type)
> >>> +{
> >>> +  switch (type) {
> >>> +  case EASY_HANDLE: return "EASY_HANDLE";
> >>> +  case STOP:        return "STOP";
> >>> +  default:          abort ();
> >>> +  }
> >>> +}
> >>>  
> >>>  int
> >>>  pool_get_ready (void)
> >>>  {
> >>> +  multi = curl_multi_init ();
> >>> +  if (multi == NULL) {
> >>> +    nbdkit_error ("curl_multi_init failed: %m");
> >>> +    return -1;
> >>> +  }
> >>> +
> >>>    return 0;
> >>>  }
> >>>  
> >>> +/* Start and stop the background thread. */
> >>> +static pthread_t thread;
> >>> +static bool thread_running;
> >>> +static void *pool_worker (void *);
> >>> +
> >>>  int
> >>>  pool_after_fork (void)
> >>>  {
> >>> +  int err;
> >>> +
> >>> +  if (pipe (self_pipe) == -1) {
> >>> +    nbdkit_error ("pipe: %m");
> >>> +    return -1;
> >>> +  }
> >>> +
> >>> +  /* Start the pool background thread where all the curl work is done. */
> >>> +  err = pthread_create (&thread, NULL, pool_worker, NULL);
> >>> +  if (err != 0) {
> >>> +    errno = err;
> >>> +    nbdkit_error ("pthread_create: %m");
> >>> +    return -1;
> >>> +  }
> >>> +  thread_running = true;
> >>> +
> >>>    return 0;
> >>>  }
> >>>  
> >>> -/* Close and free all handles in the pool. */
> >>> +/* Unload the background thread. */
> >>>  void
> >>>  pool_unload (void)
> >>>  {
> >>> -  size_t i;
> >>> +  if (thread_running) {
> >>> +    /* Stop the background thread. */
> >>> +    struct command cmd = { .type = STOP };
> >>> +    send_command_and_wait (&cmd);
> >>> +    pthread_join (thread, NULL);
> >>> +    thread_running = false;
> >>> +  }
> >>>  
> >>> -  if (curl_debug_pool)
> >>> -    nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
> >>> -                  curl_handles.len);
> >>> +  if (self_pipe[0] >= 0) {
> >>> +    close (self_pipe[0]);
> >>> +    self_pipe[0] = -1;
> >>> +  }
> >>> +  if (self_pipe[1] >= 0) {
> >>> +    close (self_pipe[1]);
> >>> +    self_pipe[1] = -1;
> >>> +  }
> >>>  
> >>> -  for (i = 0; i < curl_handles.len; ++i)
> >>> -    free_handle (curl_handles.ptr[i]);
> >>> -  curl_handle_list_reset (&curl_handles);
> >>> +  if (multi) {
> >>> +    size_t i;
> >>> +
> >>> +    /* Remove and free any easy handles in the multi. */
> >>> +    for (i = 0; i < curl_handles.len; ++i) {
> >>> +      curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
> >>> +      free_handle (curl_handles.ptr[i]);
> >>> +    }
> >>> +
> >>> +    curl_multi_cleanup (multi);
> >>> +    multi = NULL;
> >>> +  }
> >>>  }
> >>>  
> >>> -/* Get a handle from the pool.
> >>> - *
> >>> - * It is owned exclusively by the caller until they call put_handle.
> >>> +/* Command queue. */
> >>> +static _Atomic uint64_t id;     /* next command ID */
> >>> +
> >>> +/* Send command to the background thread and wait for completion.
> >>> + * This is only called by one of the nbdkit threads.
> >>>   */
> >>> -struct curl_handle *
> >>> -get_handle (void)
> >>> +CURLcode
> >>> +send_command_and_wait (struct command *cmd)
> >>>  {
> >>> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> >>> -  size_t i;
> >>> -  struct curl_handle *ch;
> >>> -
> >>> - again:
> >>> -  /* Look for a handle which is not in_use. */
> >>> -  for (i = 0; i < curl_handles.len; ++i) {
> >>> -    ch = curl_handles.ptr[i];
> >>> -    if (!ch->in_use) {
> >>> -      ch->in_use = true;
> >>> -      in_use++;
> >>> +  cmd->id = id++;
> >>> +
> >>> +  /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
> >>> +   * indicate that the command has not yet been completed and status
> >>> +   * set.
> >>> +   */
> >>> +  cmd->status = -1;
> >>> +
> >>> +  /* This will be used to signal command completion back to us. */
> >>> +  pthread_mutex_init (&cmd->mutex, NULL);
> >>> +  pthread_cond_init (&cmd->cond, NULL);
> >>> +
> >>> +  /* Send the command to the background thread. */
> >>> +  if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
> >>> +    abort ();
> >>> +
> >>> +  /* Wait for the command to be completed by the background thread. */
> >>> +  {
> >>> +    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> >>> +    while (cmd->status == -1) /* for -1, see above */
> >>> +      pthread_cond_wait (&cmd->cond, &cmd->mutex);
> >>> +  }
> >>> +
> >>> +  pthread_mutex_destroy (&cmd->mutex);
> >>> +  pthread_cond_destroy (&cmd->cond);
> >>> +
> >>> +  /* Note the main thread must call nbdkit_error on error! */
> >>> +  return cmd->status;
> >>> +}
> >>> +
> >>> +/* The background thread. */
> >>> +static void check_for_finished_handles (void);
> >>> +static void retire_command (struct command *cmd, CURLcode code);
> >>> +static void do_easy_handle (struct command *cmd);
> >>> +
> >>> +static void *
> >>> +pool_worker (void *vp)
> >>> +{
> >>> +  bool stop = false;
> >>> +
> >>> +  if (curl_debug_pool)
> >>> +    nbdkit_debug ("curl: background thread started");
> >>> +
> >>> +  while (!stop) {
> >>> +    struct command *cmd = NULL;
> >>> +    struct curl_waitfd extra_fds[1] =
> >>> +      { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
> >>> +    CURLMcode mc;
> >>> +    int numfds, running_handles, repeats = 0;
> >>> +
> >>> +    do {
> >>> +      /* Process the multi handle. */
> >>> +      mc = curl_multi_perform (multi, &running_handles);
> >>> +      if (mc != CURLM_OK) {
> >>> +        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror 
> >>> (mc));
> >>
> >> Since nbdkit_error() stores its string in thread-local storage, is
> >> there anything that ever extracts this error over to the nbdkit thread
> >> that issued the original request to the worker thread?...
> >>
> >>> +        abort (); /* XXX We don't expect this to happen */
> >>
> >> ...Then again, if we abort, it doesn't matter.
> > 
> > I was unclear what to do here.  The final code does:
> > 
> > while (!stop) {
> > ...
> >     cmd = process_multi_handle ();
> >     if (cmd == NULL)
> >       continue; /* or die?? */
> > 
> > with process_multi_handle still calling nbdkit_error.  I felt it might
> > be better to keep trying than just abort, as presumably some (most?)
> > errors are transient.
> > 
> > nbdkit_error will ensure the error message is written out.
> > 
> >>> +      }
> >>
> >>> +
> >>> +      check_for_finished_handles ();
> >>> +
> >>> +      mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
> >>> +      if (mc != CURLM_OK) {
> >>> +        nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
> >>> +        abort (); /* XXX We don't expect this to happen */
> >>> +      }
> >>> +
> >>>        if (curl_debug_pool)
> >>> -        nbdkit_debug ("get_handle: %zu", ch->i);
> >>> -      return ch;
> >>> -    }
> >>> -  }
> >>> +        nbdkit_debug ("curl_multi_wait returned: running_handles=%d 
> >>> numfds=%d",
> >>> +                      running_handles, numfds);
> >>> +
> >>> +      if (numfds == 0) {
> >>> +        repeats++;
> >>> +        if (repeats > 1)
> >>> +          nbdkit_nanosleep (1, 0);
> >>> +      }
> >>> +      else {
> >>> +        repeats = 0;
> >>> +        if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
> >>> +          /* There's a command waiting. */
> >>> +          if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
> >>> +            abort ();
> >>> +        }
> >>> +      }
> >>> +    } while (!cmd);
> >>>  
> >>> -  /* If more connections are allowed, then allocate a new handle. */
> >>> -  if (curl_handles.len < connections) {
> >>> -    ch = allocate_handle ();
> >>> -    if (ch == NULL)
> >>> -      return NULL;
> >>> -    if (curl_handle_list_append (&curl_handles, ch) == -1) {
> >>> -      free_handle (ch);
> >>> -      return NULL;
> >>> -    }
> >>> -    ch->i = curl_handles.len - 1;
> >>> -    ch->in_use = true;
> >>> -    in_use++;
> >>>      if (curl_debug_pool)
> >>> -      nbdkit_debug ("get_handle: %zu", ch->i);
> >>> -    return ch;
> >>> -  }
> >>> +      nbdkit_debug ("curl: dispatching %s command %" PRIu64,
> >>> +                    command_type_to_string (cmd->type), cmd->id);
> >>> +
> >>> +    switch (cmd->type) {
> >>> +    case STOP:
> >>> +      stop = true;
> >>> +      retire_command (cmd, CURLE_OK);
> >>> +      break;
> >>>  
> >>> -  /* Otherwise we have run out of connections so we must wait until
> >>> -   * another thread calls put_handle.
> >>> -   */
> >>> -  assert (in_use == connections);
> >>> -  waiting++;
> >>> -  while (in_use == connections)
> >>> -    pthread_cond_wait (&cond, &lock);
> >>> -  waiting--;
> >>> +    case EASY_HANDLE:
> >>> +      do_easy_handle (cmd);
> >>> +      break;
> >>> +    }
> >>> +  } /* while (!stop) */
> >>>  
> >>> -  goto again;
> >>> +  if (curl_debug_pool)
> >>> +    nbdkit_debug ("curl: background thread stopped");
> >>> +
> >>> +  return NULL;
> >>>  }
> >>>  
> >>> -/* Return the handle to the pool. */
> >>> -void
> >>> -put_handle (struct curl_handle *ch)
> >>> +/* This checks if any easy handles in the multi have
> >>> + * finished and retires the associated commands.
> >>> + */
> >>> +static void
> >>> +check_for_finished_handles (void)
> >>>  {
> >>> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> >>> +  CURLMsg *msg;
> >>> +  int msgs_in_queue;
> >>> +
> >>> +  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
> >>> +    size_t i;
> >>> +    struct curl_handle *ch = NULL;
> >>> +
> >>> +    if (msg->msg == CURLMSG_DONE) {
> >>> +      /* Find this curl_handle. */
> >>> +      for (i = 0; i < curl_handles.len; ++i) {
> >>> +        if (curl_handles.ptr[i]->c == msg->easy_handle) {
> >>> +          ch = curl_handles.ptr[i];
> >>> +          curl_handle_list_remove (&curl_handles, i);
> >>> +        }
> >>> +      }
> >>> +      if (ch == NULL) abort ();
> >>> +      curl_multi_remove_handle (multi, ch->c);
> >>> +
> >>> +      retire_command (ch->cmd, msg->data.result);
> >>> +    }
> >>> +  }
> >>> +}
> >>>  
> >>> +/* Retire a command.  status is a CURLcode. */
> >>> +static void
> >>> +retire_command (struct command *cmd, CURLcode status)
> >>> +{
> >>>    if (curl_debug_pool)
> >>> -    nbdkit_debug ("put_handle: %zu", ch->i);
> >>> +    nbdkit_debug ("curl: retiring %s command %" PRIu64,
> >>> +                  command_type_to_string (cmd->type), cmd->id);
> >>> +
> >>> +  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> >>> +  cmd->status = status;
> >>> +  pthread_cond_signal (&cmd->cond);
> >>> +}
> >>> +
> >>> +static void
> >>> +do_easy_handle (struct command *cmd)
> >>> +{
> >>> +  CURLMcode mc;
> >>> +
> >>> +  cmd->ch->cmd = cmd;
> >>> +
> >>> +  /* Add the handle to the multi. */
> >>> +  mc = curl_multi_add_handle (multi, cmd->ch->c);
> >>> +  if (mc != CURLM_OK) {
> >>> +    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
> >>> +    goto err;
> >>> +  }
> >>>  
> >>> -  ch->in_use = false;
> >>> -  in_use--;
> >>> +  if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
> >>> +    goto err;
> >>> +  return;
> >>>  
> >>> -  /* Signal the next thread which is waiting. */
> >>> -  if (waiting > 0)
> >>> -    pthread_cond_signal (&cond);
> >>> + err:
> >>> +  retire_command (cmd, CURLE_OUT_OF_MEMORY);
> >>>  }
> >>> -- 
> >>> 2.41.0
> >>
> >> Overall looks nice, and I learned more about curl in the process.
> > 
> > For me, too much :-(
> > 
> > Thanks,
> > 
> > Rich.
> > 

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
Fedora Windows cross-compiler. Compile Windows programs, test, and
build Windows installers. Over 100 libraries supported.
http://fedoraproject.org/wiki/MinGW
_______________________________________________
Libguestfs mailing list
Libguestfs@redhat.com
https://listman.redhat.com/mailman/listinfo/libguestfs


Reply via email to