Dear All,

Consider, please, my patch for async commit for twophase transactions. It can 
be applicable when catchup performance is not enought with publication 
parameter twophase = on.

The key changes are:
 * Use XLogSetAsyncXactLSN instead of XLogFlush as it is for usual 
transactions. * In case of async commit only, save 2PC state in the pg_twophase 
file (but not fsync it) in addition to saving in the WAL. The file is used as 
an alternative to storing 2pc state in the memory. * On recovery, reject 
pg_twophase files with future xids.Probably, 2PC async commit should be enabled 
by a GUC (not implemented in the patch).

With best regards,
Vitaly


 
From cbaaa7270d771f9ccd6def08f0f02ce61dc15ff6 Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Thu, 29 Feb 2024 18:58:13 +0300
Subject: [PATCH] Async commit support for twophase transactions

---
 src/backend/access/transam/twophase.c | 171 +++++++++++++++++++++-----
 1 file changed, 138 insertions(+), 33 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 234c8d08eb..352266be14 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -109,6 +109,8 @@
 #include "utils/memutils.h"
 #include "utils/timestamp.h"
 
+#define POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT
+
 /*
  * Directory where Two-phase commit files reside within PGDATA
  */
@@ -169,6 +171,7 @@ typedef struct GlobalTransactionData
 	BackendId	locking_backend;	/* backend currently working on the xact */
 	bool		valid;			/* true if PGPROC entry is in proc array */
 	bool		ondisk;			/* true if prepare state file is on disk */
+	bool		infile;			/* true if prepared state saved in file (but not fsync-ed) */
 	bool		inredo;			/* true if entry was added via xlog_redo */
 	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */
 }			GlobalTransactionData;
@@ -227,12 +230,14 @@ static void RemoveGXact(GlobalTransaction gxact);
 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
 static char *ProcessTwoPhaseBuffer(TransactionId xid,
 								   XLogRecPtr prepare_start_lsn,
-								   bool fromdisk, bool setParent, bool setNextXid);
+								   bool fromdisk, bool setParent, bool setNextXid,
+								   const char *filename);
 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
 								const char *gid, TimestampTz prepared_at, Oid owner,
 								Oid databaseid);
 static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
-static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
+static void RemoveTwoPhaseFileByName(const char *filename, bool giveWarning);
+static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len, bool dosync);
 
 /*
  * Initialization of shared memory
@@ -427,6 +432,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 
 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
 
+	gxact->infile = false;
 	gxact->ondisk = false;
 
 	/* And insert it into the active array */
@@ -1204,6 +1210,37 @@ EndPrepare(GlobalTransaction gxact)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("two-phase state file maximum length exceeded")));
 
+#ifdef POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT
+
+	Assert(gxact->infile == false);
+
+	if (synchronous_commit == SYNCHRONOUS_COMMIT_OFF)
+	{
+		char		   *buf;
+		size_t			len = 0;
+		size_t			offset = 0;
+
+		for (record = records.head; record != NULL; record = record->next)
+			len += record->len;
+
+		if (len > 0)
+		{
+			buf = palloc(len);
+
+			for (record = records.head; record != NULL; record = record->next)
+			{
+				memcpy(buf + offset, record->data, record->len);
+				offset += record->len;
+			}
+
+			RecreateTwoPhaseFile(gxact->xid, buf, len, false);
+			pfree(buf);
+			gxact->infile = true;
+		}
+	}
+
+#endif
+
 	/*
 	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
 	 * cover us, so no need to calculate a separate CRC.
@@ -1239,8 +1276,24 @@ EndPrepare(GlobalTransaction gxact)
 								   gxact->prepare_end_lsn);
 	}
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	XLogFlush(gxact->prepare_end_lsn);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(gxact->prepare_end_lsn);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(gxact->prepare_end_lsn);
+	}
+
+#endif
+
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
 	/* Store record's start location to read that later on Commit */
@@ -1546,12 +1599,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * in WAL files if the LSN is after the last checkpoint record, or moved
 	 * to disk if for some reason they have lived for a long time.
 	 */
-	if (gxact->ondisk)
+	if (gxact->infile || gxact->ondisk)
 		buf = ReadTwoPhaseFile(xid, false);
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
-
 	/*
 	 * Disassemble the header area
 	 */
@@ -1687,7 +1739,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/*
 	 * And now we can clean up any files we may have left.
 	 */
-	if (gxact->ondisk)
+	if (gxact->infile || gxact->ondisk)
 		RemoveTwoPhaseFile(xid, true);
 
 	MyLockedGxact = NULL;
@@ -1741,6 +1793,20 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
 					 errmsg("could not remove file \"%s\": %m", path)));
 }
 
+static void
+RemoveTwoPhaseFileByName(const char *filename, bool giveWarning)
+{
+	char		path[MAXPGPATH];
+
+	snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%s", filename);
+
+	if (unlink(path))
+		if (errno != ENOENT || giveWarning)
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+}
+
 /*
  * Recreates a state file. This is used in WAL replay and during
  * checkpoint creation.
@@ -1748,7 +1814,7 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
  * Note: content and len don't include CRC.
  */
 static void
-RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+RecreateTwoPhaseFile(TransactionId xid, void *content, int len, bool dosync)
 {
 	char		path[MAXPGPATH];
 	pg_crc32c	statefile_crc;
@@ -1791,16 +1857,19 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 	}
 	pgstat_report_wait_end();
 
-	/*
-	 * We must fsync the file because the end-of-replay checkpoint will not do
-	 * so, there being no GXACT in shared memory yet to tell it to.
-	 */
-	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
-	if (pg_fsync(fd) != 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not fsync file \"%s\": %m", path)));
-	pgstat_report_wait_end();
+	if (dosync)
+	{
+		/*
+		* We must fsync the file because the end-of-replay checkpoint will not do
+		* so, there being no GXACT in shared memory yet to tell it to.
+		*/
+		pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
+		if (pg_fsync(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					errmsg("could not fsync file \"%s\": %m", path)));
+		pgstat_report_wait_end();
+	}
 
 	if (CloseTransientFile(fd) != 0)
 		ereport(ERROR,
@@ -1871,7 +1940,8 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 			int			len;
 
 			XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
-			RecreateTwoPhaseFile(gxact->xid, buf, len);
+			RecreateTwoPhaseFile(gxact->xid, buf, len, true);
+			gxact->infile = true;
 			gxact->ondisk = true;
 			gxact->prepare_start_lsn = InvalidXLogRecPtr;
 			gxact->prepare_end_lsn = InvalidXLogRecPtr;
@@ -1930,7 +2000,8 @@ restoreTwoPhaseData(void)
 			xid = XidFromFullTransactionId(fxid);
 
 			buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
-										true, false, false);
+										true, false, false,
+										clde->d_name);
 			if (buf == NULL)
 				continue;
 
@@ -1997,7 +2068,8 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
 		buf = ProcessTwoPhaseBuffer(xid,
 									gxact->prepare_start_lsn,
-									gxact->ondisk, false, true);
+									gxact->ondisk, false, true,
+									NULL);
 
 		if (buf == NULL)
 			continue;
@@ -2072,7 +2144,8 @@ StandbyRecoverPreparedTransactions(void)
 
 		buf = ProcessTwoPhaseBuffer(xid,
 									gxact->prepare_start_lsn,
-									gxact->ondisk, false, false);
+									gxact->ondisk, false, false,
+									NULL);
 		if (buf != NULL)
 			pfree(buf);
 	}
@@ -2124,7 +2197,8 @@ RecoverPreparedTransactions(void)
 		 */
 		buf = ProcessTwoPhaseBuffer(xid,
 									gxact->prepare_start_lsn,
-									gxact->ondisk, true, false);
+									gxact->ondisk, true, false,
+									NULL);
 		if (buf == NULL)
 			continue;
 
@@ -2202,7 +2276,8 @@ static char *
 ProcessTwoPhaseBuffer(TransactionId xid,
 					  XLogRecPtr prepare_start_lsn,
 					  bool fromdisk,
-					  bool setParent, bool setNextXid)
+					  bool setParent, bool setNextXid,
+					  const char *filename)
 {
 	FullTransactionId nextXid = TransamVariables->nextXid;
 	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
@@ -2216,40 +2291,43 @@ ProcessTwoPhaseBuffer(TransactionId xid,
 	if (!fromdisk)
 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
-	/* Already processed? */
-	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+	/* Reject XID if too new */
+	if (TransactionIdFollowsOrEquals(xid, origNextXid))
 	{
 		if (fromdisk)
 		{
 			ereport(WARNING,
-					(errmsg("removing stale two-phase state file for transaction %u",
+					(errmsg("removing future two-phase state file for transaction %u",
 							xid)));
-			RemoveTwoPhaseFile(xid, true);
+			if (filename)
+				RemoveTwoPhaseFileByName(filename, true);
+			else
+				RemoveTwoPhaseFile(xid, true);
 		}
 		else
 		{
 			ereport(WARNING,
-					(errmsg("removing stale two-phase state from memory for transaction %u",
+					(errmsg("removing future two-phase state from memory for transaction %u",
 							xid)));
 			PrepareRedoRemove(xid, true);
 		}
 		return NULL;
 	}
 
-	/* Reject XID if too new */
-	if (TransactionIdFollowsOrEquals(xid, origNextXid))
+	/* Already processed? */
+	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
 	{
 		if (fromdisk)
 		{
 			ereport(WARNING,
-					(errmsg("removing future two-phase state file for transaction %u",
+					(errmsg("removing stale two-phase state file for transaction %u",
 							xid)));
 			RemoveTwoPhaseFile(xid, true);
 		}
 		else
 		{
 			ereport(WARNING,
-					(errmsg("removing future two-phase state from memory for transaction %u",
+					(errmsg("removing stale two-phase state from memory for transaction %u",
 							xid)));
 			PrepareRedoRemove(xid, true);
 		}
@@ -2388,12 +2466,38 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * a contradiction)
 	 */
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	/* Flush XLOG to disk */
 	XLogFlush(recptr);
 
 	/* Mark the transaction committed in pg_xact */
 	TransactionIdCommitTree(xid, nchildren, children);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(recptr);
+
+		/* Mark the transaction committed in pg_xact */
+		TransactionIdCommitTree(xid, nchildren, children);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(recptr);
+
+		/*
+		 * We must not immediately update the CLOG, since we didn't flush the
+		 * XLOG. Instead, we store the LSN up to which the XLOG must be
+		 * flushed before the CLOG may be updated.
+		 */
+		TransactionIdAsyncCommitTree(xid, nchildren, children, recptr);
+	}
+
+#endif
+
 	/* Checkpoint can proceed now */
 	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
 
@@ -2567,6 +2671,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 	gxact->locking_backend = InvalidBackendId;
 	gxact->valid = false;
 	gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
+	gxact->infile = gxact->ondisk;
 	gxact->inredo = true;		/* yes, added in redo */
 	strcpy(gxact->gid, gid);
 
@@ -2625,7 +2730,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 	 * And now we can clean up any files we may have left.
 	 */
 	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
-	if (gxact->ondisk)
+	if (gxact->infile || gxact->ondisk)
 		RemoveTwoPhaseFile(xid, giveWarning);
 	RemoveGXact(gxact);
 }
@@ -2673,7 +2778,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 			 * do this optimization if we encounter many collisions in GID
 			 * between publisher and subscriber.
 			 */
-			if (gxact->ondisk)
+			if (gxact->infile || gxact->ondisk)
 				buf = ReadTwoPhaseFile(gxact->xid, false);
 			else
 			{
-- 
2.34.1

Reply via email to