Am 01.06.2021 um 07:57 hat Sergio Lopez geschrieben: > Before switching between AioContexts we need to make sure that we're > fully quiesced ("nb_requests == 0" for every client) when entering the > drained section. > > To do this, we set "quiescing = true" for every client on > ".drained_begin" to prevent new coroutines to be created, and check if > "nb_requests == 0" on ".drained_poll". Finally, once we're exiting the > drained section, on ".drained_end" we set "quiescing = false" and > call "nbd_client_receive_next_request()" to resume the processing of > new requests. > > With these changes, "blk_aio_attach()" and "blk_aio_detach()" can be > reverted to be as simple as they were before f148ae7d36. > > RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1960137 > Suggested-by: Kevin Wolf <kw...@redhat.com> > Signed-off-by: Sergio Lopez <s...@redhat.com> > --- > nbd/server.c | 99 +++++++++++++++++++++++++++++++++++++++------------- > 1 file changed, 75 insertions(+), 24 deletions(-) > > diff --git a/nbd/server.c b/nbd/server.c > index 86a44a9b41..33e55479d7 100644 > --- a/nbd/server.c > +++ b/nbd/server.c > @@ -132,7 +132,7 @@ struct NBDClient { > CoMutex send_lock; > Coroutine *send_coroutine; > > - bool read_yielding; > + GSList *yield_co_list; /* List of coroutines yielding on nbd_read_eof */ > bool quiescing;
Hm, how do you get more than one coroutine per client yielding in nbd_read_eof() at the same time? I thought the model was that you always have one coroutine reading the next request (which is client->recv_coroutine), and all the others are just processing the requests they had read earlier. Multiple coroutines reading from the same socket sounds like a bad idea.

> QTAILQ_ENTRY(NBDClient) next; > @@ -1367,6 +1367,7 @@ static inline int coroutine_fn > nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) > { > bool partial = false; > + Coroutine *co; > > assert(size); > while (size > 0) { > @@ -1375,9 +1376,12 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t > size, Error **errp) > > len = qio_channel_readv(client->ioc, &iov, 1, errp); > if (len == QIO_CHANNEL_ERR_BLOCK) { > - client->read_yielding = true; > + co = qemu_coroutine_self(); > + > + client->yield_co_list = g_slist_prepend(client->yield_co_list, > co); > qio_channel_yield(client->ioc, G_IO_IN); > - client->read_yielding = false; > + client->yield_co_list = g_slist_remove(client->yield_co_list, > co); > + > if (client->quiescing) { > return -EAGAIN; > } > @@ -1513,6 +1517,11 @@ static void nbd_request_put(NBDRequestData *req) > g_free(req); > > client->nb_requests--; > + > + if (client->quiescing && client->nb_requests == 0) { > + aio_wait_kick(); > + } > + > nbd_client_receive_next_request(client); > > nbd_client_put(client); > @@ -1530,49 +1539,75 @@ static void blk_aio_attached(AioContext *ctx, void > *opaque) > QTAILQ_FOREACH(client, &exp->clients, next) { > qio_channel_attach_aio_context(client->ioc, ctx); > > + assert(client->nb_requests == 0); > assert(client->recv_coroutine == NULL); > assert(client->send_coroutine == NULL); > - > - if (client->quiescing) { > - client->quiescing = false; > - nbd_client_receive_next_request(client); > - } > } > } > > -static void nbd_aio_detach_bh(void *opaque) > +static void blk_aio_detach(void *opaque) > { > NBDExport *exp = opaque; >
NBDClient *client; > > + trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); > + > QTAILQ_FOREACH(client, &exp->clients, next) { > qio_channel_detach_aio_context(client->ioc); > + } > + > + exp->common.ctx = NULL; > +} > + > +static void nbd_drained_begin(void *opaque) > +{ > + NBDExport *exp = opaque; > + NBDClient *client; > + > + QTAILQ_FOREACH(client, &exp->clients, next) { > client->quiescing = true; > + } > +} > > - if (client->recv_coroutine) { > - if (client->read_yielding) { > - qemu_aio_coroutine_enter(exp->common.ctx, > - client->recv_coroutine); > - } else { > - AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != > NULL); > - } > - } > +static void nbd_drained_end(void *opaque) > +{ > + NBDExport *exp = opaque; > + NBDClient *client; > > - if (client->send_coroutine) { > - AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL); > - } > + QTAILQ_FOREACH(client, &exp->clients, next) { > + client->quiescing = false; > + nbd_client_receive_next_request(client); > } > } > > -static void blk_aio_detach(void *opaque) > +static bool nbd_drained_poll(void *opaque) > { > NBDExport *exp = opaque; > + NBDClient *client; > + Coroutine *co; > + GSList *entry; > + GSList *coroutine_list; > > - trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); > + QTAILQ_FOREACH(client, &exp->clients, next) { > + if (client->nb_requests != 0) { > + /* > + * Enter coroutines waiting for new requests on nbd_read_eof(), > so > + * we don't depend on the client to wake us up. > + */ > + coroutine_list = g_slist_copy(client->yield_co_list); > + for (entry = coroutine_list; > + entry != NULL; > + entry = g_slist_next(entry)) { > + co = entry->data; > + qemu_aio_coroutine_enter(exp->common.ctx, co); > + } > + g_slist_free(coroutine_list); > > - aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp); > + return 1; This would be more accurately spelt true... > + } > + } > > - exp->common.ctx = NULL; > + return 0; ...and this false. 
> } > > static void nbd_eject_notifier(Notifier *n, void *data)

The patch looks correct to me, though I'm not sure whether yield_co_list is an unnecessary complication (and, if it isn't, whether it is safe).

I would be happy enough to apply it anyway if you can explain the yield_co_list thing, but I'll give Eric some time to have a look, too.

Kevin