On Thu, 13 Mar, 2025 at 12:09:45 +0800, Stefan Hajnoczi wrote: > On Sat, Mar 08, 2025 at 06:16:17PM +0800, zoudongjie wrote: > > From: Zhu Yangyang <zhuyangyan...@huawei.com> > > > > The bdrv_drained_begin() function is a blocking function. In scenarios > > where network storage > > is used and network links fail, it may block for a long time. > > Therefore, we add a timeout parameter to control the duration of the block. > > > > Since bdrv_drained_begin() has been widely adopted, both > > bdrv_drained_begin() > > and bdrv_drained_begin_timeout() will be retained. > > > > Signed-off-by: Zhu Yangyang <zhuyangyan...@huawei.com> > > --- > > block/io.c | 55 ++++++++++++++++++++++++++++++------- > > include/block/aio-wait.h | 58 ++++++++++++++++++++++++++++++++++++++++ > > include/block/block-io.h | 7 +++++ > > util/aio-wait.c | 7 +++++ > > 4 files changed, 117 insertions(+), 10 deletions(-) > > > > diff --git a/block/io.c b/block/io.c > > index d369b994df..03b8b2dca7 100644 > > --- a/block/io.c > > +++ b/block/io.c > > @@ -255,6 +255,8 @@ typedef struct { > > bool begin; > > bool poll; > > BdrvChild *parent; > > + int ret; > > + int64_t timeout; > > Please put the units (milliseconds) into the variable name here and > everywhere else in the patch to avoid confusion about units: > > int64_t timeout_ms;
Ok, I'm going to modify it in patch V2. > > > } BdrvCoDrainData; > > > > /* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() > > */ > > @@ -283,6 +285,8 @@ static bool bdrv_drain_poll_top_level(BlockDriverState > > *bs, > > return bdrv_drain_poll(bs, ignore_parent, false); > > } > > > > +static int bdrv_do_drained_begin_timeout(BlockDriverState *bs, > > + BdrvChild *parent, bool poll, int64_t timeout); > > static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, > > bool poll); > > static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent); > > @@ -296,7 +300,8 @@ static void bdrv_co_drain_bh_cb(void *opaque) > > if (bs) { > > bdrv_dec_in_flight(bs); > > if (data->begin) { > > - bdrv_do_drained_begin(bs, data->parent, data->poll); > > + data->ret = bdrv_do_drained_begin_timeout( > > + bs, data->parent, data->poll, data->timeout); > > } else { > > assert(!data->poll); > > bdrv_do_drained_end(bs, data->parent); > > @@ -310,10 +315,11 @@ static void bdrv_co_drain_bh_cb(void *opaque) > > aio_co_wake(co); > > } > > > > -static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, > > - bool begin, > > - BdrvChild *parent, > > - bool poll) > > +static int coroutine_fn bdrv_co_yield_to_drain_timeout(BlockDriverState > > *bs, > > + bool begin, > > + BdrvChild *parent, > > + bool poll, > > + int64_t timeout) > > { > > BdrvCoDrainData data; > > Coroutine *self = qemu_coroutine_self(); > > @@ -329,6 +335,8 @@ static void coroutine_fn > > bdrv_co_yield_to_drain(BlockDriverState *bs, > > .begin = begin, > > .parent = parent, > > .poll = poll, > > + .timeout = timeout, > > + .ret = 0 > > }; > > > > if (bs) { > > @@ -342,16 +350,25 @@ static void coroutine_fn > > bdrv_co_yield_to_drain(BlockDriverState *bs, > > /* If we are resumed from some other event (such as an aio completion > > or a > > * timer callback), it is a bug in the caller that should be fixed. */ > > assert(data.done); > > + return data.ret; > > } > > > > -static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, > > - bool poll) > > +static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, > > + bool begin, > > + BdrvChild *parent, > > + bool poll) > > +{ > > + bdrv_co_yield_to_drain_timeout(bs, begin, parent, poll, -1); > > Is this safe on 32-bit platforms? I'm sorry, can it be more specific here, I didn't get it. > > > +} > > + > > +static int bdrv_do_drained_begin_timeout(BlockDriverState *bs, > > + BdrvChild *parent, bool poll, int64_t timeout_ms) > > { > > IO_OR_GS_CODE(); > > > > if (qemu_in_coroutine()) { > > - bdrv_co_yield_to_drain(bs, true, parent, poll); > > - return; > > + return bdrv_co_yield_to_drain_timeout(bs, true, parent, poll, > > + timeout_ms); > > } > > > > GLOBAL_STATE_CODE(); > > @@ -375,8 +392,20 @@ static void bdrv_do_drained_begin(BlockDriverState > > *bs, BdrvChild *parent, > > * nodes. > > */ > > if (poll) { > > - BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, parent)); > > + if (timeout_ms < 0) { > > + BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, parent)); > > + } else { > > + return BDRV_POLL_WHILE_TIMEOUT( > > + bs, bdrv_drain_poll_top_level(bs, parent), timeout_ms); > > + } > > Any reason to handle timeout_ms < 0 here instead of in > BDRV_POLL_WHILE_TIMEOUT()? It would be more consistent to support -1 in > BDRV_POLL_WHILE_TIMEOUT() so that you don't need to remember which > functions/macros support timeout_ms=-1 and which dont. Previously, BDRV_POLL_WHILE_TIMEOUT() was not done very well, aio_poll() exits frequently because interval is used in the timer. but now I will support -1 in BDRV_POLL_WHILE_TIMEOUT(). > > > } > > + return 0; > > +} > > + > > +static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent, > > + bool poll) > > +{ > > + bdrv_do_drained_begin_timeout(bs, parent, poll, -1); > > } > > > > void bdrv_do_drained_begin_quiesce(BlockDriverState *bs, BdrvChild *parent) > > @@ -390,6 +419,12 @@ bdrv_drained_begin(BlockDriverState *bs) > > IO_OR_GS_CODE(); > > bdrv_do_drained_begin(bs, NULL, true); > > } > > +int coroutine_mixed_fn > > +bdrv_drained_begin_timeout(BlockDriverState *bs, int64_t timeout_ms) > > +{ > > + IO_OR_GS_CODE(); > > + return bdrv_do_drained_begin_timeout(bs, NULL, true, timeout_ms); > > +} > > > > /** > > * This function does not poll, nor must any of its recursively called > > diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h > > index cf5e8bde1c..efbcb9777a 100644 > > --- a/include/block/aio-wait.h > > +++ b/include/block/aio-wait.h > > @@ -28,6 +28,8 @@ > > #include "block/aio.h" > > #include "qemu/main-loop.h" > > > > +#define AIO_WAIT_INTERVAL 10 /* ms */ > > + > > /** > > * AioWait: > > * > > @@ -56,6 +58,11 @@ typedef struct { > > unsigned num_waiters; > > } AioWait; > > > > +typedef struct { > > + struct QEMUTimer *timer; > > struct is not necessary since QEMUTimer is a typedef: > > QEMUTimer *timer; > > Also, can this be a struct field instead of a pointer by using > aio_timer_init_ms() instead of aio_timer_new()? Ok, I'm going to modify it in patch V2. > > > + int64_t interval; > > +} AioWaitTimer; > > + > > extern AioWait global_aio_wait; > > > > /** > > @@ -99,6 +106,55 @@ extern AioWait global_aio_wait; > > qatomic_dec(&wait_->num_waiters); \ > > waited_; }) > > > > +/** > > + * AIO_WAIT_WHILE_TIMEOUT: > > + * > > + * Refer to the implementation of AIO_WAIT_WHILE_INTERNAL, > > + * the timeout parameter is added. > > + */ > > +#define AIO_WAIT_WHILE_TIMEOUT(ctx, cond, timeout) ({ \ > > + int ret_ = 0; \ > > + AioWait *wait_ = &global_aio_wait; \ > > + AioContext *ctx_ = (ctx); \ > > + int64_t start_ = qemu_clock_get_ms(QEMU_CLOCK_REALTIME); \ > > + int64_t deadline_ = start_ + (timeout); \ > > + /* Ensure that the aio_poll exits periodically to check timeout. */ \ > > + AioWaitTimer *s_ = g_malloc0(sizeof(AioWaitTimer)); \ > > Can this be allocated on the stack instead of the heap? Yes, it's certainly better. > > > + s_->interval = AIO_WAIT_INTERVAL; \ > > + /* Increment wait_->num_waiters before evaluating cond. */ \ > > + qatomic_inc(&wait_->num_waiters); \ > > + /* Paired with smp_mb in aio_wait_kick(). */ \ > > + smp_mb__after_rmw(); \ > > + if (ctx_ && in_aio_context_home_thread(ctx_)) { \ > > + s_->timer = aio_timer_new(ctx_, QEMU_CLOCK_REALTIME, \ > > + SCALE_MS, aio_wait_timer_retry, s_); \ > > + aio_wait_timer_retry(s_); \ > > + while ((cond)) { \ > > + aio_poll(ctx_, true); \ > > + if (qemu_clock_get_ms(QEMU_CLOCK_REALTIME) > deadline_) { \ > > + ret_ = -ETIMEDOUT; \ > > + break; \ > > + } \ > > + } \ > > What is the purpose of interval? > > I expected the timer's callback function to be an empty function that is > called when the deadline expires. The while loop here would use > timer_pending() to check for expiry instead of explicitly checking > against the deadline. This was really a good idea; it resolved some of my doubts and made everything look better. > > > + } else { \ > > + s_->timer = aio_timer_new(qemu_get_aio_context(), \ > > + QEMU_CLOCK_REALTIME, SCALE_MS, aio_wait_timer_retry, s_); \ > > + aio_wait_timer_retry(s_); \ > > + while ((cond)) { \ > > + assert(qemu_get_current_aio_context() == \ > > + qemu_get_aio_context()); \ > > + aio_poll(qemu_get_aio_context(), true); \ > > + if (qemu_clock_get_ms(QEMU_CLOCK_REALTIME) > deadline_) { \ > > + ret_ = -ETIMEDOUT; \ > > + break; \ > > + } \ > > + } \ > > + } \ > > + qatomic_dec(&wait_->num_waiters); \ > > + timer_free(s_->timer); \ > > This will need to be timer_del() when the QEMUTimer is moved onto the > stack. > > > + g_free(s_); \ > > + ret_; }) > > + > > #define AIO_WAIT_WHILE(ctx, cond) \ > > AIO_WAIT_WHILE_INTERNAL(ctx, cond) > > > > @@ -149,4 +205,6 @@ static inline bool > > in_aio_context_home_thread(AioContext *ctx) > > } > > } > > > > +void aio_wait_timer_retry(void *opaque); > > + > > #endif /* QEMU_AIO_WAIT_H */ > > diff --git a/include/block/block-io.h b/include/block/block-io.h > > index b49e0537dd..84f92d2b09 100644 > > --- a/include/block/block-io.h > > +++ b/include/block/block-io.h > > @@ -354,6 +354,11 @@ bdrv_co_copy_range(BdrvChild *src, int64_t src_offset, > > AIO_WAIT_WHILE(bdrv_get_aio_context(bs_), \ > > cond); }) > > > > +#define BDRV_POLL_WHILE_TIMEOUT(bs, cond, timeout) ({ \ > > + BlockDriverState *bs_ = (bs); \ > > + AIO_WAIT_WHILE_TIMEOUT(bdrv_get_aio_context(bs_), \ > > + cond, timeout); }) > > + > > void bdrv_drain(BlockDriverState *bs); > > > > int co_wrapper_mixed_bdrv_rdlock > > @@ -431,6 +436,8 @@ bdrv_drain_poll(BlockDriverState *bs, BdrvChild > > *ignore_parent, > > */ > > void bdrv_drained_begin(BlockDriverState *bs); > > > > +int bdrv_drained_begin_timeout(BlockDriverState *bs, int64_t timeout_ms); > > + > > /** > > * bdrv_do_drained_begin_quiesce: > > * > > diff --git a/util/aio-wait.c b/util/aio-wait.c > > index b5336cf5fd..9aed165529 100644 > > --- a/util/aio-wait.c > > +++ b/util/aio-wait.c > > @@ -84,3 +84,10 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc > > *cb, void *opaque) > > aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data); > > AIO_WAIT_WHILE_UNLOCKED(NULL, !data.done); > > } > > + > > +void aio_wait_timer_retry(void *opaque) > > +{ > > + AioWaitTimer *s = opaque; > > + > > + timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + > > s->interval); > > +} > > -- > > 2.33.0 > >