On 2024-03-15 22:59, Kartyshov Ivan wrote:
On 2024-03-11 13:44, Alexander Korotkov wrote:
I picked the second option and left only the AFTER clause for the
BEGIN statement.  I think this should be enough for the beginning.

Thank you for reworking the patch. Here I made some fixes:
0) autocomplete
1) fewer jumps
2) more description and additional cases in the docs

I think it would be useful to have a stand-alone statement.
Why would you like to see only the AFTER clause for the BEGIN statement?

Rebased and updated the patch.
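
To make the intended invariant of the waiter array easier to review, here
is a minimal stand-alone sketch. It is not part of the patch. It assumes
the array is kept sorted by LSN in ascending order, so the smallest awaited
LSN always sits at index 0. All names in it (WaiterQueue, queue_add,
queue_remove, queue_wake, MAX_WAITERS, NO_WAITERS) are hypothetical and
only model the logic of addLSNWaiter(), deleteLSNWaiter() and
WaitLSNSetLatches(), without spinlocks, atomics or latches.

```c
#include <assert.h>
#include <stdint.h>

#define MAX_WAITERS 8			/* hypothetical capacity, stands in for MaxBackends */
#define NO_WAITERS UINT64_MAX	/* sentinel, plays the role of PG_UINT64_MAX */

typedef struct Waiter
{
	int			procnum;
	uint64_t	lsn;
} Waiter;

typedef struct WaiterQueue
{
	uint64_t	min_lsn;		/* cached smallest awaited LSN */
	int			count;
	Waiter		items[MAX_WAITERS];	/* sorted by lsn, ascending */
} WaiterQueue;

static void
queue_init(WaiterQueue *q)
{
	q->count = 0;
	q->min_lsn = NO_WAITERS;
}

/* The array is sorted, so the minimum is always the first element. */
static void
queue_refresh_min(WaiterQueue *q)
{
	q->min_lsn = (q->count > 0) ? q->items[0].lsn : NO_WAITERS;
}

/* Insert a waiter, shifting larger entries one slot to the right. */
static void
queue_add(WaiterQueue *q, int procnum, uint64_t lsn)
{
	int			i;

	assert(q->count < MAX_WAITERS);
	for (i = q->count; i > 0 && q->items[i - 1].lsn > lsn; i--)
		q->items[i] = q->items[i - 1];
	q->items[i].procnum = procnum;
	q->items[i].lsn = lsn;
	q->count++;
	queue_refresh_min(q);
}

/* Remove the entry of the given process; returns 1 if it was present. */
static int
queue_remove(WaiterQueue *q, int procnum)
{
	int			i;
	int			found = 0;

	for (i = 0; i < q->count; i++)
	{
		if (q->items[i].procnum == procnum)
			found = 1;
		if (found && i < q->count - 1)
			q->items[i] = q->items[i + 1];
	}
	if (!found)
		return 0;
	q->count--;
	queue_refresh_min(q);
	return 1;
}

/* Drop every waiter whose LSN is already replayed; returns how many. */
static int
queue_wake(WaiterQueue *q, uint64_t replayed)
{
	int			woken = 0;
	int			i;

	while (woken < q->count && q->items[woken].lsn <= replayed)
		woken++;
	for (i = 0; i < q->count - woken; i++)
		q->items[i] = q->items[i + woken];
	q->count -= woken;
	queue_refresh_min(q);
	return woken;
}
```

In this model the cached minimum is always re-read from index 0 after the
array changes, never from the loop counter left over by the shifting loop.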

--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..759e46ec24 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -78,6 +81,50 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><literal>AFTER</literal> <replaceable class="parameter">lsn_value</replaceable></term>
+    <listitem>
+     <para>
+       The <command>AFTER</command> clause is used on a standby in
+       <link linkend="streaming-replication">physical streaming replication</link>
+       and makes the command wait until the specified WAL location (<acronym>LSN</acronym>)
+       has been replayed before starting the transaction.
+     </para>
+     <para>
+       This option is useful when the user makes data changes on the primary
+       and needs a guarantee that these changes are visible on the standby.  The LSN to wait
+       for can be obtained on the primary using the
+       <link linkend="functions-admin-backup"><function>pg_current_wal_insert_lsn</function></link>
+       function after committing the relevant changes.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>WITHIN</literal> <replaceable class="parameter">number_of_milliseconds</replaceable></term>
+    <listitem>
+     <para>
+       Sets a timeout, in milliseconds, for the <command>AFTER</command> clause.
+       This is especially helpful to prevent freezing on streaming replication
+       connection failures.
+     </para>
+     <para>
+       In practice, there are three ways to wait for the target LSN:
+          waiting forever (ends only on postmaster death or SIGINT):
+             BEGIN AFTER lsn;
+                waits for the changes to be replayed on the standby, then starts the transaction
+          waiting with a timeout:
+             BEGIN AFTER lsn WITHIN 60000;
+                waits up to 60 seconds for the changes, then cancels the command
+                to prevent freezing on streaming replication connection failures
+          no wait, just a check:
+             BEGIN AFTER lsn WITHIN 1;
+                sometimes it is useful just to check whether the changes were replayed
+       <phrase>where <replaceable class="parameter">number_of_milliseconds</replaceable> can:</phrase>
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
 
   <para>
@@ -123,6 +170,33 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
 <programlisting>
 BEGIN;
 </programlisting></para>
+
+  <para>
+   To begin a transaction block after replaying the given <acronym>LSN</acronym>.
+   The command will be canceled if the given <acronym>LSN</acronym> is not
+   reached within the timeout of one second.
+<programlisting>
+BEGIN AFTER '0/3F05F791' WITHIN 1000;
+BEGIN
+</programlisting></para>
+
+  <para>
+   To merely check whether the <acronym>LSN</acronym> has been replayed (waiting at most
+   one millisecond), and start the transaction only if it has:
+<programlisting>
+BEGIN AFTER '0/3F0FF791' WITHIN 1;
+ERROR:  canceling waiting for LSN due to timeout
+</programlisting></para>
+
+  <para>
+   To begin a transaction block after replaying the given <acronym>LSN</acronym>.
+   The command will be canceled only on user request or postmaster death.
+<programlisting>
+BEGIN AFTER '0/3F0FF791';
+^CCancel request sent
+ERROR:  canceling statement due to user request
+</programlisting></para>
+
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..46a3bcf1a8 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 29c5bec084..89b997ad47 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,14 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for, set latches
+			 * in shared memory array to notify the waiter.
+			 */
+			if (waitLSN &&
+					(XLogRecoveryCtl->lastReplayedEndRecPtr >= pg_atomic_read_u64(&waitLSN->minLSN)))
+				WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..cede90c3b9 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..7549be5dc3 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000000..ca15afa468
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,278 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  Implements waiting for the given LSN, which is used in
+ *	  BEGIN AFTER ... [ WITHIN ... ] clause.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void addLSNWaiter(XLogRecPtr lsn);
+static void deleteLSNWaiter(void);
+
+struct WaitLSNState *waitLSN = NULL;
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Report the amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitLSNState, procInfos);
+	size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+	return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory */
+void
+WaitLSNShmemInit(void)
+{
+	bool		found;
+
+	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+											 WaitLSNShmemSize(),
+											 &found);
+	if (!found)
+	{
+		SpinLockInit(&waitLSN->mutex);
+		waitLSN->numWaitedProcs = 0;
+		pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+	WaitLSNProcInfo cur;
+	int			i;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	cur.procnum = MyProcNumber;
+	cur.waitLSN = lsn;
+
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
+		{
+			WaitLSNProcInfo tmp;
+
+			tmp = waitLSN->procInfos[i];
+			waitLSN->procInfos[i] = cur;
+			cur = tmp;
+		}
+	}
+	waitLSN->procInfos[i] = cur;
+	waitLSN->numWaitedProcs++;
+
+	pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ */
+static void
+deleteLSNWaiter(void)
+{
+	int			i;
+	bool		found = false;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		if (waitLSN->procInfos[i].procnum == MyProcNumber)
+			found = true;
+
+		if (found && i < waitLSN->numWaitedProcs - 1)
+		{
+			waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
+		}
+	}
+
+	if (!found)
+	{
+		SpinLockRelease(&waitLSN->mutex);
+		return;
+	}
+	waitLSN->numWaitedProcs--;
+
+	if (waitLSN->numWaitedProcs != 0)
+		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+	else
+		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitLSNSetLatches(XLogRecPtr curLSN)
+{
+	uint32		i,
+				numWakeUpProcs;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	/*
+	 * Set latches for processes whose awaited LSNs have already been replayed.
+	 */
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		PGPROC	   *backend;
+
+		if (waitLSN->procInfos[i].waitLSN > curLSN)
+			break;
+
+		backend = GetPGProcByNumber(waitLSN->procInfos[i].procnum);
+		SetLatch(&backend->procLatch);
+	}
+
+	/*
+	 * Immediately remove those processes from the shmem array.  Otherwise,
+	 * shmem array items will be here till corresponding processes wake up and
+	 * delete themselves.
+	 */
+	numWakeUpProcs = i;
+	for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
+		waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
+	waitLSN->numWaitedProcs -= numWakeUpProcs;
+
+	if (waitLSN->numWaitedProcs != 0)
+		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+	else
+		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+	if (haveShmemItem)
+		deleteLSNWaiter();
+}
+
+/*
+ * Wait on MyLatch until the given LSN is replayed, the postmaster dies, or
+ * the timeout expires.
+ */
+void
+WaitForLSN(XLogRecPtr lsn, int millisecs)
+{
+	XLogRecPtr	curLSN;
+	int			latch_events;
+	TimestampTz endtime;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSN);
+
+	/* Should be only called by a backend */
+	Assert(MyBackendType == B_BACKEND);
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is not in progress"),
+				 errhint("Waiting for LSN can only be executed during recovery.")));
+
+	endtime = GetCurrentTimestamp() + (int64) millisecs * 1000;
+
+	latch_events = WL_TIMEOUT | WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+	addLSNWaiter(lsn);
+	haveShmemItem = true;
+
+	for (;;)
+	{
+		int			rc;
+		long		delay_ms;
+
+		/* Check if the waited LSN has been replayed */
+		curLSN = GetXLogReplayRecPtr(NULL);
+		if (lsn <= curLSN)
+			break;
+
+		if (millisecs > 0)
+			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+		else
+
+			/* If no timeout is set then wake up in 1 minute for interrupts */
+			delay_ms = 60000;
+
+		if (delay_ms <= 0)
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS, then delete
+		 * the current event from array.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn > curLSN)
+	{
+		deleteLSNWaiter();
+		haveShmemItem = false;
+		ereport(ERROR,
+				(errcode(ERRCODE_QUERY_CANCELED),
+				 errmsg("canceling waiting for LSN due to timeout")));
+	}
+	else
+	{
+		haveShmemItem = false;
+	}
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c6e2f679fd..be97f5a870 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -647,6 +647,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
 
+%type <ival>		after_lsn_timeout
+
 %type <node>	json_format_clause
 				json_format_clause_opt
 				json_value_expr
@@ -3193,6 +3195,14 @@ hash_partbound:
 			}
 		;
 
+/*
+ * WITHIN timeout optional clause for BEGIN AFTER lsn
+ */
+after_lsn_timeout:
+			WITHIN Iconst		{ $$ = $2; }
+			| /* EMPTY */           { $$ = 0; }
+		;
+
 /*****************************************************************************
  *
  *	ALTER TYPE
@@ -10949,6 +10959,17 @@ TransactionStmt:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
+			| START TRANSACTION transaction_mode_list_or_empty AFTER Sconst after_lsn_timeout
+				{
+					TransactionStmt *n = makeNode(TransactionStmt);
+
+					n->kind = TRANS_STMT_START;
+					n->options = $3;
+					n->afterLsn = $5;
+					n->afterLsnTimeout = $6;
+					n->location = -1;
+					$$ = (Node *) n;
+				}
 			| COMMIT opt_transaction opt_transaction_chain
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
@@ -11053,6 +11074,17 @@ TransactionStmtLegacy:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
+			| BEGIN_P opt_transaction transaction_mode_list_or_empty AFTER Sconst after_lsn_timeout
+				{
+					TransactionStmt *n = makeNode(TransactionStmt);
+
+					n->kind = TRANS_STMT_BEGIN;
+					n->options = $3;
+					n->afterLsn = $5;
+					n->afterLsnTimeout = $6;
+					n->location = -1;
+					$$ = (Node *) n;
+				}
 			| END_P opt_transaction opt_transaction_chain
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418c..5aed90c935 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of Latches in shared memory for wait lsn
+	 */
+	WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 162b1f919d..4b830dc3c8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Cancel any pending condition variable sleep, too */
 	ConditionVariableCancelSleep();
 
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 83f86a42f7..bf9a685a9a 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -63,8 +64,10 @@
 #include "storage/fd.h"
 #include "tcop/utility.h"
 #include "utils/acl.h"
+#include "utils/fmgrprotos.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
 
 /* Hook for plugins to get control in ProcessUtility() */
 ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -606,6 +609,15 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 						{
 							ListCell   *lc;
 
+							if (stmt->afterLsn)
+							{
+								XLogRecPtr	lsn = InvalidXLogRecPtr;
+
+								lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+																	  CStringGetDatum(stmt->afterLsn)));
+								WaitForLSN(lsn, stmt->afterLsnTimeout);
+							}
+
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
 							{
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 19db069ee9..28e22cb9d0 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2717,7 +2717,7 @@ psql_completion(const char *text, int start, int end)
 
 /* BEGIN */
 	else if (Matches("BEGIN"))
-		COMPLETE_WITH("WORK", "TRANSACTION", "ISOLATION LEVEL", "READ", "DEFERRABLE", "NOT DEFERRABLE");
+		COMPLETE_WITH("WORK", "TRANSACTION", "ISOLATION LEVEL", "READ", "DEFERRABLE", "NOT DEFERRABLE", "AFTER");
 /* END, ABORT */
 	else if (Matches("END|ABORT"))
 		COMPLETE_WITH("AND", "WORK", "TRANSACTION");
@@ -4610,7 +4610,7 @@ psql_completion(const char *text, int start, int end)
 
 /* START TRANSACTION */
 	else if (Matches("START"))
-		COMPLETE_WITH("TRANSACTION");
+		COMPLETE_WITH("TRANSACTION", "AFTER");
 
 /* TABLE, but not TABLE embedded in other commands */
 	else if (Matches("TABLE"))
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000000..ff4d63d5b0
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  Declarations for LSN waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "postgres.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+extern void WaitForLSN(XLogRecPtr lsn, int millisecs);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr curLSN);
+extern void WaitLSNCleanup(void);
+
+/* Shared memory structure */
+typedef struct WaitLSNProcInfo
+{
+	int			procnum;
+	XLogRecPtr	waitLSN;
+} WaitLSNProcInfo;
+
+typedef struct WaitLSNState
+{
+	pg_atomic_uint64 minLSN;
+	slock_t		mutex;
+	int			numWaitedProcs;
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+#endif							/* WAIT_LSN_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index aadaf67f57..c2077b5252 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3527,6 +3527,8 @@ typedef struct TransactionStmt
 	/* for two-phase-commit related commands */
 	char	   *gid pg_node_attr(query_jumble_ignore);
 	bool		chain;			/* AND CHAIN option */
+	char	   *afterLsn;		/* target LSN to wait */
+	int			afterLsnTimeout;	/* LSN waiting timeout */
 	/* token location, or -1 if unknown */
 	int			location pg_node_attr(query_jumble_location);
 } TransactionStmt;
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..a1c2a0b13d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_begin_after.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_begin_after.pl b/src/test/recovery/t/043_begin_after.pl
new file mode 100644
index 0000000000..e7859623b5
--- /dev/null
+++ b/src/test/recovery/t/043_begin_after.pl
@@ -0,0 +1,61 @@
+# Checks waiting for an LSN on standby using BEGIN AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that AFTER is
+# able to set up an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $output = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN AFTER '${lsn1}';
+	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+ok($output eq 0, "standby reached the same LSN as primary AFTER");
+
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn() + 1");
+my $stderr;
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1' WITHIN 1");
+$node_standby->psql(
+	'postgres',
+	"BEGIN AFTER '$lsn2' WITHIN 1",
+	stderr => \$stderr);
+ok( $stderr =~ /canceling waiting for LSN due to timeout/,
+	"get timeout on waiting for unreachable LSN");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index aa7a25b8f8..7a2ac178bc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3035,6 +3035,8 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
 WaitPMResult
 WalCloseMethod
 WalCompression
