From: Zhu Yangyang <zhuyangyan...@huawei.com> The bdrv_drained_begin() function is a blocking function. In scenarios where network storage is used and network links fail, it may block for a long time. Therefore, we add a timeout parameter to control the duration of the block.
Since bdrv_drained_begin() has been widely adopted, both bdrv_drained_begin() and bdrv_drained_begin_timeout() will be retained. Signed-off-by: Zhu Yangyang <zhuyangyan...@huawei.com> --- block/io.c | 58 +++++++++++++++++++++++++++++++++------- include/block/aio-wait.h | 47 +++++++++++++++++++++++--------- include/block/block-io.h | 22 ++++++++++++++- util/aio-wait.c | 5 ++++ 4 files changed, 108 insertions(+), 24 deletions(-) diff --git a/block/io.c b/block/io.c index 1ba8d1aeea..912b76c4a4 100644 --- a/block/io.c +++ b/block/io.c @@ -255,6 +255,8 @@ typedef struct { bool begin; bool poll; BdrvChild *parent; + uint64_t timeout_ns; + int ret; } BdrvCoDrainData; /* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */ @@ -283,6 +285,10 @@ static bool bdrv_drain_poll_top_level(BlockDriverState *bs, return bdrv_drain_poll(bs, ignore_parent, false); } +static int bdrv_do_drained_begin_timeout(BlockDriverState *bs, + BdrvChild *parent, + bool poll, + uint64_t timeout_ns); static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, bool poll); static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent); @@ -296,7 +302,9 @@ static void bdrv_co_drain_bh_cb(void *opaque) if (bs) { bdrv_dec_in_flight(bs); if (data->begin) { - bdrv_do_drained_begin(bs, data->parent, data->poll); + data->ret = bdrv_do_drained_begin_timeout(bs, data->parent, + data->poll, + data->timeout_ns); } else { assert(!data->poll); bdrv_do_drained_end(bs, data->parent); @@ -310,10 +318,11 @@ static void bdrv_co_drain_bh_cb(void *opaque) aio_co_wake(co); } -static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, - bool begin, - BdrvChild *parent, - bool poll) +static int coroutine_fn bdrv_co_yield_to_drain_timeout(BlockDriverState *bs, + bool begin, + BdrvChild *parent, + bool poll, + uint64_t timeout_ns) { BdrvCoDrainData data; Coroutine *self = qemu_coroutine_self(); @@ -329,6 +338,8 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, .begin = begin, .parent = parent, .poll = poll, + .timeout_ns = timeout_ns, + .ret = 0 }; if (bs) { @@ -342,16 +353,27 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, /* If we are resumed from some other event (such as an aio completion or a * timer callback), it is a bug in the caller that should be fixed. */ assert(data.done); + return data.ret; } -static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, - bool poll) +static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, + bool begin, + BdrvChild *parent, + bool poll) +{ + bdrv_co_yield_to_drain_timeout(bs, begin, parent, poll, 0); +} + +static int bdrv_do_drained_begin_timeout(BlockDriverState *bs, + BdrvChild *parent, + bool poll, + uint64_t timeout_ns) { IO_OR_GS_CODE(); if (qemu_in_coroutine()) { - bdrv_co_yield_to_drain(bs, true, parent, poll); - return; + return bdrv_co_yield_to_drain_timeout(bs, true, parent, poll, + timeout_ns); } GLOBAL_STATE_CODE(); @@ -375,8 +397,17 @@ static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, * nodes. */ if (poll) { - BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, parent)); + return BDRV_POLL_WHILE_TIMEOUT(bs, + bdrv_drain_poll_top_level(bs, parent), + timeout_ns); } + return 0; +} + +static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, + bool poll) +{ + bdrv_do_drained_begin_timeout(bs, parent, poll, 0); } void bdrv_do_drained_begin_quiesce(BlockDriverState *bs, BdrvChild *parent) @@ -391,6 +422,13 @@ bdrv_drained_begin(BlockDriverState *bs) bdrv_do_drained_begin(bs, NULL, true); } +int coroutine_mixed_fn +bdrv_drained_begin_timeout(BlockDriverState *bs, uint64_t timeout_ns) +{ + IO_OR_GS_CODE(); + return bdrv_do_drained_begin_timeout(bs, NULL, true, timeout_ns); +} + /** * This function does not poll, nor must any of its recursively called * functions. diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h index cf5e8bde1c..7610880b61 100644 --- a/include/block/aio-wait.h +++ b/include/block/aio-wait.h @@ -59,10 +59,12 @@ typedef struct { extern AioWait global_aio_wait; /** - * AIO_WAIT_WHILE_INTERNAL: + * AIO_WAIT_WHILE_TIMEOUT: * @ctx: the aio context, or NULL if multiple aio contexts (for which the * caller does not hold a lock) are involved in the polling condition. * @cond: wait while this conditional expression is true + * @timeout_ns: maximum duration to wait, in nanoseconds, except the value + * is unsigned, 0 means infinite. * * Wait while a condition is true. Use this to implement synchronous * operations that require event loop activity. @@ -74,37 +76,54 @@ extern AioWait global_aio_wait; * thread (with @ctx acquired exactly once). This function cannot be used to * wait on conditions between two IOThreads since that could lead to deadlock, * go via the main loop instead. + * + * Returns: 0 if succeeded; -ETIMEDOUT when a timeout occurs. */ -#define AIO_WAIT_WHILE_INTERNAL(ctx, cond) ({ \ - bool waited_ = false; \ +#define AIO_WAIT_WHILE_TIMEOUT(ctx, cond, timeout_ns) ({ \ + int ret_ = 0; \ + uint64_t timeout_ = (timeout_ns); \ AioWait *wait_ = &global_aio_wait; \ AioContext *ctx_ = (ctx); \ + AioContext *current_ctx_ = NULL; \ + QEMUTimer timer_; \ /* Increment wait_->num_waiters before evaluating cond. */ \ qatomic_inc(&wait_->num_waiters); \ /* Paired with smp_mb in aio_wait_kick(). */ \ smp_mb__after_rmw(); \ if (ctx_ && in_aio_context_home_thread(ctx_)) { \ - while ((cond)) { \ - aio_poll(ctx_, true); \ - waited_ = true; \ - } \ + current_ctx_ = ctx_; \ } else { \ assert(qemu_get_current_aio_context() == \ qemu_get_aio_context()); \ - while ((cond)) { \ - aio_poll(qemu_get_aio_context(), true); \ - waited_ = true; \ + current_ctx_ = qemu_get_aio_context(); \ + } \ + if (timeout_ > 0) { \ + timer_init_full(&timer_, ¤t_ctx_->tlg, \ + QEMU_CLOCK_REALTIME, \ + SCALE_NS, 0, aio_wait_timer_cb, NULL); \ + timer_mod_ns(&timer_, \ + qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + \ + timeout_); \ + } \ + while ((cond)) { \ + aio_poll(current_ctx_, true); \ + if (timeout_ > 0 && !timer_pending(&timer_)) { \ + ret_ = -ETIMEDOUT; \ + break; \ } \ } \ + if (timeout_ > 0) { \ + timer_del(&timer_); \ + } \ qatomic_dec(&wait_->num_waiters); \ - waited_; }) + ret_; }) #define AIO_WAIT_WHILE(ctx, cond) \ - AIO_WAIT_WHILE_INTERNAL(ctx, cond) + AIO_WAIT_WHILE_TIMEOUT(ctx, cond, 0) /* TODO replace this with AIO_WAIT_WHILE() in a future patch */ #define AIO_WAIT_WHILE_UNLOCKED(ctx, cond) \ - AIO_WAIT_WHILE_INTERNAL(ctx, cond) + AIO_WAIT_WHILE_TIMEOUT(ctx, cond, 0) /** * aio_wait_kick: @@ -149,4 +168,6 @@ static inline bool in_aio_context_home_thread(AioContext *ctx) } } +void aio_wait_timer_cb(void *opaque); + #endif /* QEMU_AIO_WAIT_H */ diff --git a/include/block/block-io.h b/include/block/block-io.h index b49e0537dd..844e9cf350 100644 --- a/include/block/block-io.h +++ b/include/block/block-io.h @@ -354,6 +354,11 @@ bdrv_co_copy_range(BdrvChild *src, int64_t src_offset, AIO_WAIT_WHILE(bdrv_get_aio_context(bs_), \ cond); }) +#define BDRV_POLL_WHILE_TIMEOUT(bs, cond, timeout_ns) ({ \ + BlockDriverState *bs_ = (bs); \ + AIO_WAIT_WHILE_TIMEOUT(bdrv_get_aio_context(bs_), \ + cond, timeout_ns); }) + void bdrv_drain(BlockDriverState *bs); int co_wrapper_mixed_bdrv_rdlock @@ -432,7 +437,22 @@ bdrv_drain_poll(BlockDriverState *bs, BdrvChild *ignore_parent, void bdrv_drained_begin(BlockDriverState *bs); /** - * bdrv_do_drained_begin_quiesce: + * bdrv_drained_begin_timeout: + * + * Added timeout parameter for bdrv_drained_begin() to make a time limited. + * + * @timeout_ns: maximum duration to wait; 0 means infinite, equal to call + * bdrv_drained_begin(). + * + * Returns: 0 if succeeded; -ETIMEDOUT when a timeout occurs. + * + * Note: when the timeout fails, we've already begin aquiesced section, so we + * still need to call bdrv_drained_end() to end the quiescent section. + */ +int bdrv_drained_begin_timeout(BlockDriverState *bs, uint64_t timeout_ns); + +/** + * bdrv_do_drained_badegin_quiesce: * * Quiesces a BDS like bdrv_drained_begin(), but does not wait for already * running requests to complete. diff --git a/util/aio-wait.c b/util/aio-wait.c index b5336cf5fd..64c3714fb8 100644 --- a/util/aio-wait.c +++ b/util/aio-wait.c @@ -84,3 +84,8 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data); AIO_WAIT_WHILE_UNLOCKED(NULL, !data.done); } + +void aio_wait_timer_cb(void *opaque) +{ + /* The point is to make AIO_WAIT_WHILE_TIMEOUT()'s aio_poll() return */ +} -- 2.33.0