Introduce the aio_add_sqe() API for submitting io_uring requests in the current AioContext. This allows other components in QEMU, like the block layer, to take advantage of io_uring features without creating their own io_uring context.
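For illustration, a caller can embed a CqeHandler in its own per-request
state, fill in the sqe from its prep_sqe() callback, and read the
completion out of cqe_handler->cqe in its cb() callback. A rough sketch
(not part of this patch; MyRequest, my_prep_read(), my_read_done(), and
handle_result() are invented names):

    /* Hypothetical caller, illustrating the API only */
    typedef struct {
        CqeHandler cqe_handler; /* must live until ->cb() is invoked */
        int fd;
        void *buf;
        unsigned len;
        uint64_t offset;
    } MyRequest;

    static void my_prep_read(struct io_uring_sqe *sqe, void *opaque)
    {
        MyRequest *req = opaque;

        /* Fill in the sqe here; do not call io_uring_sqe_set_data() */
        io_uring_prep_read(sqe, req->fd, req->buf, req->len, req->offset);
    }

    static void my_read_done(CqeHandler *cqe_handler)
    {
        MyRequest *req = container_of(cqe_handler, MyRequest, cqe_handler);

        /* cqe_handler->cqe was filled in before this callback was invoked */
        handle_result(req, cqe_handler->cqe.res);
    }

    /* Submission, from the current AioContext: */
    req->cqe_handler.cb = my_read_done;
    aio_add_sqe(my_prep_read, req, &req->cqe_handler);

Embedding the CqeHandler in the caller's request state keeps its lifetime
tied to the request, which satisfies the requirement that it remain valid
from aio_add_sqe() until ->cb() runs.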
This API supports nested event loops just like file descriptor monitoring
and BHs do. This comes at a complexity cost: a BH is required to dispatch
CQE callbacks, and completed CqeHandlers are placed on a list so that a
nested event loop can invoke its parent's pending CQE callbacks. If you're
wondering why CqeHandler exists instead of just a callback function
pointer, this is why.

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 include/block/aio.h   |  67 +++++++++++++++++++
 util/aio-posix.h      |   1 +
 util/aio-posix.c      |   9 +++
 util/fdmon-io_uring.c | 145 +++++++++++++++++++++++++++++++-----------
 4 files changed, 185 insertions(+), 37 deletions(-)

diff --git a/include/block/aio.h b/include/block/aio.h
index 1657740a0e..4dfb419a21 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
 /* Is polling disabled? */
 bool aio_poll_disabled(AioContext *ctx);
 
+#ifdef CONFIG_LINUX_IO_URING
+/*
+ * Each io_uring request must have a unique CqeHandler that processes the cqe.
+ * The lifetime of a CqeHandler must be at least from aio_add_sqe() until
+ * ->cb() invocation.
+ */
+typedef struct CqeHandler CqeHandler;
+struct CqeHandler {
+    /* Called by the AioContext when the request has completed */
+    void (*cb)(CqeHandler *handler);
+
+    /* Used internally, do not access this */
+    QSIMPLEQ_ENTRY(CqeHandler) next;
+
+    /* This field is filled in before ->cb() is called */
+    struct io_uring_cqe cqe;
+};
+
+typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
+#endif /* CONFIG_LINUX_IO_URING */
+
 /* Callbacks for file descriptor monitoring implementations */
 typedef struct {
     /*
@@ -138,6 +159,27 @@ typedef struct {
      * Called with list_lock incremented.
      */
     void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
+
+#ifdef CONFIG_LINUX_IO_URING
+    /**
+     * aio_add_sqe: Add an io_uring sqe for submission.
+     * @prep_sqe: invoked with an sqe that should be prepared for submission
+     * @opaque: user-defined argument to @prep_sqe()
+     * @cqe_handler: the unique cqe handler associated with this request
+     *
+     * The caller's @prep_sqe() function is invoked to fill in the details of
+     * the sqe. Do not call io_uring_sqe_set_data() on this sqe.
+     *
+     * The kernel may see the sqe as soon as @prep_sqe() returns or it may take
+     * until the next event loop iteration.
+     *
+     * This function is called from the current AioContext and is not
+     * thread-safe.
+     */
+    void (*add_sqe)(AioContext *ctx,
+                    void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                    void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
 } FDMonOps;
 
 /*
@@ -255,6 +297,10 @@ struct AioContext {
     struct io_uring fdmon_io_uring;
     AioHandlerSList submit_list;
     gpointer io_uring_fd_tag;
+
+    /* Pending callback state for cqe handlers */
+    CqeHandlerSimpleQ cqe_handler_ready_list;
+    QEMUBH *cqe_handler_bh;
 #endif
 
     /* TimerLists for calling timers - one per clock type. Has its own
@@ -370,6 +416,27 @@ QEMUBH *aio_bh_new_full(AioContext *ctx, QEMUBHFunc *cb, void *opaque,
 #define aio_bh_new_guarded(ctx, cb, opaque, guard) \
     aio_bh_new_full((ctx), (cb), (opaque), (stringify(cb)), guard)
 
+#ifdef CONFIG_LINUX_IO_URING
+/**
+ * aio_add_sqe: Add an io_uring sqe for submission.
+ * @prep_sqe: invoked with an sqe that should be prepared for submission
+ * @opaque: user-defined argument to @prep_sqe()
+ * @cqe_handler: the unique cqe handler associated with this request
+ *
+ * The caller's @prep_sqe() function is invoked to fill in the details of the
+ * sqe. Do not call io_uring_sqe_set_data() on this sqe.
+ *
+ * The sqe is submitted by the current AioContext. The kernel may see the sqe
+ * as soon as @prep_sqe() returns or it may take until the next event loop
+ * iteration.
+ *
+ * When the AioContext is destroyed, pending sqes are ignored and their
+ * CqeHandlers are not invoked.
+ */
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                 void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
+
 /**
  * aio_notify: Force processing of pending events.
  *
diff --git a/util/aio-posix.h b/util/aio-posix.h
index f9994ed79e..d3e2f66957 100644
--- a/util/aio-posix.h
+++ b/util/aio-posix.h
@@ -35,6 +35,7 @@ struct AioHandler {
 #ifdef CONFIG_LINUX_IO_URING
     QSLIST_ENTRY(AioHandler) node_submitted;
     unsigned flags; /* see fdmon-io_uring.c */
+    CqeHandler cqe_handler;
 #endif
     int64_t poll_idle_timeout; /* when to stop userspace polling */
     bool poll_ready; /* has polling detected an event? */
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 6c2ee0b0b4..f2535dc868 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -767,3 +767,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
 
     aio_notify(ctx);
 }
+
+#ifdef CONFIG_LINUX_IO_URING
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                 void *opaque, CqeHandler *cqe_handler)
+{
+    AioContext *ctx = qemu_get_current_aio_context();
+    ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
+}
+#endif /* CONFIG_LINUX_IO_URING */
diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
index 18b33a0370..a4523e3dcc 100644
--- a/util/fdmon-io_uring.c
+++ b/util/fdmon-io_uring.c
@@ -75,8 +75,8 @@ static inline int pfd_events_from_poll(int poll_events)
 }
 
 /*
- * Returns an sqe for submitting a request. Only be called within
- * fdmon_io_uring_wait().
+ * Returns an sqe for submitting a request. Only called from the AioContext
+ * thread.
  */
 static struct io_uring_sqe *get_sqe(AioContext *ctx)
 {
@@ -166,23 +166,43 @@ static void fdmon_io_uring_update(AioContext *ctx,
     }
 }
 
+static void fdmon_io_uring_add_sqe(AioContext *ctx,
+        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+        void *opaque, CqeHandler *cqe_handler)
+{
+    struct io_uring_sqe *sqe = get_sqe(ctx);
+
+    prep_sqe(sqe, opaque);
+    io_uring_sqe_set_data(sqe, cqe_handler);
+}
+
+static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
+{
+    /*
+     * This is an empty function that is never called. Its address is used to
+     * distinguish AioHandler cqes from ordinary cqe handlers.
+     */
+}
+
 static void add_poll_multishot_sqe(AioContext *ctx, AioHandler *node)
 {
     struct io_uring_sqe *sqe = get_sqe(ctx);
     int events = poll_events_from_pfd(node->pfd.events);
 
     io_uring_prep_poll_multishot(sqe, node->pfd.fd, events);
-    io_uring_sqe_set_data(sqe, node);
+    node->cqe_handler.cb = fdmon_special_cqe_handler;
+    io_uring_sqe_set_data(sqe, &node->cqe_handler);
 }
 
 static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
 {
     struct io_uring_sqe *sqe = get_sqe(ctx);
+    CqeHandler *cqe_handler = &node->cqe_handler;
 
 #ifdef LIBURING_HAVE_DATA64
-    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
+    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
 #else
-    io_uring_prep_poll_remove(sqe, node);
+    io_uring_prep_poll_remove(sqe, cqe_handler);
 #endif
     io_uring_sqe_set_data(sqe, NULL);
 }
@@ -221,20 +241,12 @@ static void fill_sq_ring(AioContext *ctx)
     }
 }
 
-/* Returns true if a handler became ready */
-static bool process_cqe(AioContext *ctx,
-                        AioHandlerList *ready_list,
-                        struct io_uring_cqe *cqe)
+static bool process_cqe_aio_handler(AioContext *ctx,
+                                    AioHandlerList *ready_list,
+                                    AioHandler *node,
+                                    struct io_uring_cqe *cqe)
 {
-    AioHandler *node = io_uring_cqe_get_data(cqe);
-    unsigned flags;
-
-    /* poll_timeout and poll_remove have a zero user_data field */
-    if (!node) {
-        return false;
-    }
-
-    flags = qatomic_read(&node->flags);
+    unsigned flags = qatomic_read(&node->flags);
 
     /*
      * poll_multishot cancelled by poll_remove? Or completed early because fd
@@ -261,6 +273,56 @@
     return true;
 }
 
+/* Process CqeHandlers from the ready list */
+static void cqe_handler_bh(void *opaque)
+{
+    AioContext *ctx = opaque;
+    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
+
+    /*
+     * If cqe_handler->cb() calls aio_poll() it must continue processing
+     * ready_list. Schedule a BH so the inner event loop calls us again.
+     */
+    qemu_bh_schedule(ctx->cqe_handler_bh);
+
+    while (!QSIMPLEQ_EMPTY(ready_list)) {
+        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
+
+        QSIMPLEQ_REMOVE_HEAD(ready_list, next);
+
+        cqe_handler->cb(cqe_handler);
+    }
+
+    qemu_bh_cancel(ctx->cqe_handler_bh);
+}
+
+/* Returns true if a handler became ready */
+static bool process_cqe(AioContext *ctx,
+                        AioHandlerList *ready_list,
+                        struct io_uring_cqe *cqe)
+{
+    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
+
+    /* poll_timeout and poll_remove have a zero user_data field */
+    if (!cqe_handler) {
+        return false;
+    }
+
+    /*
+     * Special handling for AioHandler cqes. They need ready_list and have a
+     * return value.
+     */
+    if (cqe_handler->cb == fdmon_special_cqe_handler) {
+        AioHandler *node = container_of(cqe_handler, AioHandler, cqe_handler);
+        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
+    }
+
+    cqe_handler->cqe = *cqe;
+    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
+    qemu_bh_schedule(ctx->cqe_handler_bh);
+    return false;
+}
+
 static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
 {
     struct io_uring *ring = &ctx->fdmon_io_uring;
@@ -360,6 +422,7 @@ static const FDMonOps fdmon_io_uring_ops = {
     .gsource_prepare = fdmon_io_uring_gsource_prepare,
    .gsource_check = fdmon_io_uring_gsource_check,
     .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
+    .add_sqe = fdmon_io_uring_add_sqe,
 };
 
 bool fdmon_io_uring_setup(AioContext *ctx)
@@ -375,6 +438,8 @@ bool fdmon_io_uring_setup(AioContext *ctx)
     }
 
     QSLIST_INIT(&ctx->submit_list);
+    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
+    ctx->cqe_handler_bh = aio_bh_new(ctx, cqe_handler_bh, ctx);
     ctx->fdmon_ops = &fdmon_io_uring_ops;
     ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
             ctx->fdmon_io_uring.ring_fd, G_IO_IN);
@@ -384,30 +449,36 @@
 
 void fdmon_io_uring_destroy(AioContext *ctx)
 {
-    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
-        AioHandler *node;
+    AioHandler *node;
 
-        io_uring_queue_exit(&ctx->fdmon_io_uring);
+    if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
+        return;
+    }
 
-        /* Move handlers due to be removed onto the deleted list */
-        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
-            unsigned flags = qatomic_fetch_and(&node->flags,
-                                               ~(FDMON_IO_URING_PENDING |
-                                                 FDMON_IO_URING_ADD |
-                                                 FDMON_IO_URING_REMOVE));
+    io_uring_queue_exit(&ctx->fdmon_io_uring);
 
-            if (flags & FDMON_IO_URING_REMOVE) {
-                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
-            }
+    /* Move handlers due to be removed onto the deleted list */
+    while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
+        unsigned flags = qatomic_fetch_and(&node->flags,
+                                           ~(FDMON_IO_URING_PENDING |
+                                             FDMON_IO_URING_ADD |
+                                             FDMON_IO_URING_REMOVE));
 
-            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
+        if (flags & FDMON_IO_URING_REMOVE) {
+            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
+                                  node, node_deleted);
         }
 
-        g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
-        ctx->io_uring_fd_tag = NULL;
-
-        qemu_lockcnt_lock(&ctx->list_lock);
-        fdmon_poll_downgrade(ctx);
-        qemu_lockcnt_unlock(&ctx->list_lock);
+        QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
     }
+
+    g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
+    ctx->io_uring_fd_tag = NULL;
+
+    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
+    qemu_bh_delete(ctx->cqe_handler_bh);
+
+    qemu_lockcnt_lock(&ctx->list_lock);
+    fdmon_poll_downgrade(ctx);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 }
-- 
2.49.0