On 8/4/23 20:04, Richard W.M. Jones wrote:
> On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote:
>> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:
>>> See the comment at the top of plugins/curl/pool.c for general
>>> information about how this works.
>>>
>>> This makes a very large difference to performance over the previous
>>> implementation.  Note for the tests below I also applied the next
>>> commit changing the behaviour of the connections parameter.
>>>
>>> Using this test case:
>>>
>>>   $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
>>>   $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'
>>>
>>> The times are as follows:
>>>
>>>   multi, connections=64           21.5s
>>>   multi, connections=32           30.2s
>>>   multi, connections=16           56.0s
>>>   before this commit             166s
>>
>> Awesome performance improvements!  As painful as this series has been
>> for you to write and debug, it is showing its worth.
>>
>>> ---
>>>  plugins/curl/curldefs.h |  35 ++--
>>>  plugins/curl/config.c   | 246 ---------------------------
>>>  plugins/curl/curl.c     | 366 +++++++++++++++++++++++++++++++++++-----
>>>  plugins/curl/pool.c     | 346 ++++++++++++++++++++++++++++---------
>>>  4 files changed, 616 insertions(+), 377 deletions(-)
>>
>> Finally taking time to review this, even though it is already in-tree.
>>
>>> @@ -98,8 +88,30 @@ struct curl_handle {
>>>    const char *read_buf;
>>>    uint32_t read_count;
>>>  
>>> +  /* This field is used by curl_get_size. */
>>> +  bool accept_range;
>>> +
>>>    /* Used by scripts.c */
>>>    struct curl_slist *headers_copy;
>>> +
>>> +  /* Used by pool.c */
>>> +  struct command *cmd;
>>> +};
>>> +
>>> +/* Asynchronous commands that can be sent to the pool thread. */
>>> +enum command_type { EASY_HANDLE, STOP };
>>> +struct command {
>>> +  /* These fields are set by the caller. */
>>> +  enum command_type type;       /* command */
>>> +  struct curl_handle *ch;       /* for EASY_HANDLE, the easy handle */
>>> +
>>> +  /* This field is set to a unique value by send_command_and_wait. */
>>> +  uint64_t id;                  /* serial number */
>>> +
>>> +  /* These fields are used to signal back that the command finished. */
>>> +  pthread_mutex_t mutex;        /* completion mutex */
>>> +  pthread_cond_t cond;          /* completion condition */
>>> +  CURLcode status;              /* status code (CURLE_OK = succeeded) */
>>>  };
>>
>> Makes sense.  The two types are mutually recursive (curl_handle
>> includes a struct command *; command includes a struct curl_handle *);
>> hopefully you have proper locking when altering multiple objects to
>> adjust how they point to one another.
> 
> Actually locking is not needed.  Let me document it through ...
> 
> We create both the curl easy handle and the associated EASY_HANDLE
> command in the nbdkit thread that gets the request, eg. in the curl
> .pread method.  That of course requires no locking.
> 
> There is a single background worker thread.
> 
> A "self pipe" passes pointers to 'struct command *' to this worker
> thread simply by writing the 8 byte pointer onto the pipe (hopefully
> atomic ...)  The nbdkit request thread then blocks on the mutex/cond
> in the command handle.

Right, that's exactly what I got curious about.

In my opinion, writing to the self-pipe is mostly safe. Reading from the
self-pipe could be slightly polished.

Here are the POSIX pages on write() and read():

https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html

The first one (write) says that if you attempt to write at most PIPE_BUF
bytes, then writes will not be interleaved with other concurrent writes,
so in a sense the write will be atomic. (And in this case, O_NONBLOCK
only changes the behavior for the case when the buffer cannot be written
in entirety: with O_NONBLOCK clear, the writer thread will block, with
O_NONBLOCK set, the writer thread will see -1/EAGAIN.)

Now, PIPE_BUF is "variable" in a sense:

https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html

but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our
pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely
satisfied.
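
If we wanted the compiler to hold us to that, a minimal sketch could
look like the following. The helper name pipe_atomic_limit() is
hypothetical (it is not in the plugin), and "struct command" is only
forward-declared here, because nothing but the pointer size matters.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <limits.h>
#include <unistd.h>

struct command;                 /* opaque here; only the pointer size matters */

/* _POSIX_PIPE_BUF (512) is the smallest PIPE_BUF a conforming system
 * may have, so it is the portable bound to check against.
 */
static_assert (sizeof (struct command *) <= _POSIX_PIPE_BUF,
               "command pointer must fit in one atomic pipe write");

/* Hypothetical helper: the actual PIPE_BUF for this pipe, falling
 * back to the POSIX minimum if the system reports no specific limit.
 */
static long
pipe_atomic_limit (int fd)
{
  long n = fpathconf (fd, _PC_PIPE_BUF);

  return n == -1 ? _POSIX_PIPE_BUF : n;
}
```

The static_assert costs nothing at runtime; the fpathconf() query is
only there in case someone wants to log the real limit.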

... The only case I see possible for a short write is the delivery of a
signal after transfer has started. *If* nbdkit catches some signal such
that the signal handler actually returns, then this could be a
(theoretical) problem, calling for xwrite() or similar.
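
For reference, here is roughly what I mean by xwrite(). This is a
sketch only: the name and the return convention are my assumptions,
not an existing nbdkit function.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: write exactly 'len' bytes to 'fd', retrying
 * after EINTR and after short writes.  Returns 0 on success, or -1
 * with errno set by write().
 */
static int
xwrite (int fd, const void *buf, size_t len)
{
  const char *p = buf;

  assert (buf != NULL || len == 0);

  while (len > 0) {
    ssize_t r = write (fd, p, len);

    if (r == -1) {
      if (errno == EINTR)
        continue;               /* interrupted before any transfer */
      return -1;
    }
    p += r;                     /* short write: resume where we stopped */
    len -= (size_t) r;
  }
  return 0;
}
```

One caveat: resuming after a short write gives up the non-interleaving
guarantee, so if more than one thread could ever write to the pipe at
the same time, the call would need to be serialized with a lock.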

> 
> The background worker thread picks the 'struct command *' up from the
> pipe in the same event loop that it uses to process ongoing requests
> on the multi handle.  It takes the easy handle and adds it to the
> multi handle.

The spec on read() does not seem to have the same kind of
non-interleaving / "atomicity" language that write() does. The rationale
section says:

"The standard developers considered adding atomicity requirements to a
pipe or FIFO, but recognized that due to the nature of pipes and FIFOs
there could be no guarantee of atomicity of reads of {PIPE_BUF} or any
other size that would be an aid to applications portability."

Now given that the writer side is free of interleaving (because, in the
first place: there is only a single writer!), I think we need not worry
about data corruption. However, it does feel like read() may return
fewer than 8 bytes in one go, "just because" (not only because of a
signal being delivered midway).

And that may be a problem with readiness reporting via
curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command
pointer may not yet be available.

Now I do think a split read is extremely unlikely, maybe even impossible
on Linux. If we choose to be pedantic, then the curl_multi_wait() loop
might want to expect "cmd" to be populated only over multiple iterations
-- like use "cmd_ptr_bytes" or something similar for tracking the size
already available, and only consider "cmd" usable when cmd_ptr_bytes
reaches sizeof cmd.
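
To make the idea concrete, a sketch of such an accumulating reader
follows. Everything here is hypothetical (struct cmd_reader,
read_command_ptr() and its return convention are my inventions); the
worker loop would call it whenever the pipe is reported readable and
only dispatch once it returns 1.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>

struct command;                 /* defined in curldefs.h in the plugin */

/* Hypothetical state that survives across event loop iterations. */
struct cmd_reader {
  unsigned char bytes[sizeof (struct command *)];
  size_t cmd_ptr_bytes;         /* how much of the pointer has arrived */
};

/* Call when the pipe is readable.  Returns 1 and stores the pointer
 * in *cmd once all bytes have arrived, 0 if more bytes are needed,
 * and -1 on a read error or if the write end was closed.
 */
static int
read_command_ptr (int fd, struct cmd_reader *rd, struct command **cmd)
{
  ssize_t r;

  assert (rd->cmd_ptr_bytes < sizeof rd->bytes);

  r = read (fd, rd->bytes + rd->cmd_ptr_bytes,
            sizeof rd->bytes - rd->cmd_ptr_bytes);
  if (r == -1)
    return errno == EINTR || errno == EAGAIN ? 0 : -1;
  if (r == 0)
    return -1;                  /* EOF: the writer closed the pipe */

  rd->cmd_ptr_bytes += (size_t) r;
  if (rd->cmd_ptr_bytes < sizeof rd->bytes)
    return 0;                   /* partial pointer, wait for the rest */

  memcpy (cmd, rd->bytes, sizeof *cmd);
  rd->cmd_ptr_bytes = 0;        /* ready for the next command */
  return 1;
}
```

In the common case this costs one read() per command, exactly like
the current code; the extra state only matters if a split read ever
happens.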


Yet another topic that comes up is visibility / ordering. Transferring a
pointer via write()/read() between threads does not seem to guarantee
ordering / visibility regarding the *pointed-to* area per spec. Pthread
mutex APIs (and C11 thread and atomics APIs) include the CPU
instructions for ensuring proper memory visibility.

*BUT* I think it would be insane for any POSIX implementation to have a
write()+read() combination that's *weaker* regarding data consistency,
(i.e., that's more racy) than mutexes. Write() and read() are
heavy-weight syscalls, so I can't imagine a publish-subscribe pattern
not working with them (i.e., the pointed-to area not "settling" until
the pointer is consumed).

Laszlo

> 
> When the easy handle work has finished, the worker thread removes it
> from the multi handle and signals the nbdkit request thread to wake up
> (using cmd->mutex + cmd->cond).  At which point possession passes back
> to the request thread which will usually free up both the command and
> easy handle.
> 
>>> +++ b/plugins/curl/config.c
>>
>>> +++ b/plugins/curl/curl.c
>>>  
>>> +/* Get the file size. */
>>> +static int get_content_length_accept_range (struct curl_handle *ch);
>>> +static bool try_fallback_GET_method (struct curl_handle *ch);
>>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
>>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
>>> +
>>> +static int64_t
>>> +curl_get_size (void *handle)
>>> +{
>>> +  struct curl_handle *ch;
>>> +  CURLcode r;
>>> +  long code;
>>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>>> +  curl_off_t o;
>>> +#else
>>> +  double d;
>>> +#endif
>>> +  int64_t exportsize;
>>> +
>>> +  /* Get a curl easy handle. */
>>> +  ch = allocate_handle ();
>>> +  if (ch == NULL) goto err;
>>> +
>>> +  /* Prepare to read the headers. */
>>> +  if (get_content_length_accept_range (ch) == -1)
>>> +    goto err;
>>> +
>>> +  /* Send the command to the worker thread and wait. */
>>> +  struct command cmd = {
>>> +    .type = EASY_HANDLE,
>>> +    .ch = ch,
>>> +  };
>>> +
>>> +  r = send_command_and_wait (&cmd);
>>> +  update_times (ch->c);
>>> +  if (r != CURLE_OK) {
>>> +    display_curl_error (ch, r,
>>> +                        "problem doing HEAD request to fetch size of URL [%s]",
>>> +                        url);
>>> +
>>> +    /* Get the HTTP status code, if available. */
>>> +    r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
>>> +    if (r == CURLE_OK)
>>> +      nbdkit_debug ("HTTP status code: %ld", code);
>>> +    else
>>> +      code = -1;
>>> +
>>> +    /* See comment on try_fallback_GET_method below. */
>>> +    if (code != 403 || !try_fallback_GET_method (ch))
>>> +      goto err;
>>> +  }
>>> +
>>> +  /* Get the content length.
>>> +   *
>>> +   * Note there is some subtlety here: For web servers using chunked
>>> +   * encoding, either the Content-Length header will not be present,
>>> +   * or if present it should be ignored.  (For such servers the only
>>> +   * way to find out the true length would be to read all of the
>>> +   * content, which we don't want to do).
>>> +   *
>>> +   * Curl itself resolves this for us.  It will ignore the
>>> +   * Content-Length header if chunked encoding is used, returning the
>>> +   * length as -1 which we check below (see also
>>> +   * curl:lib/http.c:Curl_http_size).
>>> +   */
>>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
>>> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
>>> +  if (r != CURLE_OK) {
>>> +    display_curl_error (ch, r,
>>> +                        "could not get length of remote file [%s]", url);
>>> +    goto err;
>>> +  }
>>> +
>>> +  if (o == -1) {
>>> +    nbdkit_error ("could not get length of remote file [%s], "
>>> +                  "is the URL correct?", url);
>>> +    goto err;
>>> +  }
>>> +
>>> +  exportsize = o;
>>> +#else
>>> +  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
>>> +  if (r != CURLE_OK) {
>>> +    display_curl_error (ch, r,
>>> +                        "could not get length of remote file [%s]", url);
>>> +    goto err;
>>> +  }
>>> +
>>> +  if (d == -1) {
>>> +    nbdkit_error ("could not get length of remote file [%s], "
>>> +                  "is the URL correct?", url);
>>> +    goto err;
>>> +  }
>>> +
>>> +  exportsize = d;
>>
>> Does curl guarantee that the double d will contain a value assignable
>> to int64_t without overflow/truncation?  For particularly large sizes,
>> double has insufficient precision for all possible file sizes, but I
>> doubt someone is exposing such large files over HTTP.
> 
> No, I don't believe a 'double' is sufficient.  This is why newer
> versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T.  Note
> this code is just copied from the old curl plugin.
> 
>>> +#endif
>>> +  nbdkit_debug ("content length: %" PRIi64, exportsize);
>>> +
>>> +  /* If this is HTTP, check that byte ranges are supported. */
>>> +  if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
>>> +      ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
>>> +    if (!ch->accept_range) {
>>> +      nbdkit_error ("server does not support 'range' (byte range) requests");
>>> +      goto err;
>>> +    }
>>> +
>>> +    nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
>>> +  }
>>> +
>>> +  free_handle (ch);
>>> +  return exportsize;
>>> +
>>> + err:
>>> +  if (ch)
>>> +    free_handle (ch);
>>> +  return -1;
>>> +}
>>> +
>>> +/* Get the file size and also whether the remote HTTP server
>>> + * supports byte ranges.
>>> + */
>>> +static int
>>> +get_content_length_accept_range (struct curl_handle *ch)
>>> +{
>>> +  /* We must run the scripts if necessary and set headers in the
>>> +   * handle.
>>> +   */
>>> +  if (do_scripts (ch) == -1)
>>> +    return -1;
>>> +
>>> +  /* Set this flag in the handle to false.  The callback should set it
>>> +   * to true if byte ranges are supported, which we check below.
>>> +   */
>>> +  ch->accept_range = false;
>>> +
>>> +  /* No Body, not nobody!  This forces a HEAD request. */
>>> +  curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
>>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
>>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
>>> +  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
>>> +  curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
>>> +  return 0;
>>> +}
>>> +
>>> +/* S3 servers can return 403 Forbidden for HEAD but still respond
>>> + * to GET, so we give it a second chance in that case.
>>> + * https://github.com/kubevirt/containerized-data-importer/issues/2737
>>> + *
>>> + * This function issues a GET request with a writefunction that always
>>> + * returns an error, thus effectively getting the headers but
>>> + * abandoning the transfer as soon as possible after.
>>> + */
>>> +static bool
>>> +try_fallback_GET_method (struct curl_handle *ch)
>>> +{
>>> +  CURLcode r;
>>> +
>>> +  nbdkit_debug ("attempting to fetch headers using GET method");
>>> +
>>> +  curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
>>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
>>> +  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
>>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
>>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>>> +
>>> +  struct command cmd = {
>>> +    .type = EASY_HANDLE,
>>> +    .ch = ch,
>>> +  };
>>> +
>>> +  r = send_command_and_wait (&cmd);
>>> +  update_times (ch->c);
>>> +
>>> +  /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
>>> +   * (eg if the remote has zero length).  Other errors might happen
>>> +   * but we ignore them since it is a fallback path.
>>> +   */
>>> +  return r == CURLE_OK || r == CURLE_WRITE_ERROR;
>>> +}
>>> +
>>> +static size_t
>>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> +  struct curl_handle *ch = opaque;
>>> +  size_t realsize = size * nmemb;
>>> +  const char *header = ptr;
>>> +  const char *end = header + realsize;
>>> +  const char *accept_ranges = "accept-ranges:";
>>> +  const char *bytes = "bytes";
>>> +
>>> +  if (realsize >= strlen (accept_ranges) &&
>>> +      ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
>>> +    const char *p = strchr (header, ':') + 1;
>>> +
>>> +    /* Skip whitespace between the header name and value. */
>>> +    while (p < end && *p && ascii_isspace (*p))
>>
>> Technically, '*p && ascii_isspace (*p)' can be shortened to
>> 'ascii_isspace (*p)', since the NUL byte is not ascii space.  I don't
>> know if the compiler is smart enough to make that optimization on your
>> behalf.
> 
> Ah indeed.
> 
>>> +      p++;
>>> +
>>> +    if (end - p >= strlen (bytes)
>>> +        && strncmp (p, bytes, strlen (bytes)) == 0) {
>>> +      /* Check that there is nothing but whitespace after the value. */
>>> +      p += strlen (bytes);
>>> +      while (p < end && *p && ascii_isspace (*p))
>>
>> Another spot of the same.
>>
>>> +        p++;
>>> +
>>> +      if (p == end || !*p)
>>> +        ch->accept_range = true;
>>> +    }
>>> +  }
>>> +
>>> +  return realsize;
>>> +}
>>> +
>>> +static size_t
>>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> +#ifdef CURL_WRITEFUNC_ERROR
>>> +  return CURL_WRITEFUNC_ERROR;
>>> +#else
>>> +  return 0; /* in older curl, any size < requested will also be an error */
>>> +#endif
>>> +}
>>> +
>>>  /* Read data from the remote server. */
>>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
>>> +
>>>  static int
>>>  curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>>  {
>>>    CURLcode r;
>>> +  struct curl_handle *ch;
>>>    char range[128];
>>>  
>>> -  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>>> -  if (ch == NULL)
>>> -    return -1;
>>> +  /* Get a curl easy handle. */
>>> +  ch = allocate_handle ();
>>> +  if (ch == NULL) goto err;
>>>  
>>>    /* Run the scripts if necessary and set headers in the handle. */
>>> -  if (do_scripts (ch) == -1) return -1;
>>> +  if (do_scripts (ch) == -1) goto err;
>>>  
>>>    /* Tell the write_cb where we want the data to be written.  write_cb
>>>     * will update this if the data comes in multiple sections.
>>>     */
>>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
>>> +  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
>>>    ch->write_buf = buf;
>>>    ch->write_count = count;
>>>  
>>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>>              offset, offset + count);
>>>    curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>>  
>>> -  /* The assumption here is that curl will look after timeouts. */
>>> -  r = curl_easy_perform (ch->c);
>>> +  /* Send the command to the worker thread and wait. */
>>> +  struct command cmd = {
>>> +    .type = EASY_HANDLE,
>>> +    .ch = ch,
>>> +  };
>>> +
>>> +  r = send_command_and_wait (&cmd);
>>>    if (r != CURLE_OK) {
>>> -    display_curl_error (ch, r, "pread: curl_easy_perform");
>>> -    return -1;
>>> +    display_curl_error (ch, r, "pread");
>>> +    goto err;
>>>    }
>>>    update_times (ch->c);
>>>  
>>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
>>>    /* As far as I understand the cURL API, this should never happen. */
>>>    assert (ch->write_count == 0);
>>>  
>>> +  free_handle (ch);
>>>    return 0;
>>> +
>>> + err:
>>> +  if (ch)
>>> +    free_handle (ch);
>>> +  return -1;
>>> +}
>>> +
>>> +/* NB: The terminology used by libcurl is confusing!
>>> + *
>>> + * WRITEFUNCTION / write_cb is used when reading from the remote server
>>> + * READFUNCTION / read_cb is used when writing to the remote server.
>>> + *
>>> + * We use the same terminology as libcurl here.
>>> + */
>>> +static size_t
>>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> +  struct curl_handle *ch = opaque;
>>> +  size_t orig_realsize = size * nmemb;
>>> +  size_t realsize = orig_realsize;
>>
>> Do we have to worry about overflow when compiling on 32-bit machines?
>> Asked differently, should we be using off_t instead of size_t in any
>> of this code?  Thankfully, for now, we know NBD .pread and .pwrite
>> requests are capped at 64M, so I think you're okay (we aren't ever
>> going to ask curl for gigabytes in one request),
> 
> It's a good question ...  I suspect that even if we requested it, web
> servers probably wouldn't want to serve gigabytes of data in a range
> request, but as you point out we shouldn't ever request it right now.
> 
>> but maybe a comment
>> or assert() is worth it?
> 
> I'll add a comment, but could we do this with one of the overflow
> macros?  I'm not sure ...
> 
>>> +
>>> +  assert (ch->write_buf);
>>> +
>>> +  /* Don't read more than the requested amount of data, even if the
>>> +   * server or libcurl sends more.
>>> +   */
>>> +  if (realsize > ch->write_count)
>>> +    realsize = ch->write_count;
>>> +
>>> +  memcpy (ch->write_buf, ptr, realsize);
>>> +
>>> +  ch->write_count -= realsize;
>>> +  ch->write_buf += realsize;
>>> +
>>> +  return orig_realsize;
>>
>> [1]
>>
>>>  }
>>>  
>>>  /* Write data to the remote server. */
>>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
>>> +
>>>  static int
>>>  curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>>>  {
>>>    CURLcode r;
>>> +  struct curl_handle *ch;
>>>    char range[128];
>>>  
>>> -  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
>>> -  if (ch == NULL)
>>> -    return -1;
>>> +  /* Get a curl easy handle. */
>>> +  ch = allocate_handle ();
>>> +  if (ch == NULL) goto err;
>>>  
>>>    /* Run the scripts if necessary and set headers in the handle. */
>>> -  if (do_scripts (ch) == -1) return -1;
>>> +  if (do_scripts (ch) == -1) goto err;
>>>  
>>>    /* Tell the read_cb where we want the data to be read from.  read_cb
>>>     * will update this if the data comes in multiple sections.
>>>     */
>>> +  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
>>> +  curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
>>>    ch->read_buf = buf;
>>>    ch->read_count = count;
>>>  
>>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>>>              offset, offset + count);
>>>    curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
>>>  
>>> -  /* The assumption here is that curl will look after timeouts. */
>>> -  r = curl_easy_perform (ch->c);
>>> +  /* Send the command to the worker thread and wait. */
>>> +  struct command cmd = {
>>> +    .type = EASY_HANDLE,
>>> +    .ch = ch,
>>> +  };
>>> +
>>> +  r = send_command_and_wait (&cmd);
>>>    if (r != CURLE_OK) {
>>> -    display_curl_error (ch, r, "pwrite: curl_easy_perform");
>>> -    return -1;
>>> +    display_curl_error (ch, r, "pwrite");
>>> +    goto err;
>>>    }
>>>    update_times (ch->c);
>>>  
>>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
>>>    /* As far as I understand the cURL API, this should never happen. */
>>>    assert (ch->read_count == 0);
>>>  
>>> +  free_handle (ch);
>>>    return 0;
>>> +
>>> + err:
>>> +  if (ch)
>>> +    free_handle (ch);
>>> +  return -1;
>>> +}
>>> +
>>> +static size_t
>>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
>>> +{
>>> +  struct curl_handle *ch = opaque;
>>> +  size_t realsize = size * nmemb;
>>> +
>>> +  assert (ch->read_buf);
>>> +  if (realsize > ch->read_count)
>>> +    realsize = ch->read_count;
>>> +
>>> +  memcpy (ptr, ch->read_buf, realsize);
>>> +
>>> +  ch->read_count -= realsize;
>>> +  ch->read_buf += realsize;
>>> +
>>> +  return realsize;
>>
>> Why does write_cb in [1] above return orig_realsize, but read_cb
>> returns the potentially modified realsize?
> 
> It's a good question (this code was copied from the old plugin which
> was working for years).  It definitely works now.  Note that writes in
> this plugin are probably never used.  They require a web server that
> supports "Range PUTs" which is, probably, not any server in existence.
> 
>>>  }
>>>  
>>>  static struct nbdkit_plugin plugin = {
>>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
>>> index eb2d330e1..2974cda3f 100644
>>> --- a/plugins/curl/pool.c
>>> +++ b/plugins/curl/pool.c
>>> @@ -30,11 +30,29 @@
>>>   * SUCH DAMAGE.
>>>   */
>>>  
>>> -/* Curl handle pool.
>>> +/* Worker thread which processes the curl multi interface.
>>>   *
>>> - * To get a libcurl handle, call get_handle().  When you hold the
>>> - * handle, it is yours exclusively to use.  After you have finished
>>> - * with the handle, put it back into the pool by calling put_handle().
>>> + * The main nbdkit threads (see curl.c) create curl easy handles
>>> + * initialized with the work they want to carry out.  Note there is
>>> + * one easy handle per task (eg. per pread/pwrite request).  The easy
>>> + * handles are not reused.
>>> + *
>>> + * The commands + optional easy handle are submitted to the worker
>>> + * thread over a self-pipe (it's easy to use a pipe here because the
>>> + * way curl multi works is it can listen on an extra fd, but not on
>>> + * anything else like a pthread condition).  The curl multi performs
>>> + * the work of the outstanding easy handles.
>>> + *
>>> + * When an easy handle finishes work or errors, we retire the command
>>> + * by signalling back to the waiting nbdkit thread using a pthread
>>> + * condition.
>>> + *
>>> + * In my experiments, we're almost always I/O bound so I haven't seen
>>> + * any strong need to use more than one curl multi / worker thread,
>>> + * although it would be possible to add more in future.
>>> + *
>>> + * See also this extremely useful thread:
>>> + * https://curl.se/mail/lib-2019-03/0100.html
>>
>> Very useful comment (and link).
>>
>>>   */
>>>  
>>>  #include <config.h>
>>> @@ -45,9 +63,19 @@
>>>  #include <stdint.h>
>>>  #include <inttypes.h>
>>>  #include <string.h>
>>> +#include <unistd.h>
>>>  #include <assert.h>
>>>  #include <pthread.h>
>>>  
>>> +#ifdef HAVE_STDATOMIC_H
>>> +#include <stdatomic.h>
>>> +#else
>>> +/* Some old platforms lack atomic types, but 32 bit ints are usually
>>> + * "atomic enough".
>>> + */
>>> +#define _Atomic /**/
>>> +#endif
>>> +
>>>  #include <curl/curl.h>
>>>  
>>>  #include <nbdkit-plugin.h>
>>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
>>>  
>>>  unsigned connections = 4;
>>>  
>>> -/* This lock protects access to the curl_handles vector below. */
>>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>>> +/* Pipe used to notify background thread that a command is pending in
>>> + * the queue.  A pointer to the 'struct command' is sent over the
>>> + * pipe.
>>> + */
>>> +static int self_pipe[2] = { -1, -1 };
>>>  
>>> -/* List of curl handles.  This is allocated dynamically as more
>>> - * handles are requested.  Currently it does not shrink.  It may grow
>>> - * up to 'connections' in length.
>>> +/* The curl multi handle. */
>>> +static CURLM *multi;
>>> +
>>> +/* List of running easy handles.  We only need to maintain this so we
>>> + * can remove them from the multi handle when cleaning up.
>>>   */
>>>  DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
>>>  static curl_handle_list curl_handles = empty_vector;
>>>  
>>> -/* The condition is used when the curl handles vector is full and
>>> - * we're waiting for a thread to put_handle.
>>> - */
>>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>>> -static size_t in_use = 0, waiting = 0;
>>> +static const char *
>>> +command_type_to_string (enum command_type type)
>>> +{
>>> +  switch (type) {
>>> +  case EASY_HANDLE: return "EASY_HANDLE";
>>> +  case STOP:        return "STOP";
>>> +  default:          abort ();
>>> +  }
>>> +}
>>>  
>>>  int
>>>  pool_get_ready (void)
>>>  {
>>> +  multi = curl_multi_init ();
>>> +  if (multi == NULL) {
>>> +    nbdkit_error ("curl_multi_init failed: %m");
>>> +    return -1;
>>> +  }
>>> +
>>>    return 0;
>>>  }
>>>  
>>> +/* Start and stop the background thread. */
>>> +static pthread_t thread;
>>> +static bool thread_running;
>>> +static void *pool_worker (void *);
>>> +
>>>  int
>>>  pool_after_fork (void)
>>>  {
>>> +  int err;
>>> +
>>> +  if (pipe (self_pipe) == -1) {
>>> +    nbdkit_error ("pipe: %m");
>>> +    return -1;
>>> +  }
>>> +
>>> +  /* Start the pool background thread where all the curl work is done. */
>>> +  err = pthread_create (&thread, NULL, pool_worker, NULL);
>>> +  if (err != 0) {
>>> +    errno = err;
>>> +    nbdkit_error ("pthread_create: %m");
>>> +    return -1;
>>> +  }
>>> +  thread_running = true;
>>> +
>>>    return 0;
>>>  }
>>>  
>>> -/* Close and free all handles in the pool. */
>>> +/* Unload the background thread. */
>>>  void
>>>  pool_unload (void)
>>>  {
>>> -  size_t i;
>>> +  if (thread_running) {
>>> +    /* Stop the background thread. */
>>> +    struct command cmd = { .type = STOP };
>>> +    send_command_and_wait (&cmd);
>>> +    pthread_join (thread, NULL);
>>> +    thread_running = false;
>>> +  }
>>>  
>>> -  if (curl_debug_pool)
>>> -    nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
>>> -                  curl_handles.len);
>>> +  if (self_pipe[0] >= 0) {
>>> +    close (self_pipe[0]);
>>> +    self_pipe[0] = -1;
>>> +  }
>>> +  if (self_pipe[1] >= 0) {
>>> +    close (self_pipe[1]);
>>> +    self_pipe[1] = -1;
>>> +  }
>>>  
>>> -  for (i = 0; i < curl_handles.len; ++i)
>>> -    free_handle (curl_handles.ptr[i]);
>>> -  curl_handle_list_reset (&curl_handles);
>>> +  if (multi) {
>>> +    size_t i;
>>> +
>>> +    /* Remove and free any easy handles in the multi. */
>>> +    for (i = 0; i < curl_handles.len; ++i) {
>>> +      curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
>>> +      free_handle (curl_handles.ptr[i]);
>>> +    }
>>> +
>>> +    curl_multi_cleanup (multi);
>>> +    multi = NULL;
>>> +  }
>>>  }
>>>  
>>> -/* Get a handle from the pool.
>>> - *
>>> - * It is owned exclusively by the caller until they call put_handle.
>>> +/* Command queue. */
>>> +static _Atomic uint64_t id;     /* next command ID */
>>> +
>>> +/* Send command to the background thread and wait for completion.
>>> + * This is only called by one of the nbdkit threads.
>>>   */
>>> -struct curl_handle *
>>> -get_handle (void)
>>> +CURLcode
>>> +send_command_and_wait (struct command *cmd)
>>>  {
>>> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>>> -  size_t i;
>>> -  struct curl_handle *ch;
>>> -
>>> - again:
>>> -  /* Look for a handle which is not in_use. */
>>> -  for (i = 0; i < curl_handles.len; ++i) {
>>> -    ch = curl_handles.ptr[i];
>>> -    if (!ch->in_use) {
>>> -      ch->in_use = true;
>>> -      in_use++;
>>> +  cmd->id = id++;
>>> +
>>> +  /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
>>> +   * indicate that the command has not yet been completed and status
>>> +   * set.
>>> +   */
>>> +  cmd->status = -1;
>>> +
>>> +  /* This will be used to signal command completion back to us. */
>>> +  pthread_mutex_init (&cmd->mutex, NULL);
>>> +  pthread_cond_init (&cmd->cond, NULL);
>>> +
>>> +  /* Send the command to the background thread. */
>>> +  if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
>>> +    abort ();
>>> +
>>> +  /* Wait for the command to be completed by the background thread. */
>>> +  {
>>> +    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>>> +    while (cmd->status == -1) /* for -1, see above */
>>> +      pthread_cond_wait (&cmd->cond, &cmd->mutex);
>>> +  }
>>> +
>>> +  pthread_mutex_destroy (&cmd->mutex);
>>> +  pthread_cond_destroy (&cmd->cond);
>>> +
>>> +  /* Note the main thread must call nbdkit_error on error! */
>>> +  return cmd->status;
>>> +}
>>> +
>>> +/* The background thread. */
>>> +static void check_for_finished_handles (void);
>>> +static void retire_command (struct command *cmd, CURLcode code);
>>> +static void do_easy_handle (struct command *cmd);
>>> +
>>> +static void *
>>> +pool_worker (void *vp)
>>> +{
>>> +  bool stop = false;
>>> +
>>> +  if (curl_debug_pool)
>>> +    nbdkit_debug ("curl: background thread started");
>>> +
>>> +  while (!stop) {
>>> +    struct command *cmd = NULL;
>>> +    struct curl_waitfd extra_fds[1] =
>>> +      { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
>>> +    CURLMcode mc;
>>> +    int numfds, running_handles, repeats = 0;
>>> +
>>> +    do {
>>> +      /* Process the multi handle. */
>>> +      mc = curl_multi_perform (multi, &running_handles);
>>> +      if (mc != CURLM_OK) {
>>> +        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
>>
>> Since nbdkit_error() stores its string in thread-local storage, is
>> there anything that ever extracts this error over to the nbdkit thread
>> that issued the original request to the worker thread?...
>>
>>> +        abort (); /* XXX We don't expect this to happen */
>>
>> ...Then again, if we abort, it doesn't matter.
> 
> I was unclear what to do here.  The final code does:
> 
> while (!stop) {
> ...
>     cmd = process_multi_handle ();
>     if (cmd == NULL)
>       continue; /* or die?? */
> 
> with process_multi_handle still calling nbdkit_error.  I felt it might
> be better to keep trying than just abort, as presumably some (most?)
> errors are transient.
> 
> nbdkit_error will ensure the error message is written out.
> 
>>> +      }
>>
>>> +
>>> +      check_for_finished_handles ();
>>> +
>>> +      mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
>>> +      if (mc != CURLM_OK) {
>>> +        nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
>>> +        abort (); /* XXX We don't expect this to happen */
>>> +      }
>>> +
>>>        if (curl_debug_pool)
>>> -        nbdkit_debug ("get_handle: %zu", ch->i);
>>> -      return ch;
>>> -    }
>>> -  }
>>> +        nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
>>> +                      running_handles, numfds);
>>> +
>>> +      if (numfds == 0) {
>>> +        repeats++;
>>> +        if (repeats > 1)
>>> +          nbdkit_nanosleep (1, 0);
>>> +      }
>>> +      else {
>>> +        repeats = 0;
>>> +        if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
>>> +          /* There's a command waiting. */
>>> +          if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
>>> +            abort ();
>>> +        }
>>> +      }
>>> +    } while (!cmd);
>>>  
>>> -  /* If more connections are allowed, then allocate a new handle. */
>>> -  if (curl_handles.len < connections) {
>>> -    ch = allocate_handle ();
>>> -    if (ch == NULL)
>>> -      return NULL;
>>> -    if (curl_handle_list_append (&curl_handles, ch) == -1) {
>>> -      free_handle (ch);
>>> -      return NULL;
>>> -    }
>>> -    ch->i = curl_handles.len - 1;
>>> -    ch->in_use = true;
>>> -    in_use++;
>>>      if (curl_debug_pool)
>>> -      nbdkit_debug ("get_handle: %zu", ch->i);
>>> -    return ch;
>>> -  }
>>> +      nbdkit_debug ("curl: dispatching %s command %" PRIu64,
>>> +                    command_type_to_string (cmd->type), cmd->id);
>>> +
>>> +    switch (cmd->type) {
>>> +    case STOP:
>>> +      stop = true;
>>> +      retire_command (cmd, CURLE_OK);
>>> +      break;
>>>  
>>> -  /* Otherwise we have run out of connections so we must wait until
>>> -   * another thread calls put_handle.
>>> -   */
>>> -  assert (in_use == connections);
>>> -  waiting++;
>>> -  while (in_use == connections)
>>> -    pthread_cond_wait (&cond, &lock);
>>> -  waiting--;
>>> +    case EASY_HANDLE:
>>> +      do_easy_handle (cmd);
>>> +      break;
>>> +    }
>>> +  } /* while (!stop) */
>>>  
>>> -  goto again;
>>> +  if (curl_debug_pool)
>>> +    nbdkit_debug ("curl: background thread stopped");
>>> +
>>> +  return NULL;
>>>  }
>>>  
>>> -/* Return the handle to the pool. */
>>> -void
>>> -put_handle (struct curl_handle *ch)
>>> +/* This checks if any easy handles in the multi have
>>> + * finished and retires the associated commands.
>>> + */
>>> +static void
>>> +check_for_finished_handles (void)
>>>  {
>>> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
>>> +  CURLMsg *msg;
>>> +  int msgs_in_queue;
>>> +
>>> +  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
>>> +    size_t i;
>>> +    struct curl_handle *ch = NULL;
>>> +
>>> +    if (msg->msg == CURLMSG_DONE) {
>>> +      /* Find this curl_handle. */
>>> +      for (i = 0; i < curl_handles.len; ++i) {
>>> +        if (curl_handles.ptr[i]->c == msg->easy_handle) {
>>> +          ch = curl_handles.ptr[i];
>>> +          curl_handle_list_remove (&curl_handles, i);
>>> +        }
>>> +      }
>>> +      if (ch == NULL) abort ();
>>> +      curl_multi_remove_handle (multi, ch->c);
>>> +
>>> +      retire_command (ch->cmd, msg->data.result);
>>> +    }
>>> +  }
>>> +}
>>>  
>>> +/* Retire a command.  status is a CURLcode. */
>>> +static void
>>> +retire_command (struct command *cmd, CURLcode status)
>>> +{
>>>    if (curl_debug_pool)
>>> -    nbdkit_debug ("put_handle: %zu", ch->i);
>>> +    nbdkit_debug ("curl: retiring %s command %" PRIu64,
>>> +                  command_type_to_string (cmd->type), cmd->id);
>>> +
>>> +  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
>>> +  cmd->status = status;
>>> +  pthread_cond_signal (&cmd->cond);
>>> +}
>>> +
>>> +static void
>>> +do_easy_handle (struct command *cmd)
>>> +{
>>> +  CURLMcode mc;
>>> +
>>> +  cmd->ch->cmd = cmd;
>>> +
>>> +  /* Add the handle to the multi. */
>>> +  mc = curl_multi_add_handle (multi, cmd->ch->c);
>>> +  if (mc != CURLM_OK) {
>>> +    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
>>> +    goto err;
>>> +  }
>>>  
>>> -  ch->in_use = false;
>>> -  in_use--;
>>> +  if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
>>> +    goto err;
>>> +  return;
>>>  
>>> -  /* Signal the next thread which is waiting. */
>>> -  if (waiting > 0)
>>> -    pthread_cond_signal (&cond);
>>> + err:
>>> +  retire_command (cmd, CURLE_OUT_OF_MEMORY);
>>>  }
>>> -- 
>>> 2.41.0
>>
>> Overall looks nice, and I learned more about curl in the process.
> 
> For me, too much :-(
> 
> Thanks,
> 
> Rich.
> 

_______________________________________________
Libguestfs mailing list
Libguestfs@redhat.com
https://listman.redhat.com/mailman/listinfo/libguestfs

