Signed-off-by: Fam Zheng <f...@redhat.com> --- include/block/aio.h | 27 +++++++++++++++++--- util/async.c | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 3 deletions(-)
diff --git a/include/block/aio.h b/include/block/aio.h index e9aeeaec94..40c2f64544 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -47,6 +47,15 @@ typedef void QEMUBHFunc(void *opaque); typedef bool AioPollFn(void *opaque); typedef void IOHandler(void *opaque); +typedef void AioDrainFn(void *opaque); +typedef struct AioDrainOps { + AioDrainFn *drained_begin; + AioDrainFn *drained_end; + void *opaque; + bool is_new; + QTAILQ_ENTRY(AioDrainOps) next; +} AioDrainOps; + struct Coroutine; struct ThreadPool; struct LinuxAioState; @@ -147,6 +156,9 @@ struct AioContext { int epollfd; bool epoll_enabled; bool epoll_available; + + QTAILQ_HEAD(, AioDrainOps) drain_ops; + bool drain_ops_updated; }; /** @@ -441,9 +453,9 @@ int64_t aio_compute_timeout(AioContext *ctx); * * Disable the further processing of external clients. */ -static inline void aio_disable_external(AioContext *ctx) +static inline bool aio_disable_external(AioContext *ctx) { - atomic_inc(&ctx->external_disable_cnt); + return atomic_fetch_inc(&ctx->external_disable_cnt) == 0; } /** @@ -452,7 +464,7 @@ static inline void aio_disable_external(AioContext *ctx) * * Enable the processing of external clients. */ -static inline void aio_enable_external(AioContext *ctx) +static inline bool aio_enable_external(AioContext *ctx) { int old; @@ -462,6 +474,7 @@ static inline void aio_enable_external(AioContext *ctx) /* Kick event loop so it re-arms file descriptors */ aio_notify(ctx); } + return old == 1; } /** @@ -564,4 +577,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, int64_t grow, int64_t shrink, Error **errp); +void aio_context_drained_begin(AioContext *ctx); +void aio_context_drained_end(AioContext *ctx); + +void aio_context_add_drain_ops(AioContext *ctx, + AioDrainFn *begin, AioDrainFn *end, void *opaque); +void aio_context_del_drain_ops(AioContext *ctx, + AioDrainFn *begin, AioDrainFn *end, void *opaque); + #endif diff --git a/util/async.c b/util/async.c index 4dd9d95a9e..cca0efd263 100644 --- a/util/async.c +++ b/util/async.c @@ -402,6 +402,7 @@ AioContext *aio_context_new(Error **errp) AioContext *ctx; ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); + QTAILQ_INIT(&ctx->drain_ops); aio_context_setup(ctx); ret = event_notifier_init(&ctx->notifier, false); @@ -506,3 +507,75 @@ void aio_context_release(AioContext *ctx) { qemu_rec_mutex_unlock(&ctx->lock); } + +/* Called with ctx->lock */ +void aio_context_drained_begin(AioContext *ctx) +{ + AioDrainOps *ops; + + /* TODO: When all external fds are handled in the following drain_ops + * callbacks, aio_disable_external can be dropped. */ + aio_disable_external(ctx); +restart: + ctx->drain_ops_updated = false; + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) { + ops->drained_begin(ops->opaque); + if (ctx->drain_ops_updated) { + goto restart; + } + } +} + +/* Called with ctx->lock */ +void aio_context_drained_end(AioContext *ctx) +{ + AioDrainOps *ops; + +restart: + ctx->drain_ops_updated = false; + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) { + if (ops->is_new) { + continue; + } + ops->drained_end(ops->opaque); + if (ctx->drain_ops_updated) { + goto restart; + } + } + if (aio_enable_external(ctx)) { + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) { + ops->is_new = false; + } + } +} + +/* Called with ctx->lock */ +void aio_context_add_drain_ops(AioContext *ctx, + AioDrainFn *begin, AioDrainFn *end, void *opaque) +{ + AioDrainOps *ops = g_new0(AioDrainOps, 1); + ops->drained_begin = begin; + ops->drained_end = end; + ops->opaque = opaque; + ops->is_new = true; + QTAILQ_INSERT_TAIL(&ctx->drain_ops, ops, next); + ctx->drain_ops_updated = true; +} + +/* Called with ctx->lock */ +void aio_context_del_drain_ops(AioContext *ctx, + AioDrainFn *begin, AioDrainFn *end, void *opaque) +{ + AioDrainOps *ops; + + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) { + if (ops->drained_begin == begin && + ops->drained_end == end && + ops->opaque == opaque) { + QTAILQ_REMOVE(&ctx->drain_ops, ops, next); + ctx->drain_ops_updated = true; + g_free(ops); + return; + } + } +} -- 2.14.3