Thank you for your interest in the patch.

On 2024-06-20 11:30, Kyotaro Horiguchi wrote:
Hi, I looked through the patch and have some comments.


====== L68:
+    <title>Recovery Procedures</title>

It looks somewhat confusing and appears as if the section is intended
to explain how to perform recovery. Since this is the first built-in
procedure, I'm not sure how should this section be written. However,
the section immediately above is named "Recovery Control Functions",
so "Reocvery Synchronization Functions" would align better with the
naming of the upper section. (I don't believe we need to be so strcit
about the distinction between functions and procedures here.)

It looks strange that the procedure signature includes the return type.

Good point, change
Recovery Procedures -> Recovery Synchronization Procedures

====== L93:
+ If <parameter>timeout</parameter> is not specified or zero, this
+        procedure returns once WAL is replayed upto
+        <literal>target_lsn</literal>.
+        If the <parameter>timeout</parameter> is specified (in
+ milliseconds) and greater than zero, the procedure waits until the + server actually replays the WAL upto <literal>target_lsn</literal> or + until the given time has passed. On timeout, an error is emitted.

The first sentence should mention the main functionality. Following
precedents, it might be better to use something like "Waits until
recovery surpasses the specified LSN. If no timeout is specified or it
is set to zero, this procedure waits indefinitely for the LSN. If the
timeout is specified (in milliseconds) and is greater than zero, the
procedure waits until the LSN is reached or the specified time has
elapsed. On timeout, or if the server is promoted before the LSN is
reached, an error is emitted."

The detailed explanation that follows the above seems somewhat too
verbose to me, as other functions don't have such detailed examples.

Please offer your description. I think it would be better.

====== L484
/*
+ * Set latches for processes, whose waited LSNs are already replayed. This
+        * involves spinlocks.  So, we shouldn't do this under a spinlock.
+        */

Here, I'm not quite sure what specifically spinlock (or mutex?) is
referring to.  However, more importantly, shouldn't we explain that it
is okay not to use any locks at all, rather than mentioning that
spinlocks should not be used here? I found a similar case around
freelist.c:238, which is written as follows.

                 * Not acquiring ProcArrayLock here which is slightly icky. It's
                 * actually fine because procLatch isn't ever freed, so we just 
can
                 * potentially set the wrong process' (or no process') latch.
                 */
                SetLatch(&ProcGlobal->allProcs[bgwprocno].procLatch);

???

===== L518
+void
+WaitForLSN(XLogRecPtr targetLSN, int64 timeout)

This function is only called within the same module. I'm not sure if
we need to expose it. I we do, the name should probably be more
specific. I'm not quite sure if the division of functionality between
this function and its only caller function is appropriate.  As a
possible refactoring, we could have WaitForLSN() just return the
result as [reached, timedout, promoted] and delegate prerequisition
checks and error reporting to the SQL function.

waitLSN -> waitLSNStates
No, waitLSNStates is not the best name, because waitLSNState is a state,
and waitLSN is not the array of waitLSNStates. We can think about another name, what you think?

===== L524
+       /* Shouldn't be called when shmem isn't initialized */
+       Assert(waitLSN);

Seeing this assertion, I feel that the name "waitLSN" is a bit
obscure. How about renaming it to "waitLSNStates"?



===== L527
+       /* Should be only called by a backend */
+       Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends);

This is somewhat excessive, causing a server crash when ending with an
error would suffice. By the way, if I execute "CALL
pg_wal_replay_wait('0/0')" on a logical wansender, the server crashes.
The condition doesn't seem appropriate.

Can you give more information on your server crashes, so I could repeat them.

===== L565
+               if (timeout > 0)
+               {
+                       delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+                       latch_events |= WL_TIMEOUT;
+                       if (delay_ms <= 0)
+                               break;

"timeout" is immutable in the function. Therefore, we can calculate
"latch_events" before entering the loop. By the way, the name
'latch_events' seems a bit off. Latch is a kind of event the function
can wait for. Therefore, something like wait_events might be more
appropriate.

"wait_event" - it can't be, because in latch declaration, this events responsible for wake up and not for wait int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

==== L578
+               if (rc & WL_LATCH_SET)
+                       ResetLatch(MyLatch);

I think we usually reset latches unconditionally after returning from
WaitLatch(), even when waiting for timeouts.

No, it depends on you logic, when you have several wake_events and you want to choose what event ignited your latch.
Check applyparallelworker.c:813

===== L798
+ * A process number, same as the index of this item in waitLSN->procInfos.
+        * Stored for convenience.
+        */
+       int                     procnum;

It is described as "(just) for convenience". However, it is referenced
by Startup to fetch the PGPROC entry for the waiter, which necessary
for Startup. That aside, why don't we hold (the pointer to) procLatch
instead of procnum? It makes things simpler and I believe it is our
standard practice.

Can you deeper explane what you meen and give the example?

===== L809
+ /* A flag indicating that this item is added to waitLSN->waitersHeap */
+       bool            inHeap;

The name "inHeap" seems too leteral and might be hard to understand in
most occurances. How about using "waiting" instead?

No, inHeap leteral mean indeed inHeap. Check the comment.
Please suggest a more suitable one.

===== L940
+# Check that new data is visible after calling pg_wal_replay_wait()

On the other hand, the comment for the check for this test states that

+# Make sure the current LSN on standby and is the same as primary's
LSN +ok($output eq 30, "standby reached the same LSN as primary");

I think the first comment and the second should be consistent.

Thanks, I'll rephrase this comment

Oh, I forgot some notes about 044_wal_replay_wait_injection_test.pl.

1. It's not clear why this test needs node_standby2 at all. It seems useless.

I agree with you. What we would need is a second *waiter client*
connecting to the same stanby rather than a second standby. I feel
like having a test where the first waiter is removed while multiple
waiters are waiting, as well as a test where promotion occurs under
the same circumstances.

Can you give more information about this cases step by step, and what
means "remove" and "promotion".

2. The target LSN is set to pg_current_wal_insert_lsn() + 10000.  This
location seems to be unachievable in this test.  So, it's not clear
what race condition this test could potentially detect.
3. I think it would make sense to check for the race condition
reported by Heikki.  That is to insert the injection point at the
beginning of WaitLSNSetLatches().

I think the race condition you mentioned refers to the inconsistency
between the inHeap flag and the pairing heap caused by a race
condition between timeout and wakeup (or perhaps other combinations?
I'm not sure which version of the patch the mentioned race condition
refers to). However, I imagine it is difficult to reliably reproduce
this condition. In that regard, in the latest patch, the coherence
between the inHeap flag and the pairing heap is protected by LWLock,
so I believe we no longer need that test.

No, Alexandre means that Heikki point on race condition just before
LWLock. But injection point we can inject and stepin on backend, and
WaitLSNSetLatches is used from Recovery process. But I have trouble
to wakeup injection point on server.


--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
From a825fbe7262d8405fe05f29aa94cbdc34f1c6f28 Mon Sep 17 00:00:00 2001
From: "i.kartyshov" <i.kartys...@postrgespro.ru>
Date: Mon, 8 Jul 2024 20:42:19 +0300
Subject: [PATCH] Subject: [PATCH v20] Implement pg_wal_replay_wait() stored
 procedure

pg_wal_replay_wait() is to be used on standby and specifies waiting for
the specific WAL location to be replayed before starting the transaction.
This option is useful when the user makes some data changes on primary and
needs a guarantee to see these changes on standby.

The queue of waiters is stored in the shared memory array sorted by LSN.
During replay of WAL waiters whose LSNs are already replayed are deleted from
the shared memory array and woken up by setting of their latches.

pg_wal_replay_wait() needs to wait without any snapshot held.  Otherwise,
the snapshot could prevent the replay of WAL records implying a kind of
self-deadlock.  This is why it is only possible to implement
pg_wal_replay_wait() as a procedure working without an active snapshot,
not a function.

Catversion is bumped.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveira
---
 doc/src/sgml/func.sgml                        | 109 ++++++
 src/backend/access/transam/xact.c             |   6 +
 src/backend/access/transam/xlog.c             |   7 +
 src/backend/access/transam/xlogrecovery.c     |  11 +
 src/backend/catalog/system_functions.sql      |   3 +
 src/backend/commands/Makefile                 |   3 +-
 src/backend/commands/meson.build              |   1 +
 src/backend/commands/waitlsn.c                | 346 ++++++++++++++++++
 src/backend/lib/pairingheap.c                 |  18 +-
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/storage/lmgr/proc.c               |   6 +
 .../utils/activity/wait_event_names.txt       |   2 +
 src/include/catalog/pg_proc.dat               |   5 +
 src/include/commands/waitlsn.h                |  77 ++++
 src/include/lib/pairingheap.h                 |   3 +
 src/include/storage/lwlocklist.h              |   1 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  22 ++
 src/test/recovery/meson.build                 |   2 +
 src/test/recovery/t/043_wal_replay_wait.pl    | 222 +++++++++++
 .../t/044_wal_replay_wait_injection_test.pl   |  86 +++++
 src/tools/pgindent/typedefs.list              |   2 +
 21 files changed, 932 insertions(+), 3 deletions(-)
 create mode 100644 src/backend/commands/waitlsn.c
 create mode 100644 src/include/commands/waitlsn.h
 create mode 100644 src/test/recovery/t/043_wal_replay_wait.pl
 create mode 100644 src/test/recovery/t/044_wal_replay_wait_injection_test.pl

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 93ee3d4b60..12117df1ab 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28816,6 +28816,115 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
     the pause, the rate of WAL generation and available disk space.
    </para>
 
+   <para>
+    The procedure shown in <xref linkend="procedures-recovery-control-table"/>
+    can be executed only during recovery.
+   </para>
+
+   <table id="recovery-synchronization-procedure-table">
+    <title>Recovery Synchronization Procedure</title>
+    <tgroup cols="1">
+     <thead>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        Procedure
+       </para>
+       <para>
+        Description
+       </para></entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_wal_replay_wait</primary>
+        </indexterm>
+        <function>pg_wal_replay_wait</function> (
+          <parameter>target_lsn</parameter> <type>pg_lsn</type>,
+          <parameter>timeout</parameter> <type>bigint</type> <literal>DEFAULT</literal> <literal>0</literal>)
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        If <parameter>timeout</parameter> is not specified or zero, this
+        procedure returns once WAL is replayed upto
+        <literal>target_lsn</literal>.
+        If the <parameter>timeout</parameter> is specified (in
+        milliseconds) and greater than zero, the procedure waits until the
+        server actually replays the WAL upto <literal>target_lsn</literal> or
+        until the given time has passed. On timeout, an error is emitted.
+       </para></entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <para>
+    <function>pg_wal_replay_wait</function> waits till
+    <parameter>target_lsn</parameter> to be replayed on standby.
+    That is, after this function execution, the value returned by
+    <function>pg_last_wal_replay_lsn</function> should be greater or equal
+    to the <parameter>target_lsn</parameter> value.  This is useful to achieve
+    read-your-writes-consistency, while using async replica for reads and
+    primary for writes.  In that case <acronym>lsn</acronym> of the last
+    modification should be stored on the client application side or the
+    connection pooler side.
+   </para>
+
+   <para>
+    You can use <function>pg_wal_replay_wait</function> to wait for
+    the <type>pg_lsn</type> value.  For example, an application could update
+    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+    on primary server to get the <acronym>lsn</acronym> given that
+    <varname>synchronous_commit</varname> could be set to
+    <literal>off</literal>.
+
+   <programlisting>
+postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+UPDATE 100
+postgres=# SELECT pg_current_wal_insert_lsn();
+pg_current_wal_insert_lsn
+--------------------
+0/306EE20
+(1 row)
+   </programlisting>
+
+   Then an application could run <function>pg_wal_replay_wait</function>
+   with the <acronym>lsn</acronym> obtained from primary.  After that the
+   changes made of primary should be guaranteed to be visible on replica.
+
+   <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20');
+CALL
+postgres=# SELECT * FROM movie WHERE genre = 'Drama';
+ genre
+-------
+(0 rows)
+   </programlisting>
+
+   It may also happen that target <acronym>lsn</acronym> is not achieved
+   within the timeout.  In that case the error is thrown.
+
+   <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20', 100);
+ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
+    </programlisting>
+
+   </para>
+
+   <para>
+     <function>pg_wal_replay_wait</function> can be used within
+     the transaction.
+
+   <programlisting>
+postgres=# BEGIN;
+BEGIN
+postgres=*# CALL pg_wal_replay_wait('0/306EE20');
+   </programlisting>
+
+   </para>
   </sect2>
 
   <sect2 id="functions-snapshot-synchronization">
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d119ab909d..dfc8cf2dcf 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -38,6 +38,7 @@
 #include "commands/async.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
+#include "commands/waitlsn.h"
 #include "common/pg_prng.h"
 #include "executor/spi.h"
 #include "libpq/be-fsstubs.h"
@@ -2809,6 +2810,11 @@ AbortTransaction(void)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Clear wait information and command progress indicator */
 	pgstat_report_wait_end();
 	pgstat_progress_end_command();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 33e27a6e72..7c82e2cd8b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -66,6 +66,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "commands/waitlsn.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
 #include "executor/instrument.h"
@@ -6130,6 +6131,12 @@ StartupXLOG(void)
 	UpdateControlFile();
 	LWLockRelease(ControlFileLock);
 
+	/*
+	 * Wake up all waiters for replay LSN.  They need to report an error that
+	 * recovery was ended before achieving the target LSN.
+	 */
+	WaitLSNSetLatches(InvalidXLogRecPtr);
+
 	/*
 	 * Shutdown the recovery environment.  This must occur after
 	 * RecoverPreparedTransactions() (see notes in lock_twophase_recover())
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 2ed3ea2b45..443b6c2802 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,16 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for then walk
+			 * over the shared memory array and set latches to notify the
+			 * waiters.
+			 */
+			if (waitLSN &&
+				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
+				 pg_atomic_read_u64(&waitLSN->minWaitedLSN)))
+				WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index ae099e328c..623b9539b1 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
   json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
   RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100  AS 'json_populate_recordset' PARALLEL SAFE;
 
+CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0)
+  LANGUAGE internal AS 'pg_wal_replay_wait';
+
 CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
     IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
     OUT lsn pg_lsn, OUT xid xid, OUT data text)
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..cede90c3b9 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..7549be5dc3 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000000..a545b6f230
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,346 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  Implements waiting for the given replay LSN, which is used in
+ *	  CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "commands/waitlsn.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/injection_point.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/wait_event_types.h"
+
+static int	lsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+					void *arg);
+
+struct WaitLSNState *waitLSN = NULL;
+
+/* Report the amount of shared memory space needed for WaitLSNState. */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitLSNState, procInfos);
+	size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+	return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory. */
+void
+WaitLSNShmemInit(void)
+{
+	bool		found;
+
+	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+											   WaitLSNShmemSize(),
+											   &found);
+	if (!found)
+	{
+		pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX);
+		pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL);
+		memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
+	}
+}
+
+/*
+ * Comparison function for waitLSN->waitersHeap heap.  Waiting processes are
+ * ordered by lsn, so that the waiter with smallest lsn is at the top.
+ */
+static int
+lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+	const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
+	const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
+
+	if (aproc->waitLSN < bproc->waitLSN)
+		return 1;
+	else if (aproc->waitLSN > bproc->waitLSN)
+		return -1;
+	else
+		return 0;
+}
+
+/*
+ * Update waitLSN->minWaitedLSN according to the current state of
+ * waitLSN->waitersHeap.
+ */
+static void
+updateMinWaitedLSN(void)
+{
+	XLogRecPtr	minWaitedLSN = PG_UINT64_MAX;
+
+	if (!pairingheap_is_empty(&waitLSN->waitersHeap))
+	{
+		pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
+
+		minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
+	}
+
+	pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN);
+}
+
+/*
+ * Put the current process into the heap of LSN waiters.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+	WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	Assert(!procInfo->inHeap);
+
+	procInfo->procnum = MyProcNumber;
+	procInfo->waitLSN = lsn;
+
+	pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode);
+	procInfo->inHeap = true;
+	updateMinWaitedLSN();
+
+	LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove the current process from the heap of LSN waiters if it's there.
+ */
+static void
+deleteLSNWaiter(void)
+{
+	WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	if (!procInfo->inHeap)
+	{
+		LWLockRelease(WaitLSNLock);
+		return;
+	}
+
+	pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode);
+	procInfo->inHeap = false;
+	updateMinWaitedLSN();
+
+	LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Set latches of LSN waiters whose LSN has been replayed.  Set latches of all
+ * LSN waiters when InvalidXLogRecPtr is given.
+ */
+void
+WaitLSNSetLatches(XLogRecPtr currentLSN)
+{
+	int			i;
+	int		   *wakeUpProcNums;
+	int			numWakeUpProcs = 0;
+
+	wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
+
+	/*
+	 * Check if startup process is already replayed target lsn
+	 */
+	INJECTION_POINT("pg-wal-replay-set-latch");
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	/*
+	 * Iterate the pairing heap of waiting processes till we find LSN not yet
+	 * replayed.  Record the process numbers to set their latches later.
+	 */
+	while (!pairingheap_is_empty(&waitLSN->waitersHeap))
+	{
+		pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
+		WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
+
+		if (!XLogRecPtrIsInvalid(currentLSN) &&
+			procInfo->waitLSN > currentLSN)
+			break;
+
+		wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum;
+		(void) pairingheap_remove_first(&waitLSN->waitersHeap);
+		procInfo->inHeap = false;
+	}
+
+	updateMinWaitedLSN();
+
+	LWLockRelease(WaitLSNLock);
+
+	/*
+	 * Set latches for processes, whose waited LSNs are already replayed. This
+	 * involves spinlocks.  So, we shouldn't do this under a spinlock.
+	 */
+	for (i = 0; i < numWakeUpProcs; i++)
+	{
+		PGPROC	   *backend;
+
+		backend = GetPGProcByNumber(wakeUpProcNums[i]);
+		SetLatch(&backend->procLatch);
+	}
+	pfree(wakeUpProcNums);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+	/*
+	 * We do a fast-path check of the 'inHeap' flag without the lock.  This
+	 * flag is set to true only by the process itself.  So, it's only possible
+	 * to get a false positive.  But that will be eliminated by a recheck
+	 * inside deleteLSNWaiter().
+	 */
+	if (waitLSN->procInfos[MyProcNumber].inHeap)
+		deleteLSNWaiter();
+}
+
+/*
+ * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ */
+void
+WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
+{
+	XLogRecPtr	currentLSN;
+	TimestampTz endtime = 0;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSN);
+
+	/* Should be only called by a backend */
+	Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends);
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is not in progress"),
+				 errhint("Waiting for LSN can only be executed during recovery.")));
+
+	/* If target LSN is already replayed, exit immediately */
+	if (targetLSN <= GetXLogReplayRecPtr(NULL))
+		return;
+
+	if (timeout > 0)
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+
+	addLSNWaiter(targetLSN);
+
+	for (;;)
+	{
+		int			rc;
+		int			wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+		long		delay_ms = 0;
+
+		/* Check if the waited LSN has been replayed */
+		currentLSN = GetXLogReplayRecPtr(NULL);
+		if (targetLSN <= currentLSN)
+			break;
+
+		/* Recheck that recovery is still in-progress */
+		if (!RecoveryInProgress())
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("recovery is not in progress"),
+					 errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
+							   LSN_FORMAT_ARGS(targetLSN),
+							   LSN_FORMAT_ARGS(currentLSN))));
+
+		if (timeout > 0)
+		{
+			delay_ms = TimestampDifferenceMilliseconds(
+						GetCurrentTimestamp(),endtime);
+			wake_events |= WL_TIMEOUT;
+			if (delay_ms <= 0)
+				break;
+		}
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(MyLatch, wake_events, delay_ms,
+					   WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	deleteLSNWaiter();
+
+	if (targetLSN > currentLSN)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_QUERY_CANCELED),
+				 errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
+						LSN_FORMAT_ARGS(targetLSN),
+						LSN_FORMAT_ARGS(currentLSN))));
+	}
+}
+
+Datum
+pg_wal_replay_wait(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	target_lsn = PG_GETARG_LSN(0);
+	int64		timeout = PG_GETARG_INT64(1);
+
+	if (timeout < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("\"timeout\" must not be negative")));
+
+	/*
+	 * We are going to wait for the LSN replay.  We should first care that we
+	 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+	 * Otherwise, our snapshot could prevent the replay of WAL records
+	 * implying a kind of self-deadlock.  This is the reason why
+	 * pg_wal_replay_wait() is a procedure, not a function.
+	 *
+	 * At first, we should check there is no active snapshot.  According to
+	 * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
+	 * processed with a snapshot.  Thankfully, we can pop this snapshot,
+	 * because PortalRunUtility() can tolerate this.
+	 */
+	if (ActiveSnapshotSet())
+		PopActiveSnapshot();
+
+	/* Give up if there is still an active sanpshot. */
+	if (ActiveSnapshotSet())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_wal_replay_wait() must be only called without active snapshot"),
+				 errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));
+
+	/*
+	 * At second, invalidate a catalog snapshot if any.  And we should be done
+	 * with the preparation.
+	 */
+	InvalidateCatalogSnapshot();
+	Assert(MyProc->xmin == InvalidTransactionId);
+
+	(void) WaitForLSN(target_lsn, timeout);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c
index fe1deba13e..7858e5e076 100644
--- a/src/backend/lib/pairingheap.c
+++ b/src/backend/lib/pairingheap.c
@@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg)
 	pairingheap *heap;
 
 	heap = (pairingheap *) palloc(sizeof(pairingheap));
+	pairingheap_initialize(heap, compare, arg);
+
+	return heap;
+}
+
+/*
+ * pairingheap_initialize
+ *
+ * Same as pairingheap_allocate(), but initializes the pairing heap in-place
+ * rather than allocating a new chunk of memory.  Useful to store the pairing
+ * heap in a shared memory.
+ */
+void
+pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare,
+					   void *arg)
+{
 	heap->ph_compare = compare;
 	heap->ph_arg = arg;
 
 	heap->ph_root = NULL;
-
-	return heap;
 }
 
 /*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2100150f01..b180ae97af 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventCustomShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -357,6 +359,7 @@ CreateOrAttachShmemStructs(void)
 	StatsShmemInit();
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
+	WaitLSNShmemInit();
 }
 
 /*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 1b23efb26f..ac66da8638 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Cancel any pending condition variable sleep, too */
 	ConditionVariableCancelSleep();
 
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index db37beeaae..d10ca723dc 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -87,6 +87,7 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_REPLAY	"Waiting for a replay of the particular WAL position on the physical standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
@@ -345,6 +346,7 @@ WALSummarizer	"Waiting to read or update WAL summarization state."
 DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
+WaitLSN	"Waiting to read or update shared Wait-for-LSN state."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 0bf413fe05..1140ac5a07 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12161,6 +12161,11 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+{ oid => '16387', descr => 'wait with timeout for target LSN to replayed on standby',
+  proname => 'pg_wal_replay_wait', prokind => 'p', prorettype => 'void',
+  proargtypes => 'pg_lsn int8', proargnames => '{target_lsn,timeout}',
+  prosrc => 'pg_wal_replay_wait' },
+
 { oid => '6291', descr => 'arbitrary value from among input values',
   proname => 'any_value', prokind => 'a', proisstrict => 'f',
   prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000000..a19409e4b3
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,77 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  Declarations for LSN replay waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "lib/pairingheap.h"
+#include "postgres.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/*
+ * WaitLSNProcInfo - the shared memory structure representing information
+ * about the single process, which may wait for LSN replay.  An item of
+ * waitLSN->procInfos array.
+ */
+typedef struct WaitLSNProcInfo
+{
+	/*
+	 * A process number, same as the index of this item in waitLSN->procInfos.
+	 * Stored for convenience.
+	 */
+	int			procnum;
+
+	/* LSN, which this process is waiting for */
+	XLogRecPtr	waitLSN;
+
+	/* A pairing heap node for participation in waitLSN->waitersHeap */
+	pairingheap_node phNode;
+
+	/* A flag indicating that this item is added to waitLSN->waitersHeap */
+	bool		inHeap;
+} WaitLSNProcInfo;
+
+/*
+ * WaitLSNState - the shared memory state for the replay LSN waiting facility.
+ */
+typedef struct WaitLSNState
+{
+	/*
+	 * The minimum LSN value some process is waiting for.  Used for the
+	 * fast-path checking if we need to wake up any waiters after replaying a
+	 * WAL record.  Could be read lock-less.  Update protected by WaitLSNLock.
+	 */
+	pg_atomic_uint64 minWaitedLSN;
+
+	/*
+	 * A pairing heap of waiting processes order by LSN values (least LSN is
+	 * on top).  Protected by WaitLSNLock.
+	 */
+	pairingheap waitersHeap;
+
+	/*
+	 * An array with per-process information, indexed by the process number.
+	 * Protected by WaitLSNLock.
+	 */
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+extern void WaitForLSN(XLogRecPtr targetLSN, int64 timeout);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr currentLSN);
+extern void WaitLSNCleanup(void);
+
+#endif							/* WAIT_LSN_H */
diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h
index 7eade81535..9e1c26033a 100644
--- a/src/include/lib/pairingheap.h
+++ b/src/include/lib/pairingheap.h
@@ -77,6 +77,9 @@ typedef struct pairingheap
 
 extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
 										 void *arg);
+extern void pairingheap_initialize(pairingheap *heap,
+								   pairingheap_comparator compare,
+								   void *arg);
 extern void pairingheap_free(pairingheap *heap);
 extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
 extern pairingheap_node *pairingheap_first(pairingheap *heap);
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 6a2f64c54f..88dc79b2bd 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, WaitLSN)
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 0135c5a795..f05002e575 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2851,6 +2851,28 @@ sub wait_for_event
 
 =pod
 
+=item $node->wait_for_events(wait_event_name)
+
+Poll pg_stat_activity until reaches wait_event_name.
+
+=cut
+
+sub wait_for_events
+{
+	my ($self, $wait_event_name) = @_;
+
+	$self->poll_query_until(
+		'postgres', qq[
+		SELECT count(*) > 0 FROM pg_stat_activity
+		WHERE wait_event = '$wait_event_name'
+	])
+	  or die
+	  qq(timed out when waiting for to reach wait event '$wait_event_name');
+
+	return;
+}
+=pod
+
 =item $node->wait_for_catchup(standby_name, mode, target_lsn)
 
 Wait for the replication connection with application_name standby_name until
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..3efe4645ac 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,8 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_wal_replay_wait.pl',
+      't/044_wal_replay_wait_injection_test.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_wal_replay_wait.pl b/src/test/recovery/t/043_wal_replay_wait.pl
new file mode 100644
index 0000000000..cc8fb2079a
--- /dev/null
+++ b/src/test/recovery/t/043_wal_replay_wait.pl
@@ -0,0 +1,222 @@
+# Checks waiting for the lsn replay on standby using
+# pg_wal_replay_wait() procedure.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby1 = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby1->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby1->append_conf(
+	'postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby1->start;
+
+# 1.
+# Make sure that pg_wal_replay_wait() works: add new content to
+# primary and memorize primary's insert LSN, then wait for that LSN to be
+# replayed on standby.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby1->safe_psql(
+	'postgres', qq[
+	CALL pg_wal_replay_wait('${lsn1}', 1000000);
+	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on standby is at least as big as the LSN we
+# observed on primary's before.
+ok($output >= 0,
+	"standby reached the same LSN as primary after pg_wal_replay_wait()");
+
+# 2.
+# Check that new data is visible after calling pg_wal_replay_wait()
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby1->safe_psql(
+	'postgres', qq[
+	CALL pg_wal_replay_wait('${lsn2}');
+	SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the count(*) on standby and is the same as primary's LSN
+ok($output eq 30, "standby reached the same LSN as primary");
+
+# 3.
+# Check two standby waiting LSN
+# Create a streaming second standby with a 1 second delay from the backup
+my $node_standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$node_standby2->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby2->append_conf(
+	'postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby2->start;
+
+# Check that new data is visible after calling pg_wal_replay_wait()
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+$lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+$output = $node_standby1->safe_psql(
+	'postgres', qq[
+	CALL pg_wal_replay_wait('${lsn2}');
+	SELECT count(*) FROM wait_test;
+]);
+my $output2 = $node_standby2->safe_psql(
+	'postgres', qq[
+	CALL pg_wal_replay_wait('${lsn2}');
+	SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the count(*) on standby and standby2 are the same as
+# primary's LSN
+ok($output eq 40, "standby1 reached the same LSN as primary");
+ok($output2 eq 40, "standby2 reached the same LSN as primary");
+
+# 4.
+# Create a cascading standby waiting LSN
+$backup_name = 'cas_backup';
+$node_standby1->backup($backup_name);
+
+my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+$cascading_standby->init_from_backup(
+	$node_standby1, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+my $cascading_connstr = $node_standby1->connstr;
+$cascading_standby->append_conf(
+	'postgresql.conf', qq(
+	hot_standby_feedback = on
+	recovery_min_apply_delay = '${delay}s'
+));
+
+$cascading_standby->start;
+
+# Check that new data is visible after calling pg_wal_replay_wait()
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+$lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+$output = $node_standby1->safe_psql(
+	'postgres', qq[
+	CALL pg_wal_replay_wait('${lsn2}');
+	SELECT count(*) FROM wait_test;
+]);
+$output2 = $cascading_standby->safe_psql(
+	'postgres', qq[
+	CALL pg_wal_replay_wait('${lsn2}');
+	SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the count(*) on standby and standby2 are the same as
+# primary's LSN
+ok($output eq 50, "standby1 reached the same LSN as primary");
+ok($output2 eq 50, "cascading_standby reached the same LSN as primary");
+
+# 5.
+# Check that waiting for unreachable LSN triggers the timeout.  The
+# unreachable LSN must be well in advance.  So WAL records issued by
+# the concurrent autovacuum could not affect that.
+my $lsn3 =
+  $node_primary->safe_psql('postgres',
+	"SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $stderr;
+$node_standby1->safe_psql('postgres',
+	"CALL pg_wal_replay_wait('${lsn2}', 10);");
+$node_standby1->psql(
+	'postgres',
+	"CALL pg_wal_replay_wait('${lsn3}', 1000);",
+	stderr => \$stderr);
+ok( $stderr =~ /timed out while waiting for target LSN/,
+	"get timeout on waiting for unreachable LSN");
+
+
+# 6.
+# Also, check the scenario of multiple LSN waiters.  We make 5 background psql
+# sessions each waiting for a corresponding insertion.  When waiting is
+# finished, stored procedures logs if there are visible as many rows as
+# should be.
+$node_primary->safe_psql(
+	'postgres', qq[
+CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
+  DECLARE
+    count int;
+  BEGIN
+    SELECT count(*) FROM wait_test INTO count;
+    IF count >= 51 + i THEN
+      RAISE LOG 'count %', i;
+    END IF;
+  END
+\$\$
+LANGUAGE plpgsql;
+]);
+$node_standby1->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+for (my $i = 0; $i < 5; $i++)
+{
+	print($i);
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (${i});");
+	my $lsn =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+	my $psql_session = $node_standby1->background_psql('postgres');
+	$psql_session->query_until(
+		qr/start/, qq[
+		\\echo start
+		CALL pg_wal_replay_wait('${lsn}');
+		SELECT log_count(${i});
+	]);
+}
+my $log_offset = -s $node_standby1->logfile;
+$node_standby1->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_standby1->wait_for_log("count ${i}", $log_offset);
+}
+
+
+# 7.
+# Check that the standby promotion terminates the wait on LSN.  Start
+# waiting for unreachable LSN then promote.  Check the log for the relevant
+# error message.
+my $psql_session = $node_standby1->background_psql('postgres');
+$psql_session->query_until(
+	qr/start/, qq[
+	\\echo start
+	CALL pg_wal_replay_wait('${lsn3}');
+]);
+
+$log_offset = -s $node_standby1->logfile;
+$node_standby1->promote;
+$node_standby1->wait_for_log('recovery is not in progress', $log_offset);
+
+$node_standby1->stop;
+$node_standby2->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/044_wal_replay_wait_injection_test.pl b/src/test/recovery/t/044_wal_replay_wait_injection_test.pl
new file mode 100644
index 0000000000..72d76df20b
--- /dev/null
+++ b/src/test/recovery/t/044_wal_replay_wait_injection_test.pl
@@ -0,0 +1,86 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Time::HiRes qw(usleep);
+use Test::More;
+
+##################################################
+# Test race condition when timeout reached and new wal replayed on standby.
+#
+# This test relies on an injection point that cause to wait before Delete
+# from heap and wait when new lsn added to heap.
+##################################################
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+# Initialize primary node.
+my $node_primary = PostgreSQL::Test::Cluster->new('master');
+$node_primary->init(allows_streaming => 1);
+$node_primary->append_conf(
+	'postgresql.conf', q[
+restart_after_crash = on
+]);
+$node_primary->start;
+
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Setup first standby.
+my $node_standby = PostgreSQL::Test::Cluster->new('standby1');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->start;
+
+# Create table
+$node_primary->safe_psql('postgres', 'CREATE TABLE prim_tab (a int);');
+
+# Create extention injection point
+$node_primary->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# Wait until the extension has been created on the standby
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Attach the point in before deleteLSNWaiter.
+$node_standby->safe_psql('postgres',
+	"SELECT injection_points_attach('pg-wal-replay-set-latch', 'wait');");
+
+# Get log offset
+my $logstart = -s $node_standby->logfile;
+
+# Get current lsn from primary
+my $lsn_pr = $node_primary->safe_psql('postgres',
+	"SELECT pg_current_wal_insert_lsn() + 10;");
+
+$node_primary->safe_psql('postgres', 'INSERT INTO prim_tab VALUES (1);');
+# CALL pg_wal_replay_wait with timeout on standby
+my $psql_session1 = $node_standby->background_psql('postgres');
+$psql_session1->query_until(
+	qr/start/, qq[
+	CALL pg_wal_replay_wait('${lsn_pr}',10000);
+]);
+
+# Generate some WAL
+$node_primary->safe_psql('postgres', 'INSERT INTO prim_tab VALUES (1);');
+#$node_primary->wait_for_replay_catchup($node_standby);
+
+# Wait till pg-wal-replay-set-latch apperied in pg_stat_activity
+$node_standby->wait_for_events('client backend', 'pg-wal-replay-set-latch');
+$node_standby->safe_psql('postgres',
+	"SELECT count(*) > 0 FROM pg_stat_activity WHERE
+	wait_event_type = 'Extension' AND wait_event = 'pg-wal-replay-set-latch'");
+
+# Wake up to check the message
+$node_standby->safe_psql('postgres',
+	"SELECT injection_points_wakeup('pg-wal-replay-set-latch');");
+
+# Check the log
+ok( $node_standby->log_contains(
+		"timed out while waiting for target LSN", $logstart),
+	"pg_wal_replay_wait timeout");
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9320e4d808..1979b2f8f5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3107,6 +3107,8 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
 WaitPMResult
 WalCloseMethod
 WalCompression
-- 
2.34.1

Reply via email to