On Tue, Jan 18, 2022 at 9:19 PM Andres Freund <and...@anarazel.de> wrote:
>
> > +     LWLockAcquire(ProcArrayLock, LW_SHARED);
> > +     lsn = ShmemVariableCache->finishedProcsLastCommitLSN;
> > +     for (index = 0; index < ProcGlobal->allProcCount; index++)
> > +     {
> > +             XLogRecPtr procLSN = 
> > ProcGlobal->allProcs[index].lastCommitLSN;
> > +             if (procLSN > lsn)
> > +                     lsn = procLSN;
> > +     }
> > +     LWLockRelease(ProcArrayLock);
>
> I think it'd be better to go through the pgprocnos infrastructure, so that
> only connected procs need to be checked.
>
>     LWLockAcquire(ProcArrayLock, LW_SHARED);
>     for (i = 0; i < arrayP->numProcs; i++)
>     {
>         int         pgprocno = arrayP->pgprocnos[i];
>         PGPROC     *proc = &allProcs[pgprocno];
>
>         if (proc->lastCommitLSN > lsn)
>            lsn =proc->lastCommitLSN;
>    }
>
>
> > diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
> > index a58888f9e9..2a026b0844 100644
> > --- a/src/include/storage/proc.h
> > +++ b/src/include/storage/proc.h
> > @@ -258,6 +258,11 @@ struct PGPROC
> >       PGPROC     *lockGroupLeader;    /* lock group leader, if I'm a member 
> > */
> >       dlist_head      lockGroupMembers;       /* list of members, if I'm a 
> > leader */
> >       dlist_node      lockGroupLink;  /* my member link, if I'm a member */
> > +
> > +     /*
> > +      * Last transaction metadata.
> > +      */
> > +     XLogRecPtr      lastCommitLSN;          /* cache of last committed 
> > LSN */
> >  };
>
> We do not rely on 64bit integers to be read/written atomically, just 32bit
> ones. To make this work for older platforms you'd have to use a
> pg_atomic_uint64. On new-ish platforms pg_atomic_read_u64/pg_atomic_write_u64
> end up as plain read/writes, but on older ones they'd do the necessarily
> locking to make that safe...

All right, here's an updated patch.

The final interface (new function or refactor the existing not to rely
on commit_ts) is still TBD (and I'd appreciate input on that from
Alvaro and others).

Thanks,
James Coleman
From 4b1d2db87be0090bc8840e1c6d990443dccb5237 Mon Sep 17 00:00:00 2001
From: jcoleman <jtc...@gmail.com>
Date: Fri, 14 Jan 2022 22:26:10 +0000
Subject: [PATCH v3] Expose LSN of last commit via pg_last_committed_xact

---
 src/backend/access/transam/commit_ts.c | 25 +++++++++++++------
 src/backend/access/transam/twophase.c  |  2 +-
 src/backend/access/transam/xact.c      | 10 +++++---
 src/backend/storage/ipc/procarray.c    | 34 ++++++++++++++++++++++++++
 src/include/access/commit_ts.h         |  5 ++--
 src/include/access/transam.h           |  2 ++
 src/include/catalog/pg_proc.dat        |  6 ++---
 src/include/storage/proc.h             |  5 ++++
 src/include/storage/procarray.h        |  2 ++
 9 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 659109f8d4..f0c3185021 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -33,10 +33,11 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
-#include "storage/shmem.h"
+#include "storage/procarray.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 /*
  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
@@ -93,6 +94,7 @@ static SlruCtlData CommitTsCtlData;
 typedef struct CommitTimestampShared
 {
 	TransactionId xidLastCommit;
+	XLogRecPtr lsnLastCommit;
 	CommitTimestampEntry dataLastCommit;
 	bool		commitTsActive;
 } CommitTimestampShared;
@@ -135,7 +137,7 @@ static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
 void
 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
 							   TransactionId *subxids, TimestampTz timestamp,
-							   RepOriginId nodeid)
+							   XLogRecPtr commitLsn, RepOriginId nodeid)
 {
 	int			i;
 	TransactionId headxid;
@@ -414,24 +416,28 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
 	TransactionId xid;
 	RepOriginId nodeid;
 	TimestampTz ts;
-	Datum		values[3];
-	bool		nulls[3];
+	XLogRecPtr lsn;
+	Datum		values[4];
+	bool		nulls[4];
 	TupleDesc	tupdesc;
 	HeapTuple	htup;
 
 	/* and construct a tuple with our data */
 	xid = GetLatestCommitTsData(&ts, &nodeid);
+	lsn = GetLastCommitLSN();
 
 	/*
 	 * Construct a tuple descriptor for the result row.  This must match this
 	 * function's pg_proc entry!
 	 */
-	tupdesc = CreateTemplateTupleDesc(3);
+	tupdesc = CreateTemplateTupleDesc(4);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
 					   XIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
 					   TIMESTAMPTZOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "lsn",
+					   PG_LSNOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "roident",
 					   OIDOID, -1, 0);
 	tupdesc = BlessTupleDesc(tupdesc);
 
@@ -447,8 +453,11 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
 		values[1] = TimestampTzGetDatum(ts);
 		nulls[1] = false;
 
-		values[2] = ObjectIdGetDatum((Oid) nodeid);
-		nulls[2] = false;
+		values[2] = LSNGetDatum(lsn);
+		nulls[2] = lsn == InvalidXLogRecPtr;
+
+		values[3] = ObjectIdGetDatum((Oid) nodeid);
+		nulls[3] = false;
 	}
 
 	htup = heap_form_tuple(tupdesc, values, nulls);
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 271a3146db..cd33849aea 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2300,7 +2300,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 
 	TransactionTreeSetCommitTsData(xid, nchildren, children,
 								   replorigin_session_origin_timestamp,
-								   replorigin_session_origin);
+								   recptr, replorigin_session_origin);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c9516e03fa..9da257b83a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1355,6 +1355,7 @@ RecordTransactionCommit(void)
 	else
 	{
 		bool		replorigin;
+		XLogRecPtr	commit_lsn;
 
 		/*
 		 * Are we using the replication origins feature?  Or, in other words,
@@ -1391,7 +1392,7 @@ RecordTransactionCommit(void)
 
 		SetCurrentTransactionStopTimestamp();
 
-		XactLogCommitRecord(xactStopTimestamp,
+		commit_lsn = XactLogCommitRecord(xactStopTimestamp,
 							nchildren, children, nrels, rels,
 							nmsgs, invalMessages,
 							RelcacheInitFileInval,
@@ -1418,7 +1419,8 @@ RecordTransactionCommit(void)
 
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
 									   replorigin_session_origin_timestamp,
-									   replorigin_session_origin);
+									   commit_lsn, replorigin_session_origin);
+		MyProc->lastCommitLSN = commit_lsn;
 	}
 
 	/*
@@ -5881,7 +5883,9 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 
 	/* Set the transaction commit timestamp and metadata */
 	TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
-								   commit_time, origin_id);
+								   commit_time, lsn, origin_id);
+
+	MyProc->lastCommitLSN = lsn;
 
 	if (standbyState == STANDBY_DISABLED)
 	{
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 3be6040289..92586d1492 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -430,6 +430,7 @@ CreateSharedProcArray(void)
 		procArray->replication_slot_xmin = InvalidTransactionId;
 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
 		ShmemVariableCache->xactCompletionCount = 1;
+		ShmemVariableCache->finishedProcsLastCommitLSN = InvalidXLogRecPtr;
 	}
 
 	allProcs = ProcGlobal->allProcs;
@@ -580,6 +581,9 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 		/* Same with xactCompletionCount  */
 		ShmemVariableCache->xactCompletionCount++;
 
+		if (proc->lastCommitLSN > ShmemVariableCache->finishedProcsLastCommitLSN)
+			ShmemVariableCache->finishedProcsLastCommitLSN = proc->lastCommitLSN;
+
 		ProcGlobal->xids[myoff] = InvalidTransactionId;
 		ProcGlobal->subxidStates[myoff].overflowed = false;
 		ProcGlobal->subxidStates[myoff].count = 0;
@@ -3914,6 +3918,36 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 	LWLockRelease(ProcArrayLock);
 }
 
+/*
+ * GetLastCommitLSN
+ *
+ * Return LSN of the most recent COMMIT record written to WAL.
+ *
+ * For performance reasons we do not guarantee the result to be perfectly in
+ * line with current visibility.
+ */
+XLogRecPtr
+GetLastCommitLSN(void)
+{
+	ProcArrayStruct *arrayP = procArray;
+	XLogRecPtr	lsn;
+	int			i;
+
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	lsn = ShmemVariableCache->finishedProcsLastCommitLSN;
+	for (i = 0; i < arrayP->numProcs; i++)
+	{
+		int         pgprocno = arrayP->pgprocnos[i];
+		PGPROC     *proc = &allProcs[pgprocno];
+
+		if (proc->lastCommitLSN > lsn)
+			lsn = proc->lastCommitLSN;
+	}
+	LWLockRelease(ProcArrayLock);
+
+	return lsn;
+}
+
 /*
  * XidCacheRemoveRunningXids
  *
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
index 7662f8e1a9..c7a146ec17 100644
--- a/src/include/access/commit_ts.h
+++ b/src/include/access/commit_ts.h
@@ -21,11 +21,10 @@ extern PGDLLIMPORT bool track_commit_timestamp;
 
 extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
 										   TransactionId *subxids, TimestampTz timestamp,
-										   RepOriginId nodeid);
+										   XLogRecPtr commitLsn, RepOriginId nodeid);
 extern bool TransactionIdGetCommitTsData(TransactionId xid,
 										 TimestampTz *ts, RepOriginId *nodeid);
-extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
-										   RepOriginId *nodeid);
+extern TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid);
 
 extern Size CommitTsShmemBuffers(void);
 extern Size CommitTsShmemSize(void);
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 338dfca5a0..c53f2d3810 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -237,6 +237,8 @@ typedef struct VariableCacheData
 	 */
 	FullTransactionId latestCompletedXid;	/* newest full XID that has
 											 * committed or aborted */
+	/* */
+	XLogRecPtr finishedProcsLastCommitLSN;
 
 	/*
 	 * Number of top-level transactions with xids (i.e. which may have
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d6bf1f3274..d5fe8f4161 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6157,11 +6157,11 @@
   prosrc => 'pg_xact_commit_timestamp_origin' },
 
 { oid => '3583',
-  descr => 'get transaction Id, commit timestamp and replication origin of latest transaction commit',
+  descr => 'get transaction Id, commit timestamp, commit lsn and replication origin of latest transaction commit',
   proname => 'pg_last_committed_xact', provolatile => 'v',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{o,o,o}',
-  proargnames => '{xid,timestamp,roident}',
+  proallargtypes => '{xid,timestamptz,pg_lsn,oid}', proargmodes => '{o,o,o,o}',
+  proargnames => '{xid,timestamp,lsn,roident}',
   prosrc => 'pg_last_committed_xact' },
 
 { oid => '3537', descr => 'get identification of SQL object',
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index a58888f9e9..2a026b0844 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -258,6 +258,11 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	/*
+	 * Last transaction metadata.
+	 */
+	XLogRecPtr	lastCommitLSN;		/* cache of last committed LSN */
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index e03692053e..ec3baa29e8 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -94,4 +94,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+extern XLogRecPtr GetLastCommitLSN(void);
+
 #endif							/* PROCARRAY_H */
-- 
2.17.1

Reply via email to