On 07/01/2025 18:11, Andres Freund wrote:
The difference between a handle and a reference is useful right now, to have
some separation between the functions that can be called by anyone (taking a
PgAioHandleRef) and only by the issuer (PgAioHandle). That might better be
solved by having a PgAioHandleIssuerRef ref or something.

Having PgAioReturn be separate from the AIO handle turns out to be rather
crucial, otherwise it's very hard to guarantee "forward progress",
i.e. guarantee that pgaio_io_get() will return something without blocking
forever.

Right, yeah, I can see that.

typedef enum PgAioHandleState
{
        /* not in use */
        AHS_IDLE = 0,

        /* returned by pgaio_io_get() */
        AHS_HANDED_OUT,

        /* pgaio_io_start_*() has been called, but IO hasn't been submitted yet */
        AHS_DEFINED,

        /* subject's prepare() callback has been called */
        AHS_PREPARED,

        /* IO has been submitted and is being executed */
        AHS_IN_FLIGHT,

        /* IO finished, but result has not yet been processed */
        AHS_REAPED,

        /* IO completed, shared completion has been called */
        AHS_COMPLETED_SHARED,

        /* IO completed, local completion has been called */
        AHS_COMPLETED_LOCAL,
} PgAioHandleState;

Do we need to distinguish between DEFINED and PREPARED?

I found it to be rather confusing if it's not possible to tell whether some
action (like the prepare callback) has already happened or not.  It's useful
to be able to look at an IO in a backtrace or such and see exactly what state
it is in.

I see.

In v1 I had several of the above states managed as separate boolean variables
- that turned out to be a huge mess, it's a lot easier to understand if
there's a single strictly monotonically increasing state.

Agreed on that
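
To illustrate the "single strictly monotonically increasing state" idea, here is a minimal standalone sketch in C. It is not the patch's code: DemoAioState, DemoAioHandle and demo_update_state() are hypothetical names, and the enum is only a simplified copy of the states quoted above.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified copy of the handle states quoted above. */
typedef enum DemoAioState
{
	DEMO_IDLE = 0,
	DEMO_HANDED_OUT,
	DEMO_DEFINED,
	DEMO_PREPARED,
	DEMO_IN_FLIGHT,
	DEMO_REAPED,
	DEMO_COMPLETED_SHARED,
	DEMO_COMPLETED_LOCAL
} DemoAioState;

typedef struct DemoAioHandle
{
	DemoAioState state;
} DemoAioHandle;

/*
 * Move the handle to new_state.  Returns false and leaves the handle
 * unchanged if the transition would not strictly increase the state.
 * (Assumption for this sketch: recycling a handle back to idle is done
 * elsewhere and is not modeled here.)
 */
static bool
demo_update_state(DemoAioHandle *ioh, DemoAioState new_state)
{
	if (new_state <= ioh->state)
		return false;
	ioh->state = new_state;
	return true;
}
```

With one ordered field, "has the prepare step already happened?" reduces to a comparison such as state >= DEMO_PREPARED, instead of a combination of separate booleans.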

I didn't quite understand the point of the prepare callbacks. For example,
when AsyncReadBuffers() calls smgrstartreadv(), the
shared_buffer_readv_prepare() callback will be called. Why doesn't
AsyncReadBuffers() do the "prepare" work itself directly; why does it need
to be in a callback?

One big part of it is "ownership" - while the IO isn't completely "assembled",
we can release all buffer pins etc in case of an error. But if the error
happens just after the IO was staged, we can't - the buffer is still
referenced by the IO. For that the AIO subsystem needs to take its own pins
etc.  Initially the prepare callback didn't exist, and the code in
AsyncReadBuffers() was a lot more complicated without it.


I assume it's somehow related to error handling, but I didn't quite get
it. Perhaps an "abort" callback that'd be called on error, instead of a
"prepare" callback, would be better?

I don't think an error callback would be helpful - the whole thing is that we
basically need to claim ownership of all IO-related resources IFF the IO is
staged. Not before (because then the IO not getting staged would mean we have
a resource leak), not after (because we might error out and thus not keep
e.g. buffers pinned).
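
The ownership rule described here can be sketched with a toy model in C. This is an illustration only, not PostgreSQL code: DemoBuffer, DemoIo and the demo_* functions are hypothetical, and a buffer pin is reduced to a plain counter. The point it tries to show is that the IO takes its own pin exactly at staging time, so the issuer's error cleanup is safe both before and after that moment.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a buffer: only a pin counter. */
typedef struct DemoBuffer
{
	int			pin_count;
} DemoBuffer;

/* Hypothetical stand-in for an IO handle. */
typedef struct DemoIo
{
	DemoBuffer *buf;
	bool		staged;
} DemoIo;

/* The issuer pins the buffer while assembling the IO. */
static void
demo_issuer_pin(DemoBuffer *buf)
{
	buf->pin_count++;
}

/* The issuer's cleanup (normal or error path) drops only its own pin. */
static void
demo_issuer_unpin(DemoBuffer *buf)
{
	buf->pin_count--;
}

/* Staging: from here on the IO references the buffer, so it pins it too. */
static void
demo_stage(DemoIo *io, DemoBuffer *buf)
{
	io->buf = buf;
	buf->pin_count++;
	io->staged = true;
}

/* Completion: the IO releases the pin it took when it was staged. */
static void
demo_complete(DemoIo *io)
{
	if (!io->staged)
		return;
	io->buf->pin_count--;
	io->buf = NULL;
	io->staged = false;
}
```

If the issuer errors out before demo_stage(), its cleanup brings the count back to zero and nothing leaks. If it errors out afterwards, the buffer stays pinned by the IO until demo_complete() runs.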

Hmm. The comments say that when you call smgrstartreadv(), the IO handle may no longer be modified, as the IO may be executed immediately. What if we changed that so that it never submits the IO, only adds the necessary callbacks to it?

In that world, when smgrstartreadv() returns, the necessary details and completion callbacks have been set in the IO handle, but the caller can still do more preparation before the IO is submitted. The caller must ensure that it gets submitted, however, so no erroring out in that state.

Currently the call stack looks like this:

AsyncReadBuffers()
-> smgrstartreadv()
  -> mdstartreadv()
    -> FileStartReadV()
      -> pgaio_io_prep_readv()
        -> shared_buffer_readv_prepare() (callback)
        <- (return)
      <- (return)
    <- (return)
  <- (return)
<- (return)

I'm thinking that the prepare work could be done "on the way up" instead:

AsyncReadBuffers()
-> smgrstartreadv()
  -> mdstartreadv()
    -> FileStartReadV()
      -> pgaio_io_prep_readv()
      <- (return)
    <- (return)
  <- (return)
-> shared_buffer_readv_prepare()
<- (return)

Attached is a patch to demonstrate concretely what I mean.

This adds a new pgaio_io_stage() step to the issuer, and the issuer needs to call the prepare functions explicitly, instead of having them as callbacks. Nominally that's more steps, but IMHO it's better to be explicit. The same actions were happening previously too; they were just hidden in the callback. I updated the README to show that too.
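
As a rough standalone model of the issuer-side sequence (start, explicit prepare, stage, submit), here is a small sketch in C. All names (DemoIo, DemoBackend, demo_*) and the batch size are hypothetical and only loosely mirror the functions in the attached patch. Flushing when the batch fills up is an assumption taken from the README comment, not from pgaio_io_stage() itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_BATCH_SIZE 4		/* hypothetical batch limit */

typedef enum DemoState
{
	DEMO_HANDED_OUT,
	DEMO_PREPARING,
	DEMO_PREPARED,
	DEMO_IN_FLIGHT
} DemoState;

typedef struct DemoIo
{
	DemoState	state;
	bool		needs_sync;		/* must be executed synchronously? */
	int			prepared_buffers;
} DemoIo;

typedef struct DemoBackend
{
	DemoIo	   *staged[DEMO_BATCH_SIZE];
	int			num_staged;
} DemoBackend;

/* Models smgrstartreadv(): fills in the IO details, never submits. */
static void
demo_start_readv(DemoIo *io)
{
	assert(io->state == DEMO_HANDED_OUT);
	io->state = DEMO_PREPARING;
}

/* Models the explicit prepare call made by the issuer "on the way up". */
static void
demo_buffer_prepare(DemoIo *io, int nbuffers)
{
	assert(io->state == DEMO_PREPARING);
	io->prepared_buffers = nbuffers;
}

/* Models pgaio_submit_staged(): everything queued becomes in-flight. */
static void
demo_submit_staged(DemoBackend *be)
{
	for (int i = 0; i < be->num_staged; i++)
		be->staged[i]->state = DEMO_IN_FLIGHT;
	be->num_staged = 0;
}

/* Models the stage step: queue for batching, or execute immediately. */
static void
demo_io_stage(DemoBackend *be, DemoIo *io)
{
	assert(io->state == DEMO_PREPARING);
	io->state = DEMO_PREPARED;

	if (io->needs_sync)
	{
		io->state = DEMO_IN_FLIGHT;
		return;
	}

	be->staged[be->num_staged++] = io;
	if (be->num_staged == DEMO_BATCH_SIZE)
		demo_submit_staged(be);
}
```

The issuer then calls demo_start_readv(), demo_buffer_prepare() and demo_io_stage() in that order, and must not error out between the first and the last of those calls.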

I'm not wedded to this, but it feels a little better to me.

--
Heikki Linnakangas
Neon (https://neon.tech)
diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md
index 0076ea4aa10..25b5f5d9529 100644
--- a/src/backend/storage/aio/README.md
+++ b/src/backend/storage/aio/README.md
@@ -60,7 +60,18 @@ smgrstartreadv(ioh, operation->smgr, forknum, blkno,
                BufferGetBlock(buffer), 1);
 
 /*
- * As mentioned above, the IO might be initiated within smgrstartreadv(). That
+ * After smgrstartreadv() has returned, we are committed to performing the IO.
+ * We may do more preparation or add more callbacks to the IO, but must
+ * *not* error out before calling pgaio_io_stage(). We don't have any such
+ * preparation to do here, so just call pgaio_io_stage() to indicate that we
+ * have completed building the IO request. It usually queues up the request
+ * for batching, but may submit it immediately if the batch is full or if
+ * the request needed to be processed synchronously.
+ */
+pgaio_io_stage(ioh);
+
+/*
+ * The IO might already have been initiated by pgaio_io_stage(). That
  * is however not guaranteed, to allow IO submission to be batched.
  *
  * Note that one needs to be careful while there may be unsubmitted IOs, as
@@ -69,10 +80,6 @@ smgrstartreadv(ioh, operation->smgr, forknum, blkno,
  * that, pending IOs need to be explicitly submitted before this backend
  * might be blocked by a backend waiting for IO.
  *
- * Note that the IO might have immediately been submitted (e.g. due to reaching
- * a limit on the number of unsubmitted IOs) and even completed during the
- * smgrstartreadv() above.
- *
  * Once submitted, the IO is in-flight and can complete at any time.
  */
 pgaio_submit_staged();
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 261a752fb80..ed03fe03609 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -110,7 +110,7 @@ static PgAioHandle *inj_cur_handle;
  * Acquire an AioHandle, waiting for IO completion if necessary.
  *
  * Each backend can only have one AIO handle that that has been "handed out"
- * to code, but not yet submitted or released. This restriction is necessary
+ * to code, but not yet staged or released. This restriction is necessary
  * to ensure that it is possible for code to wait for an unused handle by
  * waiting for in-flight IO to complete. There is a limited number of handles
  * in each backend, if multiple handles could be handed out without being
@@ -249,6 +249,43 @@ pgaio_io_release(PgAioHandle *ioh)
 	}
 }
 
+/*
+ * Finish building an IO request.  Once a request has been staged, there's no
+ * going back; the IO subsystem will attempt to perform the IO. If the IO
+ * succeeds the completion callbacks will be called; on error, the error
+ * callbacks.
+ *
+ * This may add the IO to the current batch, or execute the request
+ * synchronously.
+ */
+void
+pgaio_io_stage(PgAioHandle *ioh)
+{
+	bool		needs_synchronous;
+
+	/* allow a new IO to be staged */
+	my_aio->handed_out_io = NULL;
+
+	pgaio_io_update_state(ioh, AHS_PREPARED);
+
+	needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);
+
+	elog(DEBUG3, "io:%d: staged %s, executed synchronously: %d",
+		 pgaio_io_get_id(ioh), pgaio_io_get_op_name(ioh),
+		 needs_synchronous);
+
+	if (!needs_synchronous)
+	{
+		my_aio->staged_ios[my_aio->num_staged_ios++] = ioh;
+		Assert(my_aio->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
+	}
+	else
+	{
+		pgaio_io_prepare_submit(ioh);
+		pgaio_io_perform_synchronously(ioh);
+	}
+}
+
 /*
  * Release IO handle during resource owner cleanup.
  */
@@ -279,7 +316,7 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
 
 			pgaio_io_reclaim(ioh);
 			break;
-		case AHS_DEFINED:
+		case AHS_PREPARING:
 		case AHS_PREPARED:
 			/* XXX: Should we warn about this when is_commit? */
 			pgaio_submit_staged();
@@ -383,7 +420,7 @@ void
 pgaio_io_get_ref(PgAioHandle *ioh, PgAioHandleRef *ior)
 {
 	Assert(ioh->state == AHS_HANDED_OUT ||
-		   ioh->state == AHS_DEFINED ||
+		   ioh->state == AHS_PREPARING ||
 		   ioh->state == AHS_PREPARED);
 	Assert(ioh->generation != 0);
 
@@ -437,7 +474,7 @@ pgaio_io_ref_wait(PgAioHandleRef *ior)
 
 	if (am_owner)
 	{
-		if (state == AHS_DEFINED || state == AHS_PREPARED)
+		if (state == AHS_PREPARING || state == AHS_PREPARED)
 		{
 			/* XXX: Arguably this should be prevented by callers? */
 			pgaio_submit_staged();
@@ -489,8 +526,8 @@ pgaio_io_ref_wait(PgAioHandleRef *ior)
 				/* fallthrough */
 
 				/* waiting for owner to submit */
+			case AHS_PREPARING:
 			case AHS_PREPARED:
-			case AHS_DEFINED:
 				/* waiting for reaper to complete */
 				/* fallthrough */
 			case AHS_REAPED:
@@ -501,8 +538,7 @@ pgaio_io_ref_wait(PgAioHandleRef *ior)
 
 				while (!pgaio_io_was_recycled(ioh, ref_generation, &state))
 				{
-					if (state != AHS_REAPED && state != AHS_DEFINED &&
-						state != AHS_IN_FLIGHT)
+					if (state != AHS_REAPED && state != AHS_IN_FLIGHT)
 						break;
 					ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_COMPLETION);
 				}
@@ -570,8 +606,8 @@ pgaio_io_get_state_name(PgAioHandle *ioh)
 			return "idle";
 		case AHS_HANDED_OUT:
 			return "handed_out";
-		case AHS_DEFINED:
-			return "DEFINED";
+		case AHS_PREPARING:
+			return "PREPARING";
 		case AHS_PREPARED:
 			return "PREPARED";
 		case AHS_IN_FLIGHT:
@@ -588,43 +624,18 @@ pgaio_io_get_state_name(PgAioHandle *ioh)
 
 /*
  * Internal, should only be called from pgaio_io_prep_*().
+ *
+ * Switches the IO to PREPARING state.
  */
 void
-pgaio_io_prepare(PgAioHandle *ioh, PgAioOp op)
+pgaio_io_start_staging(PgAioHandle *ioh)
 {
-	bool		needs_synchronous;
-
 	Assert(ioh->state == AHS_HANDED_OUT);
 	Assert(pgaio_io_has_subject(ioh));
 
-	ioh->op = op;
 	ioh->result = 0;
 
-	pgaio_io_update_state(ioh, AHS_DEFINED);
-
-	/* allow a new IO to be staged */
-	my_aio->handed_out_io = NULL;
-
-	pgaio_io_prepare_subject(ioh);
-
-	pgaio_io_update_state(ioh, AHS_PREPARED);
-
-	needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);
-
-	elog(DEBUG3, "io:%d: prepared %s, executed synchronously: %d",
-		 pgaio_io_get_id(ioh), pgaio_io_get_op_name(ioh),
-		 needs_synchronous);
-
-	if (!needs_synchronous)
-	{
-		my_aio->staged_ios[my_aio->num_staged_ios++] = ioh;
-		Assert(my_aio->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
-	}
-	else
-	{
-		pgaio_io_prepare_submit(ioh);
-		pgaio_io_perform_synchronously(ioh);
-	}
+	pgaio_io_update_state(ioh, AHS_PREPARING);
 }
 
 /*
@@ -858,8 +869,8 @@ pgaio_io_wait_for_free(void)
 		{
 			/* should not be in in-flight list */
 			case AHS_IDLE:
-			case AHS_DEFINED:
 			case AHS_HANDED_OUT:
+			case AHS_PREPARING:
 			case AHS_PREPARED:
 			case AHS_COMPLETED_LOCAL:
 				elog(ERROR, "shouldn't get here with io:%d in state %d",
@@ -1004,7 +1015,7 @@ pgaio_bounce_buffer_wait_for_free(void)
 			case AHS_IDLE:
 			case AHS_HANDED_OUT:
 				continue;
-			case AHS_DEFINED:	/* should have been submitted above */
+			case AHS_PREPARING:	/* should have been submitted above */
 			case AHS_PREPARED:
 				elog(ERROR, "shouldn't get here with io:%d in state %d",
 					 pgaio_io_get_id(ioh), ioh->state);
diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c
index 3c255775833..e84b79d3f2e 100644
--- a/src/backend/storage/aio/aio_io.c
+++ b/src/backend/storage/aio/aio_io.c
@@ -46,11 +46,12 @@ pgaio_io_prep_readv(PgAioHandle *ioh,
 {
 	pgaio_io_before_prep(ioh);
 
+	ioh->op = PGAIO_OP_READV;
 	ioh->op_data.read.fd = fd;
 	ioh->op_data.read.offset = offset;
 	ioh->op_data.read.iov_length = iovcnt;
 
-	pgaio_io_prepare(ioh, PGAIO_OP_READV);
+	pgaio_io_start_staging(ioh);
 }
 
 void
@@ -59,11 +60,12 @@ pgaio_io_prep_writev(PgAioHandle *ioh,
 {
 	pgaio_io_before_prep(ioh);
 
+	ioh->op = PGAIO_OP_WRITEV;
 	ioh->op_data.write.fd = fd;
 	ioh->op_data.write.offset = offset;
 	ioh->op_data.write.iov_length = iovcnt;
 
-	pgaio_io_prepare(ioh, PGAIO_OP_WRITEV);
+	pgaio_io_start_staging(ioh);
 }
 
 
diff --git a/src/backend/storage/aio/aio_subject.c b/src/backend/storage/aio/aio_subject.c
index b2bd0c235e7..321e1d8e975 100644
--- a/src/backend/storage/aio/aio_subject.c
+++ b/src/backend/storage/aio/aio_subject.c
@@ -119,33 +119,6 @@ pgaio_io_get_subject_name(PgAioHandle *ioh)
 	return aio_subject_info[ioh->subject]->name;
 }
 
-/*
- * Internal function which invokes ->prepare for all the registered callbacks.
- */
-void
-pgaio_io_prepare_subject(PgAioHandle *ioh)
-{
-	Assert(ioh->subject > ASI_INVALID && ioh->subject < ASI_COUNT);
-	Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
-
-	for (int i = ioh->num_shared_callbacks; i > 0; i--)
-	{
-		PgAioHandleSharedCallbackID cbid = ioh->shared_callbacks[i - 1];
-		const PgAioHandleSharedCallbacksEntry *ce = &aio_shared_cbs[cbid];
-
-		if (!ce->cb->prepare)
-			continue;
-
-		elog(DEBUG3, "io:%d, op %s, subject %s, calling cb #%d %d/%s->prepare",
-			 pgaio_io_get_id(ioh),
-			 pgaio_io_get_op_name(ioh),
-			 pgaio_io_get_subject_name(ioh),
-			 i,
-			 cbid, ce->name);
-		ce->cb->prepare(ioh);
-	}
-}
-
 /*
  * Internal function which invokes ->complete for all the registered
  * callbacks.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 9bc0176a2ca..dd30856aca0 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -179,6 +179,9 @@ int			backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
 /* local state for LockBufferForCleanup */
 static BufferDesc *PinCountWaitBuf = NULL;
 
+static void local_buffer_readv_prepare(PgAioHandle *ioh, Buffer *buffers, int nbuffers);
+static void shared_buffer_writev_prepare(PgAioHandle *ioh, Buffer *buffers, int nbuffers);
+
 /*
  * Backend-Private refcount management:
  *
@@ -1725,7 +1728,6 @@ AsyncReadBuffers(ReadBuffersOperation *operation,
 
 		pgaio_io_set_io_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
 
-
 		if (persistence == RELPERSISTENCE_TEMP)
 			pgaio_io_add_shared_cb(ioh, ASC_LOCAL_BUFFER_READ);
 		else
@@ -1736,6 +1738,11 @@ AsyncReadBuffers(ReadBuffersOperation *operation,
 		did_start_io_overall = did_start_io_this = true;
 		smgrstartreadv(ioh, operation->smgr, forknum, io_first_block,
 					   io_pages, io_buffers_len);
+		if (persistence == RELPERSISTENCE_TEMP)
+			local_buffer_readv_prepare(ioh, io_buffers, io_buffers_len);
+		else
+			shared_buffer_readv_prepare(ioh, io_buffers, io_buffers_len);
+		pgaio_io_stage(ioh);
 		ioh = NULL;
 		operation->nios++;
 
@@ -4170,10 +4177,11 @@ WriteBuffers(BuffersToWrite *to_write,
 					to_write->data_ptrs,
 					to_write->nbuffers,
 					false);
+	shared_buffer_writev_prepare(to_write->ioh, to_write->buffers, to_write->nbuffers);
+	pgaio_io_stage(to_write->ioh);
 	pgstat_count_io_op_n(IOOBJECT_RELATION, IOCONTEXT_NORMAL,
 						 IOOP_WRITE, to_write->nbuffers);
 
-
 	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
 	{
 		Buffer		cur_buf = to_write->buffers[nbuf];
@@ -6952,20 +6960,16 @@ ReadBufferCompleteWriteShared(Buffer buffer, bool release_lock, bool failed)
  * and writes.
  */
 static void
-shared_buffer_prepare_common(PgAioHandle *ioh, bool is_write)
+shared_buffer_prepare_common(PgAioHandle *ioh, bool is_write, Buffer *buffers, int nbuffers)
 {
-	uint64	   *io_data;
-	uint8		io_data_len;
 	PgAioHandleRef io_ref;
 	BufferTag	first PG_USED_FOR_ASSERTS_ONLY = {0};
 
-	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
-
 	pgaio_io_get_ref(ioh, &io_ref);
 
-	for (int i = 0; i < io_data_len; i++)
+	for (int i = 0; i < nbuffers; i++)
 	{
-		Buffer		buf = (Buffer) io_data[i];
+		Buffer		buf = buffers[i];
 		BufferDesc *bufHdr;
 		uint32		buf_state;
 
@@ -7022,16 +7026,16 @@ shared_buffer_prepare_common(PgAioHandle *ioh, bool is_write)
 	}
 }
 
-static void
-shared_buffer_readv_prepare(PgAioHandle *ioh)
+void
+shared_buffer_readv_prepare(PgAioHandle *ioh, Buffer *buffers, int nbuffers)
 {
-	shared_buffer_prepare_common(ioh, false);
+	shared_buffer_prepare_common(ioh, false, buffers, nbuffers);
 }
 
 static void
-shared_buffer_writev_prepare(PgAioHandle *ioh)
+shared_buffer_writev_prepare(PgAioHandle *ioh, Buffer *buffers, int nbuffers)
 {
-	shared_buffer_prepare_common(ioh, true);
+	shared_buffer_prepare_common(ioh, true, buffers, nbuffers);
 }
 
 static PgAioResult
@@ -7135,19 +7139,15 @@ shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result)
  * and writes.
  */
 static void
-local_buffer_readv_prepare(PgAioHandle *ioh)
+local_buffer_readv_prepare(PgAioHandle *ioh, Buffer *buffers, int nbuffers)
 {
-	uint64	   *io_data;
-	uint8		io_data_len;
 	PgAioHandleRef io_ref;
 
-	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
-
 	pgaio_io_get_ref(ioh, &io_ref);
 
-	for (int i = 0; i < io_data_len; i++)
+	for (int i = 0; i < nbuffers; i++)
 	{
-		Buffer		buf = (Buffer) io_data[i];
+		Buffer		buf = buffers[i];
 		BufferDesc *bufHdr;
 		uint32		buf_state;
 
@@ -7199,27 +7199,17 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result)
 	return result;
 }
 
-static void
-local_buffer_writev_prepare(PgAioHandle *ioh)
-{
-	elog(ERROR, "not yet");
-}
-
-
 const struct PgAioHandleSharedCallbacks aio_shared_buffer_readv_cb = {
-	.prepare = shared_buffer_readv_prepare,
 	.complete = shared_buffer_readv_complete,
 	.error = buffer_readv_error,
 };
 const struct PgAioHandleSharedCallbacks aio_shared_buffer_writev_cb = {
-	.prepare = shared_buffer_writev_prepare,
 	.complete = shared_buffer_writev_complete,
 };
 const struct PgAioHandleSharedCallbacks aio_local_buffer_readv_cb = {
-	.prepare = local_buffer_readv_prepare,
 	.complete = local_buffer_readv_complete,
 	.error = buffer_readv_error,
 };
 const struct PgAioHandleSharedCallbacks aio_local_buffer_writev_cb = {
-	.prepare = local_buffer_writev_prepare,
+
 };
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index d12225a9949..bf4522eeac6 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -985,9 +985,9 @@ mdstartreadv(PgAioHandle *ioh,
 							  forknum,
 							  blocknum,
 							  nblocks);
-	pgaio_io_add_shared_cb(ioh, ASC_MD_READV);
-
 	FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ);
+
+	pgaio_io_add_shared_cb(ioh, ASC_MD_READV);
 }
 
 /*
@@ -1136,9 +1136,8 @@ mdstartwritev(PgAioHandle *ioh,
 							  forknum,
 							  blocknum,
 							  nblocks);
-	pgaio_io_add_shared_cb(ioh, ASC_MD_WRITEV);
-
 	FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+	pgaio_io_add_shared_cb(ioh, ASC_MD_WRITEV);
 }
 
 
diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index caa52d2aaba..d126a10f9d4 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -212,12 +212,10 @@ typedef struct PgAioSubjectInfo
 
 
 typedef PgAioResult (*PgAioHandleSharedCallbackComplete) (PgAioHandle *ioh, PgAioResult prior_result);
-typedef void (*PgAioHandleSharedCallbackPrepare) (PgAioHandle *ioh);
 typedef void (*PgAioHandleSharedCallbackError) (PgAioResult result, const PgAioSubjectData *subject_data, int elevel);
 
 typedef struct PgAioHandleSharedCallbacks
 {
-	PgAioHandleSharedCallbackPrepare prepare;
 	PgAioHandleSharedCallbackComplete complete;
 	PgAioHandleSharedCallbackError error;
 } PgAioHandleSharedCallbacks;
@@ -247,6 +245,8 @@ struct ResourceOwnerData;
 extern PgAioHandle *pgaio_io_get(struct ResourceOwnerData *resowner, PgAioReturn *ret);
 extern PgAioHandle *pgaio_io_get_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret);
 
+extern void pgaio_io_stage(PgAioHandle *ioh);
+
 extern void pgaio_io_release(PgAioHandle *ioh);
 extern void pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error);
 
@@ -261,7 +261,7 @@ extern void pgaio_io_set_io_data_32(PgAioHandle *ioh, uint32 *data, uint8 len);
 extern void pgaio_io_set_io_data_64(PgAioHandle *ioh, uint64 *data, uint8 len);
 extern uint64 *pgaio_io_get_io_data(PgAioHandle *ioh, uint8 *len);
 
-extern void pgaio_io_prepare(PgAioHandle *ioh, PgAioOp op);
+extern void pgaio_io_start_staging(PgAioHandle *ioh);
 
 extern int	pgaio_io_get_id(PgAioHandle *ioh);
 struct iovec;
diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h
index f4c57438dd4..55677d7dc8c 100644
--- a/src/include/storage/aio_internal.h
+++ b/src/include/storage/aio_internal.h
@@ -37,10 +37,10 @@ typedef enum PgAioHandleState
 	/* returned by pgaio_io_get() */
 	AHS_HANDED_OUT,
 
-	/* pgaio_io_start_*() has been called, but IO hasn't been submitted yet */
-	AHS_DEFINED,
+	/* pgaio_io_start_staging() has been called, but IO hasn't been fully staged yet */
+	AHS_PREPARING,
 
-	/* subjects prepare() callback has been called */
+	/* pgaio_io_stage() has been called, but the IO hasn't been submitted yet */
 	AHS_PREPARED,
 
 	/* IO is being executed */
@@ -249,7 +249,6 @@ typedef struct IoMethodOps
 
 extern bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state);
 
-extern void pgaio_io_prepare_subject(PgAioHandle *ioh);
 extern void pgaio_io_process_completion_subject(PgAioHandle *ioh);
 extern void pgaio_io_process_completion(PgAioHandle *ioh, int result);
 extern void pgaio_io_prepare_submit(PgAioHandle *ioh);
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 3523d8a3860..5c7d602d91b 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -425,6 +425,7 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 
 /* solely to make it easier to write tests */
 extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
+extern void shared_buffer_readv_prepare(PgAioHandle *ioh, Buffer *buffers, int nbuffers);
 
 
 /* freelist.c */
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index e495c5309b3..446da4f0231 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -264,6 +264,8 @@ read_corrupt_rel_block(PG_FUNCTION_ARGS)
 	smgrstartreadv(ioh, smgr, MAIN_FORKNUM, block,
 				   (void *) &page, 1);
 
+	shared_buffer_readv_prepare(ioh, &buf, 1);
+	pgaio_io_stage(ioh);
 	ReleaseBuffer(buf);
 
 	pgaio_io_ref_wait(&ior);
