Dear All, Please consider my patch adding async commit for two-phase transactions. It can be applicable when catchup performance is not enough with the publication parameter twophase = on.
The key changes are: * Use XLogSetAsyncXactLSN instead of XLogFlush, as is done for usual transactions. * In the case of async commit only, save the 2PC state in the pg_twophase file (but do not fsync it) in addition to saving it in the WAL. The file is used as an alternative to storing the 2PC state in memory. * On recovery, reject pg_twophase files with future xids. Probably, 2PC async commit should be enabled by a GUC (not implemented in the patch). With best regards, Vitaly
From cbaaa7270d771f9ccd6def08f0f02ce61dc15ff6 Mon Sep 17 00:00:00 2001 From: Vitaly Davydov <v.davy...@postgrespro.ru> Date: Thu, 29 Feb 2024 18:58:13 +0300 Subject: [PATCH] Async commit support for twophase transactions --- src/backend/access/transam/twophase.c | 171 +++++++++++++++++++++----- 1 file changed, 138 insertions(+), 33 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 234c8d08eb..352266be14 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -109,6 +109,8 @@ #include "utils/memutils.h" #include "utils/timestamp.h" +#define POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT + /* * Directory where Two-phase commit files reside within PGDATA */ @@ -169,6 +171,7 @@ typedef struct GlobalTransactionData BackendId locking_backend; /* backend currently working on the xact */ bool valid; /* true if PGPROC entry is in proc array */ bool ondisk; /* true if prepare state file is on disk */ + bool infile; /* true if prepared state saved in file (but not fsync-ed) */ bool inredo; /* true if entry was added via xlog_redo */ char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ } GlobalTransactionData; @@ -227,12 +230,14 @@ static void RemoveGXact(GlobalTransaction gxact); static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); static char *ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, - bool fromdisk, bool setParent, bool setNextXid); + bool fromdisk, bool setParent, bool setNextXid, + const char *filename); static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); -static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); +static void RemoveTwoPhaseFileByName(const char *filename, bool giveWarning); +static void RecreateTwoPhaseFile(TransactionId 
xid, void *content, int len, bool dosync); /* * Initialization of shared memory @@ -427,6 +432,7 @@ MarkAsPreparing(TransactionId xid, const char *gid, MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid); + gxact->infile = false; gxact->ondisk = false; /* And insert it into the active array */ @@ -1204,6 +1210,37 @@ EndPrepare(GlobalTransaction gxact) (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("two-phase state file maximum length exceeded"))); +#ifdef POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT + + Assert(gxact->infile == false); + + if (synchronous_commit == SYNCHRONOUS_COMMIT_OFF) + { + char *buf; + size_t len = 0; + size_t offset = 0; + + for (record = records.head; record != NULL; record = record->next) + len += record->len; + + if (len > 0) + { + buf = palloc(len); + + for (record = records.head; record != NULL; record = record->next) + { + memcpy(buf + offset, record->data, record->len); + offset += record->len; + } + + RecreateTwoPhaseFile(gxact->xid, buf, len, false); + pfree(buf); + gxact->infile = true; + } + } + +#endif + /* * Now writing 2PC state data to WAL. We let the WAL's CRC protection * cover us, so no need to calculate a separate CRC. @@ -1239,8 +1276,24 @@ EndPrepare(GlobalTransaction gxact) gxact->prepare_end_lsn); } +#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT) + XLogFlush(gxact->prepare_end_lsn); +#else + + if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF) + { + /* Flush XLOG to disk */ + XLogFlush(gxact->prepare_end_lsn); + } + else + { + XLogSetAsyncXactLSN(gxact->prepare_end_lsn); + } + +#endif + /* If we crash now, we have prepared: WAL replay will fix things */ /* Store record's start location to read that later on Commit */ @@ -1546,12 +1599,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * in WAL files if the LSN is after the last checkpoint record, or moved * to disk if for some reason they have lived for a long time. 
*/ - if (gxact->ondisk) + if (gxact->infile || gxact->ondisk) buf = ReadTwoPhaseFile(xid, false); else XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); - /* * Disassemble the header area */ @@ -1687,7 +1739,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* * And now we can clean up any files we may have left. */ - if (gxact->ondisk) + if (gxact->infile || gxact->ondisk) RemoveTwoPhaseFile(xid, true); MyLockedGxact = NULL; @@ -1741,6 +1793,20 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) errmsg("could not remove file \"%s\": %m", path))); } +static void +RemoveTwoPhaseFileByName(const char *filename, bool giveWarning) +{ + char path[MAXPGPATH]; + + snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%s", filename); + + if (unlink(path)) + if (errno != ENOENT || giveWarning) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); +} + /* * Recreates a state file. This is used in WAL replay and during * checkpoint creation. @@ -1748,7 +1814,7 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) * Note: content and len don't include CRC. */ static void -RecreateTwoPhaseFile(TransactionId xid, void *content, int len) +RecreateTwoPhaseFile(TransactionId xid, void *content, int len, bool dosync) { char path[MAXPGPATH]; pg_crc32c statefile_crc; @@ -1791,16 +1857,19 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) } pgstat_report_wait_end(); - /* - * We must fsync the file because the end-of-replay checkpoint will not do - * so, there being no GXACT in shared memory yet to tell it to. - */ - pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC); - if (pg_fsync(fd) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", path))); - pgstat_report_wait_end(); + if (dosync) + { + /* + * We must fsync the file because the end-of-replay checkpoint will not do + * so, there being no GXACT in shared memory yet to tell it to. 
+ */ + pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC); + if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", path))); + pgstat_report_wait_end(); + } if (CloseTransientFile(fd) != 0) ereport(ERROR, @@ -1871,7 +1940,8 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) int len; XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); - RecreateTwoPhaseFile(gxact->xid, buf, len); + RecreateTwoPhaseFile(gxact->xid, buf, len, true); + gxact->infile = true; gxact->ondisk = true; gxact->prepare_start_lsn = InvalidXLogRecPtr; gxact->prepare_end_lsn = InvalidXLogRecPtr; @@ -1930,7 +2000,8 @@ restoreTwoPhaseData(void) xid = XidFromFullTransactionId(fxid); buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr, - true, false, false); + true, false, false, + clde->d_name); if (buf == NULL) continue; @@ -1997,7 +2068,8 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, - gxact->ondisk, false, true); + gxact->ondisk, false, true, + NULL); if (buf == NULL) continue; @@ -2072,7 +2144,8 @@ StandbyRecoverPreparedTransactions(void) buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, - gxact->ondisk, false, false); + gxact->ondisk, false, false, + NULL); if (buf != NULL) pfree(buf); } @@ -2124,7 +2197,8 @@ RecoverPreparedTransactions(void) */ buf = ProcessTwoPhaseBuffer(xid, gxact->prepare_start_lsn, - gxact->ondisk, true, false); + gxact->ondisk, true, false, + NULL); if (buf == NULL) continue; @@ -2202,7 +2276,8 @@ static char * ProcessTwoPhaseBuffer(TransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, - bool setParent, bool setNextXid) + bool setParent, bool setNextXid, + const char *filename) { FullTransactionId nextXid = TransamVariables->nextXid; TransactionId origNextXid = XidFromFullTransactionId(nextXid); @@ -2216,40 +2291,43 @@ ProcessTwoPhaseBuffer(TransactionId xid, if (!fromdisk) Assert(prepare_start_lsn != 
InvalidXLogRecPtr); - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) { if (fromdisk) { ereport(WARNING, - (errmsg("removing stale two-phase state file for transaction %u", + (errmsg("removing future two-phase state file for transaction %u", xid))); - RemoveTwoPhaseFile(xid, true); + if (filename) + RemoveTwoPhaseFileByName(filename, true); + else + RemoveTwoPhaseFile(xid, true); } else { ereport(WARNING, - (errmsg("removing stale two-phase state from memory for transaction %u", + (errmsg("removing future two-phase state from memory for transaction %u", xid))); PrepareRedoRemove(xid, true); } return NULL; } - /* Reject XID if too new */ - if (TransactionIdFollowsOrEquals(xid, origNextXid)) + /* Already processed? */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) { if (fromdisk) { ereport(WARNING, - (errmsg("removing future two-phase state file for transaction %u", + (errmsg("removing stale two-phase state file for transaction %u", xid))); RemoveTwoPhaseFile(xid, true); } else { ereport(WARNING, - (errmsg("removing future two-phase state from memory for transaction %u", + (errmsg("removing stale two-phase state from memory for transaction %u", xid))); PrepareRedoRemove(xid, true); } @@ -2388,12 +2466,38 @@ RecordTransactionCommitPrepared(TransactionId xid, * a contradiction) */ +#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT) + /* Flush XLOG to disk */ XLogFlush(recptr); /* Mark the transaction committed in pg_xact */ TransactionIdCommitTree(xid, nchildren, children); +#else + + if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF) + { + /* Flush XLOG to disk */ + XLogFlush(recptr); + + /* Mark the transaction committed in pg_xact */ + TransactionIdCommitTree(xid, nchildren, children); + } + else + { + XLogSetAsyncXactLSN(recptr); + + /* + * We must not immediately update the CLOG, since we didn't flush the + * XLOG. 
Instead, we store the LSN up to which the XLOG must be + * flushed before the CLOG may be updated. + */ + TransactionIdAsyncCommitTree(xid, nchildren, children, recptr); + } + +#endif + /* Checkpoint can proceed now */ MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; @@ -2567,6 +2671,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, gxact->locking_backend = InvalidBackendId; gxact->valid = false; gxact->ondisk = XLogRecPtrIsInvalid(start_lsn); + gxact->infile = gxact->ondisk; gxact->inredo = true; /* yes, added in redo */ strcpy(gxact->gid, gid); @@ -2625,7 +2730,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) * And now we can clean up any files we may have left. */ elog(DEBUG2, "removing 2PC data for transaction %u", xid); - if (gxact->ondisk) + if (gxact->infile || gxact->ondisk) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } @@ -2673,7 +2778,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, * do this optimization if we encounter many collisions in GID * between publisher and subscriber. */ - if (gxact->ondisk) + if (gxact->infile || gxact->ondisk) buf = ReadTwoPhaseFile(gxact->xid, false); else { -- 2.34.1