Hi Amit,

Thank you for your interest in the discussion!
On Monday, February 26, 2024 16:24 MSK, Amit Kapila <amit.kapil...@gmail.com> wrote:

> I think the reason is probably that when the WAL record for prepared is already flushed then what will be the idea of async commit here?

I think the idea of async commit should be applied to both transactions, PREPARE and COMMIT PREPARED, which are actually two separate local transactions. For both of them we may call XLogSetAsyncXactLSN at commit instead of XLogFlush when async commit is enabled. When I say async commit, I mean applying it to these local transactions, not to the two-phase (prepared) transaction itself.

> At commit prepared, it seems we read prepare's WAL record, right? If so, it is not clear to me do you see a problem with a flush of commit_prepared or reading WAL for prepared or both of these.

The problem with reading WAL comes from the async commit of PREPARE TRANSACTION, which saves the 2PC state in WAL. At the moment of COMMIT PREPARED, the WAL containing the 2PC state written by PREPARE TRANSACTION may not have been flushed (XLogFlush-ed) yet. So PREPARE TRANSACTION should wait until its 2PC state is flushed.

I did some experiments with saving the 2PC state in the local memory of the logical replication worker. I think it worked, and it showed much better performance: the logical replication worker utilized up to 100% CPU. I'm just concerned about possible problems with async commit for two-phase transactions.

To be more specific, I've attached a patch to support async commit for two-phase transactions. It is not the final patch; it is presented only for discussion purposes. There were some attempts to keep the 2PC state in memory in the past, but they were rejected. Now there might be a second round of discussion about it.

With best regards,
Vitaly
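The interaction described above can be illustrated with a small toy model. This is not PostgreSQL code. ToyWal, toy_prepare, toy_commit_prepared and the fixed record sizes are all hypothetical.

The model tracks three things: an insert position, a flush position, and an "async target" that a background writer would flush to later. A reader may only read WAL that is already durable. In the model, the wait that COMMIT PREPARED performs before reading the prepare record back from WAL is reduced to a stall counter.

```c
/*
 * Toy model (NOT PostgreSQL code; all names are hypothetical) of how an
 * asynchronously committed PREPARE interacts with COMMIT PREPARED when
 * the 2PC state must be read back from WAL.
 */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t ToyLSN;

typedef struct ToyWal
{
    ToyLSN insert_lsn;    /* end of the last record inserted */
    ToyLSN flush_lsn;     /* WAL below this LSN is durable */
    ToyLSN async_target;  /* LSN a background writer should flush to */
    int    read_stalls;   /* times a reader had to wait for a flush */
} ToyWal;

/* Append a record of `size` bytes and return its end LSN. */
static ToyLSN toy_insert(ToyWal *wal, ToyLSN size)
{
    wal->insert_lsn += size;
    return wal->insert_lsn;
}

/* Synchronous commit: the committer itself makes WAL durable up to lsn. */
static void toy_flush(ToyWal *wal, ToyLSN lsn)
{
    if (wal->flush_lsn < lsn)
        wal->flush_lsn = lsn;
}

/* Asynchronous commit: only remember the target; the flush happens later. */
static void toy_set_async(ToyWal *wal, ToyLSN lsn)
{
    if (wal->async_target < lsn)
        wal->async_target = lsn;
}

/* Commit a local transaction whose commit record ends at lsn. */
static void toy_commit(ToyWal *wal, ToyLSN lsn, bool async_commit)
{
    if (async_commit)
        toy_set_async(wal, lsn);
    else
        toy_flush(wal, lsn);
}

/* A reader may only read durable WAL, so it waits until lsn is flushed. */
static void toy_wait_for_flush(ToyWal *wal, ToyLSN lsn)
{
    if (wal->flush_lsn < lsn)
    {
        wal->read_stalls++;   /* the reader blocks here ... */
        wal->flush_lsn = lsn; /* ... until something flushes this far */
    }
}

/* PREPARE TRANSACTION: write the 2PC state record and commit locally. */
static ToyLSN toy_prepare(ToyWal *wal, bool async_commit)
{
    ToyLSN prepare_end_lsn = toy_insert(wal, 128);

    toy_commit(wal, prepare_end_lsn, async_commit);
    return prepare_end_lsn;
}

/* COMMIT PREPARED: obtain the 2PC state, then write and commit its record. */
static void toy_commit_prepared(ToyWal *wal, ToyLSN prepare_end_lsn,
                                bool have_local_copy, bool async_commit)
{
    if (!have_local_copy)
        toy_wait_for_flush(wal, prepare_end_lsn); /* read state from WAL */

    toy_commit(wal, toy_insert(wal, 64), async_commit);
}
```

In this model, only the combination of an async PREPARE and a WAL read at COMMIT PREPARED produces a stall. When the process still holds its own copy of the 2PC state, the read and its wait are skipped entirely, and both local commits can stay asynchronous.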
From 549f809fa122ca0842ec4bfc775afd08feee0d80 Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Tue, 27 Feb 2024 14:02:23 +0300
Subject: [PATCH] Add asynchronous commit support for 2PC

---
 src/backend/access/transam/twophase.c | 111 +++++++++++++++++++++++++-
 1 file changed, 108 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..52f0853db8 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -109,6 +109,8 @@
 #include "utils/memutils.h"
 #include "utils/timestamp.h"
 
+#define POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT
+
 /*
  * Directory where Two-phase commit files reside within PGDATA
  */
@@ -163,6 +165,9 @@ typedef struct GlobalTransactionData
 	 */
 	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
 	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
+	void* prepare_2pc_mem_data;
+	size_t prepare_2pc_mem_len;
+	pid_t prepare_2pc_proc;
 	TransactionId xid;			/* The GXACT id */
 
 	Oid			owner;			/* ID of user that executed the xact */
@@ -427,6 +432,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 
 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
 
+	Assert(gxact->prepare_2pc_mem_data == NULL);
+	Assert(gxact->prepare_2pc_proc == 0);
+
 	gxact->ondisk = false;
 
 	/* And insert it into the active array */
@@ -1129,6 +1137,8 @@ StartPrepare(GlobalTransaction gxact)
 	}
 }
 
+extern bool IsLogicalWorker(void);
+
 /*
  * Finish preparing state data and writing it to WAL.
  */
@@ -1167,6 +1177,37 @@ EndPrepare(GlobalTransaction gxact)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("two-phase state file maximum length exceeded")));
 
+	Assert(gxact->prepare_2pc_mem_data == NULL);
+	Assert(gxact->prepare_2pc_proc == 0);
+
+	if (IsLogicalWorker())
+	{
+		size_t len = 0;
+		size_t offset = 0;
+
+		for (record = records.head; record != NULL; record = record->next)
+			len += record->len;
+
+		if (len > 0)
+		{
+			MemoryContext oldmemctx;
+
+			oldmemctx = MemoryContextSwitchTo(TopMemoryContext);
+
+			gxact->prepare_2pc_mem_data = palloc(len);
+			gxact->prepare_2pc_mem_len = len;
+			gxact->prepare_2pc_proc = getpid();
+
+			for (record = records.head; record != NULL; record = record->next)
+			{
+				memcpy((char *)gxact->prepare_2pc_mem_data + offset, record->data, record->len);
+				offset += record->len;
+			}
+
+			MemoryContextSwitchTo(oldmemctx);
+		}
+	}
+
 	/*
 	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
 	 * cover us, so no need to calculate a separate CRC.
@@ -1202,8 +1243,24 @@ EndPrepare(GlobalTransaction gxact)
 							   gxact->prepare_end_lsn);
 	}
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	XLogFlush(gxact->prepare_end_lsn);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(gxact->prepare_end_lsn);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(gxact->prepare_end_lsn);
+	}
+
+#endif
+
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
 	/* Store record's start location to read that later on Commit */
@@ -1495,6 +1552,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	xl_xact_stats_item *commitstats;
 	xl_xact_stats_item *abortstats;
 	SharedInvalidationMessage *invalmsgs;
+	bool is_local_2pc_buf = false;
 
 	/*
 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1509,12 +1567,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * in WAL files if the LSN is after the last checkpoint record, or moved
 	 * to disk if for some reason they have lived for a long time.
 	 */
-	if (gxact->ondisk)
+	if (gxact->prepare_2pc_mem_data != NULL && gxact->prepare_2pc_proc == getpid())
+	{
+		Assert(IsLogicalWorker());
+		Assert(gxact->prepare_2pc_proc == getpid());
+		buf = gxact->prepare_2pc_mem_data;
+		/* ereport(LOG, errmsg("%s:%d", __FUNCTION__, __LINE__)); */
+		is_local_2pc_buf = true;
+	}
+	else if (gxact->ondisk)
 		buf = ReadTwoPhaseFile(xid, false);
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
-
 
 	/*
 	 * Disassemble the header area
 	 */
@@ -1638,6 +1703,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/* Clear shared memory state */
 	RemoveGXact(gxact);
 
+	if (gxact->prepare_2pc_mem_data != NULL)
+	{
+		MemoryContext memctx;
+		//Assert(gxact->prepare_2pc_proc == getpid());
+		memctx = MemoryContextSwitchTo(TopMemoryContext);
+		if (gxact->prepare_2pc_proc == getpid())
+			pfree(gxact->prepare_2pc_mem_data);
+		gxact->prepare_2pc_mem_data = NULL;
+		gxact->prepare_2pc_mem_len = 0;
+		gxact->prepare_2pc_proc = 0;
+		MemoryContextSwitchTo(memctx);
+	}
+
 	/*
 	 * Release the lock as all callbacks are called and shared memory cleanup
 	 * is done.
@@ -1657,7 +1735,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 
 	RESUME_INTERRUPTS();
 
-	pfree(buf);
+	if (!is_local_2pc_buf)
+		pfree(buf);
 }
 
 /*
@@ -2349,12 +2428,38 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * a contradiction)
 	 */
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	/* Flush XLOG to disk */
 	XLogFlush(recptr);
 
 	/* Mark the transaction committed in pg_xact */
 	TransactionIdCommitTree(xid, nchildren, children);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(recptr);
+
+		/* Mark the transaction committed in pg_xact */
+		TransactionIdCommitTree(xid, nchildren, children);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(recptr);
+
+		/*
+		 * We must not immediately update the CLOG, since we didn't flush the
+		 * XLOG. Instead, we store the LSN up to which the XLOG must be
+		 * flushed before the CLOG may be updated.
+		 */
+		TransactionIdAsyncCommitTree(xid, nchildren, children, recptr);
+	}
+
+#endif
+
 	/* Checkpoint can proceed now */
 	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
 
-- 
2.34.1
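Outside of PostgreSQL, the in-memory path that the patch adds to EndPrepare() and FinishPreparedTransaction() can be sketched as standalone C. This is a minimal illustration under stated assumptions, not the patch itself:

- Chunk, CachedState, cache_state, lookup_state and release_state are hypothetical names.
- malloc/free stand in for palloc/pfree in TopMemoryContext.

The owner-pid check matters for a specific reason. GlobalTransactionData lives in shared memory, while the copied buffer lives in one process's private memory. The stored pointer is therefore only meaningful to the process that created it.

```c
/*
 * Standalone sketch (hypothetical names, not PostgreSQL code): copy a
 * linked list of 2PC state chunks into one private buffer, hand it back
 * only to the owning process, and free it only from that process.
 * malloc/free stand in for palloc/pfree in TopMemoryContext.
 */
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

typedef struct Chunk
{
    const char   *data;
    size_t        len;
    struct Chunk *next;
} Chunk;

typedef struct CachedState
{
    void  *mem_data;  /* private copy of the state data, or NULL */
    size_t mem_len;
    pid_t  owner_pid; /* process whose memory mem_data points into */
} CachedState;

/* Concatenate all chunks into one buffer; returns 0 on success. */
static int cache_state(CachedState *st, const Chunk *head)
{
    size_t len = 0;
    size_t offset = 0;

    for (const Chunk *c = head; c != NULL; c = c->next)
        len += c->len;
    if (len == 0)
        return 0; /* nothing to cache, mirroring the len > 0 check */

    st->mem_data = malloc(len);
    if (st->mem_data == NULL)
        return -1;
    st->mem_len = len;
    st->owner_pid = getpid();

    for (const Chunk *c = head; c != NULL; c = c->next)
    {
        memcpy((char *) st->mem_data + offset, c->data, c->len);
        offset += c->len;
    }
    return 0;
}

/* Return the cached buffer only to the process that created it. */
static const void *lookup_state(const CachedState *st, pid_t caller)
{
    if (st->mem_data != NULL && st->owner_pid == caller)
        return st->mem_data;
    return NULL; /* caller falls back to reading WAL or the state file */
}

/* Free the buffer if this process owns it; always clear the fields. */
static void release_state(CachedState *st)
{
    if (st->mem_data != NULL && st->owner_pid == getpid())
        free(st->mem_data);
    st->mem_data = NULL;
    st->mem_len = 0;
    st->owner_pid = 0;
}
```

As in the patch, calling release_state() from a process other than the owner only clears the fields. In that case the owner's copy is never freed.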