On Fri, Dec 04, 2020 at 12:39:07PM -0600, Eric Blake wrote: > On 12/4/20 10:53 AM, Sergio Lopez wrote: > > When switching between AIO contexts we need to me make sure that both > > recv_coroutine and send_coroutine are not scheduled to run. Otherwise, > > QEMU may crash while attaching the new context with an error like > > this one: > > > > aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule' > > > > To achieve this we need a local implementation of > > 'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done > > by 'nbd/client.c') that allows us to interrupt the operation and to > > know when recv_coroutine is yielding. > > > > With this in place, we delegate detaching the AIO context to the > > owning context with a BH ('nbd_aio_detach_bh') scheduled using > > 'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the > > channel by setting 'client->quiescing' to 'true', and either waits for > > the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in > > 'nbd_read_eof', actively enters the coroutine to interrupt it. > > > > RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326 > > Signed-off-by: Sergio Lopez <s...@redhat.com> > > --- > > nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------ > > 1 file changed, 106 insertions(+), 14 deletions(-) > > A complex patch, so I'd appreciate a second set of eyes. > > > > > diff --git a/nbd/server.c b/nbd/server.c > > index 613ed2634a..7229f487d2 100644 > > --- a/nbd/server.c > > +++ b/nbd/server.c > > @@ -132,6 +132,9 @@ struct NBDClient { > > CoMutex send_lock; > > Coroutine *send_coroutine; > > > > + bool read_yielding; > > + bool quiescing; > > Will either of these fields need to be accessed atomically once the > 'yank' code is added, or are we still safe with direct access because > coroutines are not multithreaded?
Yes, those are only accessed from coroutines, which will be scheduled on the same thread. > > + > > QTAILQ_ENTRY(NBDClient) next; > > int nb_requests; > > bool closing; > > @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient > > *client, Error **errp) > > return 0; > > } > > > > -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request, > > +/* nbd_read_eof > > + * Tries to read @size bytes from @ioc. This is a local implementation of > > + * qio_channel_readv_all_eof. We have it here because we need it to be > > + * interruptible and to know when the coroutine is yielding. > > + * Returns 1 on success > > + * 0 on eof, when no data was read (errp is not set) > > + * negative errno on failure (errp is set) > > + */ > > +static inline int coroutine_fn > > +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) > > +{ > > + bool partial = false; > > + > > + assert(size); > > + while (size > 0) { > > + struct iovec iov = { .iov_base = buffer, .iov_len = size }; > > + ssize_t len; > > + > > + len = qio_channel_readv(client->ioc, &iov, 1, errp); > > + if (len == QIO_CHANNEL_ERR_BLOCK) { > > + client->read_yielding = true; > > + qio_channel_yield(client->ioc, G_IO_IN); > > + client->read_yielding = false; > > nbd/client.c:nbd_read_eof() uses bdrv_dec/inc_in_flight instead of > read_yielding... > > > + if (client->quiescing) { > > + return -EAGAIN; > > + } > > and the quiescing check is new; otherwise, these two functions look > identical. Having two static functions with the same name makes gdb a > bit more annoying (which one of the two did you want your breakpoint > on?). Is there any way we could write this code only once in > nbd/common.c for reuse by both client and server? But I can live with > it as written. I'm not happy with this either, but on the first implementation I've tried to come up with a unique function for both use cases, and it looked terrible. We can easily use a different name, though. > > @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, > > uint64_t handle, > > > > /* nbd_co_receive_request > > * Collect a client request. Return 0 if request looks valid, -EIO to drop > > - * connection right away, and any other negative value to report an error > > to > > - * the client (although the caller may still need to disconnect after > > reporting > > - * the error). > > + * connection right away, -EAGAIN to indicate we were interrupted and the > > + * channel should be quiesced, and any other negative value to report an > > error > > + * to the client (although the caller may still need to disconnect after > > + * reporting the error). > > */ > > static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, > > Error **errp) > > { > > NBDClient *client = req->client; > > int valid_flags; > > + int ret; > > > > g_assert(qemu_in_coroutine()); > > assert(client->recv_coroutine == qemu_coroutine_self()); > > - if (nbd_receive_request(client->ioc, request, errp) < 0) { > > - return -EIO; > > + ret = nbd_receive_request(client, request, errp); > > + if (ret < 0) { > > + return ret; > > Why the double space? Ouch, copy/paste mistake. > The old code slams to EIO, you preserve errors. Is that going to bite > us by causing us to see a different errno leaked through? When reading from the channel, nbd_read_eof hides the actual errno behind EIO, so the only actual difference is that, with this change, nbd_receive_request may send this error... if (magic != NBD_REQUEST_MAGIC) { error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic); return -EINVAL; } ... to the client (via server.c:2624), which I think is the right thing to do. > > } > > > > trace_nbd_co_receive_request_decode_type(request->handle, > > request->type, > > @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque) > > return; > > } > > > > + if (client->quiescing) { > > + /* > > + * We're switching between AIO contexts. Don't attempt to receive > > a new > > + * request and kick the main context which may be waiting for us. > > s/request/request,/ Thanks, will fix this comment. > > + */ > > + nbd_client_put(client); > > + client->recv_coroutine = NULL; > > + aio_wait_kick(); > > + return; > > + } > > + > > req = nbd_request_get(client); > > ret = nbd_co_receive_request(req, &request, &local_err); > > client->recv_coroutine = NULL; > > @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque) > > goto done; > > } > > > > + if (ret == -EAGAIN) { > > + assert(client->quiescing); > > + goto done; > > + } > > + > > nbd_client_receive_next_request(client); > > if (ret == -EIO) { > > goto disconnect; > > @@ -2565,7 +2656,8 @@ disconnect: > > > > static void nbd_client_receive_next_request(NBDClient *client) > > { > > - if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) > > { > > + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS > > && > > + !client->quiescing) { > > nbd_client_get(client); > > client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); > > aio_co_schedule(client->exp->common.ctx, client->recv_coroutine); > > > > Overall looks okay to me, > Reviewed-by: Eric Blake <ebl...@redhat.com> > > -- > Eric Blake, Principal Software Engineer > Red Hat, Inc. +1-919-301-3226 > Virtualization: qemu.org | libvirt.org >
signature.asc
Description: PGP signature