On Tue, Jan 18, 2022 at 9:19 PM Andres Freund <and...@anarazel.de> wrote: > > > + LWLockAcquire(ProcArrayLock, LW_SHARED); > > + lsn = ShmemVariableCache->finishedProcsLastCommitLSN; > > + for (index = 0; index < ProcGlobal->allProcCount; index++) > > + { > > + XLogRecPtr procLSN = > > ProcGlobal->allProcs[index].lastCommitLSN; > > + if (procLSN > lsn) > > + lsn = procLSN; > > + } > > + LWLockRelease(ProcArrayLock); > > I think it'd be better to go through the pgprocnos infrastructure, so that > only connected procs need to be checked. > > LWLockAcquire(ProcArrayLock, LW_SHARED); > for (i = 0; i < arrayP->numProcs; i++) > { > int pgprocno = arrayP->pgprocnos[i]; > PGPROC *proc = &allProcs[pgprocno]; > > if (proc->lastCommitLSN > lsn) > lsn =proc->lastCommitLSN; > } > > > > diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h > > index a58888f9e9..2a026b0844 100644 > > --- a/src/include/storage/proc.h > > +++ b/src/include/storage/proc.h > > @@ -258,6 +258,11 @@ struct PGPROC > > PGPROC *lockGroupLeader; /* lock group leader, if I'm a member > > */ > > dlist_head lockGroupMembers; /* list of members, if I'm a > > leader */ > > dlist_node lockGroupLink; /* my member link, if I'm a member */ > > + > > + /* > > + * Last transaction metadata. > > + */ > > + XLogRecPtr lastCommitLSN; /* cache of last committed > > LSN */ > > }; > > We do not rely on 64bit integers to be read/written atomically, just 32bit > ones. To make this work for older platforms you'd have to use a > pg_atomic_uint64. On new-ish platforms pg_atomic_read_u64/pg_atomic_write_u64 > end up as plain read/writes, but on older ones they'd do the necessary > locking to make that safe...
All right, here's an updated patch. The final interface (a new function, or refactoring the existing one so that it does not rely on commit_ts) is still TBD (and I'd appreciate input on that from Alvaro and others). Thanks, James Coleman
From 4b1d2db87be0090bc8840e1c6d990443dccb5237 Mon Sep 17 00:00:00 2001 From: jcoleman <jtc...@gmail.com> Date: Fri, 14 Jan 2022 22:26:10 +0000 Subject: [PATCH v3] Expose LSN of last commit via pg_last_committed_xact --- src/backend/access/transam/commit_ts.c | 25 +++++++++++++------ src/backend/access/transam/twophase.c | 2 +- src/backend/access/transam/xact.c | 10 +++++--- src/backend/storage/ipc/procarray.c | 34 ++++++++++++++++++++++++++ src/include/access/commit_ts.h | 5 ++-- src/include/access/transam.h | 2 ++ src/include/catalog/pg_proc.dat | 6 ++--- src/include/storage/proc.h | 5 ++++ src/include/storage/procarray.h | 2 ++ 9 files changed, 73 insertions(+), 18 deletions(-) diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index 659109f8d4..f0c3185021 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -33,10 +33,11 @@ #include "funcapi.h" #include "miscadmin.h" #include "pg_trace.h" -#include "storage/shmem.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/snapmgr.h" #include "utils/timestamp.h" +#include "utils/pg_lsn.h" /* * Defines for CommitTs page sizes. 
A page is the same BLCKSZ as is used @@ -93,6 +94,7 @@ static SlruCtlData CommitTsCtlData; typedef struct CommitTimestampShared { TransactionId xidLastCommit; + XLogRecPtr lsnLastCommit; CommitTimestampEntry dataLastCommit; bool commitTsActive; } CommitTimestampShared; @@ -135,7 +137,7 @@ static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid); void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - RepOriginId nodeid) + XLogRecPtr commitLsn, RepOriginId nodeid) { int i; TransactionId headxid; @@ -414,24 +416,28 @@ pg_last_committed_xact(PG_FUNCTION_ARGS) TransactionId xid; RepOriginId nodeid; TimestampTz ts; - Datum values[3]; - bool nulls[3]; + XLogRecPtr lsn; + Datum values[4]; + bool nulls[4]; TupleDesc tupdesc; HeapTuple htup; /* and construct a tuple with our data */ xid = GetLatestCommitTsData(&ts, &nodeid); + lsn = GetLastCommitLSN(); /* * Construct a tuple descriptor for the result row. This must match this * function's pg_proc entry! 
*/ - tupdesc = CreateTemplateTupleDesc(3); + tupdesc = CreateTemplateTupleDesc(4); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid", XIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp", TIMESTAMPTZOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "lsn", + PG_LSNOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "roident", OIDOID, -1, 0); tupdesc = BlessTupleDesc(tupdesc); @@ -447,8 +453,11 @@ pg_last_committed_xact(PG_FUNCTION_ARGS) values[1] = TimestampTzGetDatum(ts); nulls[1] = false; - values[2] = ObjectIdGetDatum((Oid) nodeid); - nulls[2] = false; + values[2] = LSNGetDatum(lsn); + nulls[2] = lsn == InvalidXLogRecPtr; + + values[3] = ObjectIdGetDatum((Oid) nodeid); + nulls[3] = false; } htup = heap_form_tuple(tupdesc, values, nulls); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 271a3146db..cd33849aea 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2300,7 +2300,7 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionTreeSetCommitTsData(xid, nchildren, children, replorigin_session_origin_timestamp, - replorigin_session_origin); + recptr, replorigin_session_origin); /* * We don't currently try to sleep before flush here ... nor is there any diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c9516e03fa..9da257b83a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1355,6 +1355,7 @@ RecordTransactionCommit(void) else { bool replorigin; + XLogRecPtr commit_lsn; /* * Are we using the replication origins feature? 
Or, in other words, @@ -1391,7 +1392,7 @@ RecordTransactionCommit(void) SetCurrentTransactionStopTimestamp(); - XactLogCommitRecord(xactStopTimestamp, + commit_lsn = XactLogCommitRecord(xactStopTimestamp, nchildren, children, nrels, rels, nmsgs, invalMessages, RelcacheInitFileInval, @@ -1418,7 +1419,8 @@ RecordTransactionCommit(void) TransactionTreeSetCommitTsData(xid, nchildren, children, replorigin_session_origin_timestamp, - replorigin_session_origin); + commit_lsn, replorigin_session_origin); + MyProc->lastCommitLSN = commit_lsn; } /* @@ -5881,7 +5883,9 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, /* Set the transaction commit timestamp and metadata */ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, - commit_time, origin_id); + commit_time, lsn, origin_id); + + MyProc->lastCommitLSN = lsn; if (standbyState == STANDBY_DISABLED) { diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 3be6040289..92586d1492 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -430,6 +430,7 @@ CreateSharedProcArray(void) procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; ShmemVariableCache->xactCompletionCount = 1; + ShmemVariableCache->finishedProcsLastCommitLSN = InvalidXLogRecPtr; } allProcs = ProcGlobal->allProcs; @@ -580,6 +581,9 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) /* Same with xactCompletionCount */ ShmemVariableCache->xactCompletionCount++; + if (proc->lastCommitLSN > ShmemVariableCache->finishedProcsLastCommitLSN) + ShmemVariableCache->finishedProcsLastCommitLSN = proc->lastCommitLSN; + ProcGlobal->xids[myoff] = InvalidTransactionId; ProcGlobal->subxidStates[myoff].overflowed = false; ProcGlobal->subxidStates[myoff].count = 0; @@ -3914,6 +3918,36 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * GetLastCommitLSN + * + * Return 
LSN of the most recent COMMIT record written to WAL. + * + * For performance reasons we do not guarantee the result to be perfectly in + * line with current visibility. + */ +XLogRecPtr +GetLastCommitLSN(void) +{ + ProcArrayStruct *arrayP = procArray; + XLogRecPtr lsn; + int i; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + lsn = ShmemVariableCache->finishedProcsLastCommitLSN; + for (i = 0; i < arrayP->numProcs; i++) + { + int pgprocno = arrayP->pgprocnos[i]; + PGPROC *proc = &allProcs[pgprocno]; + + if (proc->lastCommitLSN > lsn) + lsn = proc->lastCommitLSN; + } + LWLockRelease(ProcArrayLock); + + return lsn; +} + /* * XidCacheRemoveRunningXids * diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h index 7662f8e1a9..c7a146ec17 100644 --- a/src/include/access/commit_ts.h +++ b/src/include/access/commit_ts.h @@ -21,11 +21,10 @@ extern PGDLLIMPORT bool track_commit_timestamp; extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, - RepOriginId nodeid); + XLogRecPtr commitLsn, RepOriginId nodeid); extern bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid); -extern TransactionId GetLatestCommitTsData(TimestampTz *ts, - RepOriginId *nodeid); +extern TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid); extern Size CommitTsShmemBuffers(void); extern Size CommitTsShmemSize(void); diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 338dfca5a0..c53f2d3810 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -237,6 +237,8 @@ typedef struct VariableCacheData */ FullTransactionId latestCompletedXid; /* newest full XID that has * committed or aborted */ + /* */ + XLogRecPtr finishedProcsLastCommitLSN; /* * Number of top-level transactions with xids (i.e. 
which may have diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d6bf1f3274..d5fe8f4161 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6157,11 +6157,11 @@ prosrc => 'pg_xact_commit_timestamp_origin' }, { oid => '3583', - descr => 'get transaction Id, commit timestamp and replication origin of latest transaction commit', + descr => 'get transaction Id, commit timestamp, commit lsn and replication origin of latest transaction commit', proname => 'pg_last_committed_xact', provolatile => 'v', prorettype => 'record', proargtypes => '', - proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{o,o,o}', - proargnames => '{xid,timestamp,roident}', + proallargtypes => '{xid,timestamptz,pg_lsn,oid}', proargmodes => '{o,o,o,o}', + proargnames => '{xid,timestamp,lsn,roident}', prosrc => 'pg_last_committed_xact' }, { oid => '3537', descr => 'get identification of SQL object', diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index a58888f9e9..2a026b0844 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -258,6 +258,11 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * Last transaction metadata. + */ + XLogRecPtr lastCommitLSN; /* cache of last committed LSN */ }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index e03692053e..ec3baa29e8 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -94,4 +94,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern XLogRecPtr GetLastCommitLSN(void); + #endif /* PROCARRAY_H */ -- 2.17.1