Protect quiesce_counter and queued_requests with a QemuMutex, so that they are safe against concurrent access from the main thread (for example blk_root_drained_end(), reached from bdrv_drain_all()) and from the iothread (where any I/O operation calls blk_inc_in_flight()).
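[Editor's illustration, not part of the patch: without the lock, the wait in blk_inc_in_flight() can miss the wakeup issued by blk_root_drained_end(). One possible interleaving, assuming an iothread issuing I/O while the main thread ends a drained section:

    iothread                                  main thread
    --------                                  -----------
    sees quiesce_counter != 0
                                              quiesce_counter drops to 0
                                              resumes queued_requests (still empty)
    qemu_co_queue_wait(&blk->queued_requests)
        -> sleeps with nobody left to wake it

Taking quiesce_lock around both the final check and the wait on one side, and around the counter update and the wakeup on the other, makes this interleaving impossible.]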
Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
---
 block/block-backend.c | 44 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 8 deletions(-)

diff --git a/block/block-backend.c b/block/block-backend.c
index 627d491d4155..fdf82f1f1599 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -23,6 +23,7 @@
 #include "qapi/error.h"
 #include "qapi/qapi-events-block.h"
 #include "qemu/id.h"
+#include "qemu/thread.h"
 #include "qemu/main-loop.h"
 #include "qemu/option.h"
 #include "trace.h"
@@ -85,6 +86,8 @@ struct BlockBackend {
     NotifierList remove_bs_notifiers, insert_bs_notifiers;
     QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
 
+    /* Protected by quiesce_lock. */
+    QemuMutex quiesce_lock;
     int quiesce_counter;
     CoQueue queued_requests;
@@ -372,6 +375,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
     block_acct_init(&blk->stats);
 
+    qemu_mutex_init(&blk->quiesce_lock);
     qemu_co_queue_init(&blk->queued_requests);
     notifier_list_init(&blk->remove_bs_notifiers);
     notifier_list_init(&blk->insert_bs_notifiers);
@@ -490,6 +494,7 @@ static void blk_delete(BlockBackend *blk)
     assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
     assert(QLIST_EMPTY(&blk->aio_notifiers));
     QTAILQ_REMOVE(&block_backends, blk, link);
+    qemu_mutex_destroy(&blk->quiesce_lock);
     drive_info_del(blk->legacy_dinfo);
     block_acct_cleanup(&blk->stats);
     g_free(blk);
@@ -1451,11 +1456,25 @@ void blk_inc_in_flight(BlockBackend *blk)
 {
     IO_CODE();
     qatomic_inc(&blk->in_flight);
-    if (!blk->disable_request_queuing) {
-        /* TODO: this is not thread-safe! */
+
+    /*
+     * Avoid a continuous stream of requests from AIO callbacks, which
+     * call a user-provided callback while blk->in_flight is elevated,
+     * if the BlockBackend is being quiesced.
+     *
+     * This initial test does not have to be perfect: a race might
+     * cause an extra I/O to be queued, but sooner or later a nonzero
+     * quiesce_counter will be observed.
+     */
+    if (!blk->disable_request_queuing && qatomic_read(&blk->quiesce_counter)) {
+        /*
+         * ... on the other hand, it is important that the final check and
+         * the wait are done under the lock, so that no wakeups are missed.
+         */
+        QEMU_LOCK_GUARD(&blk->quiesce_lock);
         while (blk->quiesce_counter) {
             qatomic_dec(&blk->in_flight);
-            qemu_co_queue_wait(&blk->queued_requests, NULL);
+            qemu_co_queue_wait(&blk->queued_requests, &blk->quiesce_lock);
             qatomic_inc(&blk->in_flight);
         }
     }
@@ -2542,7 +2561,8 @@ static void blk_root_drained_begin(BdrvChild *child)
     BlockBackend *blk = child->opaque;
     ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
 
-    if (++blk->quiesce_counter == 1) {
+    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter + 1);
+    if (blk->quiesce_counter == 1) {
         if (blk->dev_ops && blk->dev_ops->drained_begin) {
             blk->dev_ops->drained_begin(blk->dev_opaque);
         }
@@ -2560,6 +2580,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
 {
     BlockBackend *blk = child->opaque;
     bool busy = false;
+    assert(blk->quiesce_counter);
 
     if (blk->dev_ops && blk->dev_ops->drained_poll) {
@@ -2576,14 +2597,21 @@ static void blk_root_drained_end(BdrvChild *child)
     assert(blk->public.throttle_group_member.io_limits_disabled);
     qatomic_dec(&blk->public.throttle_group_member.io_limits_disabled);
 
-    if (--blk->quiesce_counter == 0) {
+    qemu_mutex_lock(&blk->quiesce_lock);
+    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter - 1);
+    if (blk->quiesce_counter == 0) {
+        /* After this point, new I/O will not sleep on queued_requests. */
+        qemu_mutex_unlock(&blk->quiesce_lock);
+
         if (blk->dev_ops && blk->dev_ops->drained_end) {
             blk->dev_ops->drained_end(blk->dev_opaque);
         }
-        while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
-            /* Resume all queued requests */
-        }
+
+        /* Now resume all queued requests */
+        qemu_mutex_lock(&blk->quiesce_lock);
+        qemu_co_enter_all(&blk->queued_requests, &blk->quiesce_lock);
     }
+    qemu_mutex_unlock(&blk->quiesce_lock);
 }
 
 bool blk_register_buf(BlockBackend *blk, void *host, size_t size, Error **errp)
-- 
2.38.1
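
[Editor's sketch, not part of the patch: for readers who want to experiment with the locking scheme outside of QEMU, below is a minimal, self-contained pthread analogue of the pattern, assuming a condition variable in place of QEMU's CoQueue. The names mirror the BlockBackend fields but are otherwise hypothetical.

#include <pthread.h>
#include <stdatomic.h>

static pthread_mutex_t quiesce_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queued_requests = PTHREAD_COND_INITIALIZER;
static atomic_int quiesce_counter;
static atomic_int in_flight;

static void inc_in_flight(void)
{
    atomic_fetch_add(&in_flight, 1);

    /* Unlocked fast path: may race, but at worst one extra request is
     * queued; a nonzero quiesce_counter is eventually observed. */
    if (atomic_load(&quiesce_counter)) {
        pthread_mutex_lock(&quiesce_lock);
        /* Final check under the lock, so no wakeup can be missed. */
        while (atomic_load(&quiesce_counter)) {
            atomic_fetch_sub(&in_flight, 1);
            pthread_cond_wait(&queued_requests, &quiesce_lock);
            atomic_fetch_add(&in_flight, 1);
        }
        pthread_mutex_unlock(&quiesce_lock);
    }
}

static void drained_end(void)
{
    pthread_mutex_lock(&quiesce_lock);
    /* atomic_fetch_sub() returns the old value: 1 means we hit zero. */
    if (atomic_fetch_sub(&quiesce_counter, 1) == 1) {
        /* Counter dropped to zero: resume all queued requests. */
        pthread_cond_broadcast(&queued_requests);
    }
    pthread_mutex_unlock(&quiesce_lock);
}

The unlocked read of quiesce_counter keeps the common, non-quiesced I/O path lock-free; only the rare quiesced path pays for the mutex, and the re-check under the lock closes the lost-wakeup window.]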