On Sat, Mar 08, 2025 at 06:16:17PM +0800, zoudongjie wrote:
> From: Zhu Yangyang <zhuyangyan...@huawei.com>
> 
> The bdrv_drained_begin() function is a blocking function. In scenarios where
> network storage is used and network links fail, it may block for a long time.
> Therefore, we add a timeout parameter to control the duration of the block.
> 
> Since bdrv_drained_begin() has been widely adopted, both bdrv_drained_begin()
> and bdrv_drained_begin_timeout() will be retained.
> 
> Signed-off-by: Zhu Yangyang <zhuyangyan...@huawei.com>
> ---
>  block/io.c               | 55 ++++++++++++++++++++++++++++++-------
>  include/block/aio-wait.h | 58 ++++++++++++++++++++++++++++++++++++++++
>  include/block/block-io.h |  7 +++++
>  util/aio-wait.c          |  7 +++++
>  4 files changed, 117 insertions(+), 10 deletions(-)
> 
> diff --git a/block/io.c b/block/io.c
> index d369b994df..03b8b2dca7 100644
> --- a/block/io.c
> +++ b/block/io.c
> @@ -255,6 +255,8 @@ typedef struct {
>      bool begin;
>      bool poll;
>      BdrvChild *parent;
> +    int ret;
> +    int64_t timeout;

Please put the units (milliseconds) into the variable name here and
everywhere else in the patch to avoid confusion about units:

  int64_t timeout_ms;

>  } BdrvCoDrainData;
>  
>  /* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */
> @@ -283,6 +285,8 @@ static bool bdrv_drain_poll_top_level(BlockDriverState *bs,
>      return bdrv_drain_poll(bs, ignore_parent, false);
>  }
>  
> +static int bdrv_do_drained_begin_timeout(BlockDriverState *bs,
> +    BdrvChild *parent, bool poll, int64_t timeout);
>  static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
>                                    bool poll);
>  static void bdrv_do_drained_end(BlockDriverState *bs, BdrvChild *parent);
> @@ -296,7 +300,8 @@ static void bdrv_co_drain_bh_cb(void *opaque)
>      if (bs) {
>          bdrv_dec_in_flight(bs);
>          if (data->begin) {
> -            bdrv_do_drained_begin(bs, data->parent, data->poll);
> +            data->ret = bdrv_do_drained_begin_timeout(
> +                bs, data->parent, data->poll, data->timeout);
>          } else {
>              assert(!data->poll);
>              bdrv_do_drained_end(bs, data->parent);
> @@ -310,10 +315,11 @@ static void bdrv_co_drain_bh_cb(void *opaque)
>      aio_co_wake(co);
>  }
>  
> -static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
> -                                                bool begin,
> -                                                BdrvChild *parent,
> -                                                bool poll)
> +static int coroutine_fn bdrv_co_yield_to_drain_timeout(BlockDriverState *bs,
> +                                                         bool begin,
> +                                                         BdrvChild *parent,
> +                                                         bool poll,
> +                                                         int64_t timeout)
>  {
>      BdrvCoDrainData data;
>      Coroutine *self = qemu_coroutine_self();
> @@ -329,6 +335,8 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
>          .begin = begin,
>          .parent = parent,
>          .poll = poll,
> +        .timeout = timeout,
> +        .ret = 0
>      };
>  
>      if (bs) {
> @@ -342,16 +350,25 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
>      /* If we are resumed from some other event (such as an aio completion or a
>       * timer callback), it is a bug in the caller that should be fixed. */
>      assert(data.done);
> +    return data.ret;
>  }
>  
> -static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
> -                                  bool poll)
> +static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs,
> +                                                bool begin,
> +                                                BdrvChild *parent,
> +                                                bool poll)
> +{
> +    bdrv_co_yield_to_drain_timeout(bs, begin, parent, poll, -1);

Is this safe on 32-bit platforms?

> +}
> +
> +static int bdrv_do_drained_begin_timeout(BlockDriverState *bs,
> +    BdrvChild *parent, bool poll, int64_t timeout_ms)
>  {
>      IO_OR_GS_CODE();
>  
>      if (qemu_in_coroutine()) {
> -        bdrv_co_yield_to_drain(bs, true, parent, poll);
> -        return;
> +        return bdrv_co_yield_to_drain_timeout(bs, true, parent, poll,
> +                                              timeout_ms);
>      }
>  
>      GLOBAL_STATE_CODE();
> @@ -375,8 +392,20 @@ static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
>       * nodes.
>       */
>      if (poll) {
> -        BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, parent));
> +        if (timeout_ms < 0) {
> +            BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, parent));
> +        } else {
> +            return BDRV_POLL_WHILE_TIMEOUT(
> +                bs, bdrv_drain_poll_top_level(bs, parent), timeout_ms);
> +        }

Any reason to handle timeout_ms < 0 here instead of in
BDRV_POLL_WHILE_TIMEOUT()? It would be more consistent to support -1 in
BDRV_POLL_WHILE_TIMEOUT() so that you don't need to remember which
functions/macros support timeout_ms=-1 and which don't.

>      }
> +    return 0;
> +}
> +
> +static void bdrv_do_drained_begin(BlockDriverState *bs, BdrvChild *parent,
> +                                  bool poll)
> +{
> +    bdrv_do_drained_begin_timeout(bs, parent, poll, -1);
>  }
>  
>  void bdrv_do_drained_begin_quiesce(BlockDriverState *bs, BdrvChild *parent)
> @@ -390,6 +419,12 @@ bdrv_drained_begin(BlockDriverState *bs)
>      IO_OR_GS_CODE();
>      bdrv_do_drained_begin(bs, NULL, true);
>  }
> +int coroutine_mixed_fn
> +bdrv_drained_begin_timeout(BlockDriverState *bs, int64_t timeout_ms)
> +{
> +    IO_OR_GS_CODE();
> +    return bdrv_do_drained_begin_timeout(bs, NULL, true, timeout_ms);
> +}
>  
>  /**
>   * This function does not poll, nor must any of its recursively called
> diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
> index cf5e8bde1c..efbcb9777a 100644
> --- a/include/block/aio-wait.h
> +++ b/include/block/aio-wait.h
> @@ -28,6 +28,8 @@
>  #include "block/aio.h"
>  #include "qemu/main-loop.h"
>  
> +#define AIO_WAIT_INTERVAL 10  /* ms */
> +
>  /**
>   * AioWait:
>   *
> @@ -56,6 +58,11 @@ typedef struct {
>      unsigned num_waiters;
>  } AioWait;
>  
> +typedef struct {
> +    struct QEMUTimer *timer;

The struct keyword is not necessary since QEMUTimer is a typedef:

  QEMUTimer *timer;

Also, can this be a struct field instead of a pointer by using
aio_timer_init_ms() instead of aio_timer_new()?

> +    int64_t interval;
> +} AioWaitTimer;
> +
>  extern AioWait global_aio_wait;
>  
>  /**
> @@ -99,6 +106,55 @@ extern AioWait global_aio_wait;
>      qatomic_dec(&wait_->num_waiters);                              \
>      waited_; })
>  
> +/**
> + * AIO_WAIT_WHILE_TIMEOUT:
> + *
> + * Refer to the implementation of AIO_WAIT_WHILE_INTERNAL,
> + * the timeout parameter is added.
> + */
> +#define AIO_WAIT_WHILE_TIMEOUT(ctx, cond, timeout) ({                    \
> +    int ret_ = 0;                                                        \
> +    AioWait *wait_ = &global_aio_wait;                                   \
> +    AioContext *ctx_ = (ctx);                                            \
> +    int64_t start_ = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);             \
> +    int64_t deadline_ = start_ + (timeout);                              \
> +    /* Ensure that the aio_poll exits periodically to check timeout. */  \
> +    AioWaitTimer *s_ = g_malloc0(sizeof(AioWaitTimer));                  \

Can this be allocated on the stack instead of the heap?

> +    s_->interval = AIO_WAIT_INTERVAL;                                    \
> +    /* Increment wait_->num_waiters before evaluating cond. */           \
> +    qatomic_inc(&wait_->num_waiters);                                    \
> +    /* Paired with smp_mb in aio_wait_kick(). */                         \
> +    smp_mb__after_rmw();                                                 \
> +    if (ctx_ && in_aio_context_home_thread(ctx_)) {                      \
> +        s_->timer = aio_timer_new(ctx_, QEMU_CLOCK_REALTIME,             \
> +                        SCALE_MS, aio_wait_timer_retry, s_);             \
> +        aio_wait_timer_retry(s_);                                        \
> +        while ((cond)) {                                                 \
> +            aio_poll(ctx_, true);                                        \
> +            if (qemu_clock_get_ms(QEMU_CLOCK_REALTIME) > deadline_) {    \
> +                ret_ = -ETIMEDOUT;                                       \
> +                break;                                                   \
> +            }                                                            \
> +        }                                                                \

What is the purpose of interval?

I expected the timer's callback function to be an empty function that is
called when the deadline expires. The while loop here would use
timer_pending() to check for expiry instead of explicitly checking
against the deadline.

> +    } else {                                                             \
> +        s_->timer = aio_timer_new(qemu_get_aio_context(),                \
> +            QEMU_CLOCK_REALTIME, SCALE_MS, aio_wait_timer_retry, s_);    \
> +        aio_wait_timer_retry(s_);                                        \
> +        while ((cond)) {                                                 \
> +            assert(qemu_get_current_aio_context() ==                     \
> +                   qemu_get_aio_context());                              \
> +            aio_poll(qemu_get_aio_context(), true);                      \
> +            if (qemu_clock_get_ms(QEMU_CLOCK_REALTIME) > deadline_) {    \
> +                ret_ = -ETIMEDOUT;                                       \
> +                break;                                                   \
> +            }                                                            \
> +        }                                                                \
> +    }                                                                    \
> +    qatomic_dec(&wait_->num_waiters);                                    \
> +    timer_free(s_->timer);                                               \

This will need to be timer_del() when the QEMUTimer is moved onto the
stack.

> +    g_free(s_);                                                          \
> +    ret_; })
> +
>  #define AIO_WAIT_WHILE(ctx, cond)                                  \
>      AIO_WAIT_WHILE_INTERNAL(ctx, cond)
>  
> @@ -149,4 +205,6 @@ static inline bool in_aio_context_home_thread(AioContext *ctx)
>      }
>  }
>  
> +void aio_wait_timer_retry(void *opaque);
> +
>  #endif /* QEMU_AIO_WAIT_H */
> diff --git a/include/block/block-io.h b/include/block/block-io.h
> index b49e0537dd..84f92d2b09 100644
> --- a/include/block/block-io.h
> +++ b/include/block/block-io.h
> @@ -354,6 +354,11 @@ bdrv_co_copy_range(BdrvChild *src, int64_t src_offset,
>      AIO_WAIT_WHILE(bdrv_get_aio_context(bs_),              \
>                     cond); })
>  
> +#define BDRV_POLL_WHILE_TIMEOUT(bs, cond, timeout) ({      \
> +    BlockDriverState *bs_ = (bs);                          \
> +    AIO_WAIT_WHILE_TIMEOUT(bdrv_get_aio_context(bs_),      \
> +                           cond, timeout); })
> +
>  void bdrv_drain(BlockDriverState *bs);
>  
>  int co_wrapper_mixed_bdrv_rdlock
> @@ -431,6 +436,8 @@ bdrv_drain_poll(BlockDriverState *bs, BdrvChild *ignore_parent,
>   */
>  void bdrv_drained_begin(BlockDriverState *bs);
>  
> +int bdrv_drained_begin_timeout(BlockDriverState *bs, int64_t timeout_ms);
> +
>  /**
>   * bdrv_do_drained_begin_quiesce:
>   *
> diff --git a/util/aio-wait.c b/util/aio-wait.c
> index b5336cf5fd..9aed165529 100644
> --- a/util/aio-wait.c
> +++ b/util/aio-wait.c
> @@ -84,3 +84,10 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>      aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data);
>      AIO_WAIT_WHILE_UNLOCKED(NULL, !data.done);
>  }
> +
> +void aio_wait_timer_retry(void *opaque)
> +{
> +    AioWaitTimer *s = opaque;
> +
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + s->interval);
> +}
> -- 
> 2.33.0
> 

Attachment: signature.asc
Description: PGP signature

Reply via email to