Hi Amit,

Thank you for your interest in the discussion!

On Monday, February 26, 2024 16:24 MSK, Amit Kapila <amit.kapil...@gmail.com> 
wrote:
 
> I think the reason is probably that when the WAL record for prepared is
> already flushed then what will be the idea of async commit here?

I think the idea of async commit should be applied to both transactions: PREPARE
and COMMIT PREPARED, which are actually two separate local transactions. For
both of these transactions we may call XLogSetAsyncXactLSN on commit instead of
XLogFlush when async commit is enabled. When I say async commit, I mean applying
async commit to the local transactions, not to the twophase (prepared)
transaction itself.
 
> At commit prepared, it seems we read prepare's WAL record, right? If so, it is
> not clear to me do you see a problem with a flush of commit_prepared or reading
> WAL for prepared or both of these.

The problem with reading WAL is due to async commit of PREPARE TRANSACTION,
which saves the 2PC state in the WAL. At the moment of COMMIT PREPARED, the WAL
containing the PREPARE TRANSACTION 2PC state may not be XLogFlush-ed yet. So,
PREPARE TRANSACTION should wait until its 2PC state is flushed.

I did some experiments with saving the 2PC state in the local memory of the
logical replication worker and, I think, it worked and demonstrated much better
performance. The logical replication worker utilized up to 100% CPU. I'm just
concerned about possible problems with async commit for twophase transactions.

To be more specific, I've attached a patch to support async commit for
twophase. It is not the final patch; it is presented only for discussion
purposes. There were some attempts to save the 2PC state in memory in the past,
but they were rejected. Now there might be a second round of discussion.

With best regards,
Vitaly

 
From 549f809fa122ca0842ec4bfc775afd08feee0d80 Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Tue, 27 Feb 2024 14:02:23 +0300
Subject: [PATCH] Add asynchronous commit support for 2PC

---
 src/backend/access/transam/twophase.c | 111 +++++++++++++++++++++++++-
 1 file changed, 108 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..52f0853db8 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -109,6 +109,8 @@
 #include "utils/memutils.h"
 #include "utils/timestamp.h"
 
+#define POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT
+
 /*
  * Directory where Two-phase commit files reside within PGDATA
  */
@@ -163,6 +165,9 @@ typedef struct GlobalTransactionData
 	 */
 	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
 	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
+	void*       prepare_2pc_mem_data;
+	size_t      prepare_2pc_mem_len;
+	pid_t       prepare_2pc_proc;
 	TransactionId xid;			/* The GXACT id */
 
 	Oid			owner;			/* ID of user that executed the xact */
@@ -427,6 +432,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 
 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
 
+	Assert(gxact->prepare_2pc_mem_data == NULL);
+	Assert(gxact->prepare_2pc_proc == 0);
+
 	gxact->ondisk = false;
 
 	/* And insert it into the active array */
@@ -1129,6 +1137,8 @@ StartPrepare(GlobalTransaction gxact)
 	}
 }
 
+extern bool IsLogicalWorker(void);
+
 /*
  * Finish preparing state data and writing it to WAL.
  */
@@ -1167,6 +1177,37 @@ EndPrepare(GlobalTransaction gxact)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("two-phase state file maximum length exceeded")));
 
+	Assert(gxact->prepare_2pc_mem_data == NULL);
+	Assert(gxact->prepare_2pc_proc == 0);
+
+	if (IsLogicalWorker())
+	{
+		size_t			len = 0;
+		size_t			offset = 0;
+
+		for (record = records.head; record != NULL; record = record->next)
+			len += record->len;
+
+		if (len > 0)
+		{
+			MemoryContext	oldmemctx;
+
+			oldmemctx = MemoryContextSwitchTo(TopMemoryContext);
+
+			gxact->prepare_2pc_mem_data = palloc(len);
+			gxact->prepare_2pc_mem_len = len;
+			gxact->prepare_2pc_proc = getpid();
+
+			for (record = records.head; record != NULL; record = record->next)
+			{
+				memcpy((char *)gxact->prepare_2pc_mem_data + offset, record->data, record->len);
+				offset += record->len;
+			}
+
+			MemoryContextSwitchTo(oldmemctx);
+		}
+	}
+
 	/*
 	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
 	 * cover us, so no need to calculate a separate CRC.
@@ -1202,8 +1243,24 @@ EndPrepare(GlobalTransaction gxact)
 								   gxact->prepare_end_lsn);
 	}
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	XLogFlush(gxact->prepare_end_lsn);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(gxact->prepare_end_lsn);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(gxact->prepare_end_lsn);
+	}
+
+#endif
+
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
 	/* Store record's start location to read that later on Commit */
@@ -1495,6 +1552,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	xl_xact_stats_item *commitstats;
 	xl_xact_stats_item *abortstats;
 	SharedInvalidationMessage *invalmsgs;
+	bool		is_local_2pc_buf = false;
 
 	/*
 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1509,12 +1567,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * in WAL files if the LSN is after the last checkpoint record, or moved
 	 * to disk if for some reason they have lived for a long time.
 	 */
-	if (gxact->ondisk)
+	if (gxact->prepare_2pc_mem_data != NULL && gxact->prepare_2pc_proc == getpid())
+	{
+		Assert(IsLogicalWorker());
+		Assert(gxact->prepare_2pc_proc == getpid());
+		buf = gxact->prepare_2pc_mem_data;
+		/* ereport(LOG, errmsg("%s:%d", __FUNCTION__, __LINE__)); */
+		is_local_2pc_buf = true;
+	}
+	else if (gxact->ondisk)
 		buf = ReadTwoPhaseFile(xid, false);
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
-
 	/*
 	 * Disassemble the header area
 	 */
@@ -1638,6 +1703,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/* Clear shared memory state */
 	RemoveGXact(gxact);
 
+	if (gxact->prepare_2pc_mem_data != NULL)
+	{
+		MemoryContext		memctx;
+		//Assert(gxact->prepare_2pc_proc == getpid());
+		memctx = MemoryContextSwitchTo(TopMemoryContext);
+		if (gxact->prepare_2pc_proc == getpid())
+			pfree(gxact->prepare_2pc_mem_data);
+		gxact->prepare_2pc_mem_data = NULL;
+		gxact->prepare_2pc_mem_len = 0;
+		gxact->prepare_2pc_proc = 0;
+		MemoryContextSwitchTo(memctx);
+	}
+
 	/*
 	 * Release the lock as all callbacks are called and shared memory cleanup
 	 * is done.
@@ -1657,7 +1735,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 
 	RESUME_INTERRUPTS();
 
-	pfree(buf);
+	if (!is_local_2pc_buf)
+		pfree(buf);
 }
 
 /*
@@ -2349,12 +2428,38 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * a contradiction)
 	 */
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	/* Flush XLOG to disk */
 	XLogFlush(recptr);
 
 	/* Mark the transaction committed in pg_xact */
 	TransactionIdCommitTree(xid, nchildren, children);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(recptr);
+
+		/* Mark the transaction committed in pg_xact */
+		TransactionIdCommitTree(xid, nchildren, children);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(recptr);
+
+		/*
+		 * We must not immediately update the CLOG, since we didn't flush the
+		 * XLOG. Instead, we store the LSN up to which the XLOG must be
+		 * flushed before the CLOG may be updated.
+		 */
+		TransactionIdAsyncCommitTree(xid, nchildren, children, recptr);
+	}
+
+#endif
+
 	/* Checkpoint can proceed now */
 	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
 
-- 
2.34.1

Reply via email to