17.02.2025 00:27, Alexander Korotkov wrote:
> On Thu, Feb 6, 2025 at 10:31 AM Yura Sokolov <y.soko...@postgrespro.ru> wrote:
>> I briefly looked into patch and have couple of minor remarks:
>>
>> 1. I don't like `palloc` in the `WaitLSNWakeup`. I believe it won't cause
>> problems, but I still don't like it. I'd prefer to see a local fixed array, say
>> of 16 elements, and a loop around the remaining function body acting in batches of
>> 16 wakeups. It's doubtful there will often be more than 16 waiting clients,
>> and even then it won't be much heavier than fetching all at once.
>
> OK, I've refactored this to use a static array of size 16. palloc() is
> used only if we don't fit into the static array.
I've rebased the patch and:
- fixed a compiler warning in wait.c ("maybe uninitialized 'result'").
- made the loop in WaitLSNWakeup work without any call to palloc. It uses "goto" to
keep the indentation; perhaps `do {} while` would be better?
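On the `goto` versus `do {} while` question, here is a minimal standalone sketch of the same batching shape, not the patch code itself. The names `WaiterQueue`, `collect_batch` and `wake_waiters` are hypothetical, a sorted array stands in for the shared pairing heap, and the lock and SetLatch() calls are only marked by comments.

```c
#include <assert.h>
#include <stddef.h>

#define BATCH_SIZE 16

/* Hypothetical stand-in for the waiter heap: LSNs sorted ascending. */
typedef struct WaiterQueue
{
	const unsigned long long *lsns;	/* waited LSNs, smallest first */
	size_t		count;				/* total number of waiters */
	size_t		next;				/* first waiter not yet removed */
} WaiterQueue;

/*
 * Remove up to BATCH_SIZE waiters whose LSN is <= current and store their
 * indexes in out[].  In the real code this part runs under the lock.
 */
static size_t
collect_batch(WaiterQueue *q, unsigned long long current, size_t *out)
{
	size_t		n = 0;

	while (q->next < q->count && q->lsns[q->next] <= current)
	{
		assert(n < BATCH_SIZE);
		out[n++] = q->next++;
		if (n == BATCH_SIZE)
			break;
	}
	return n;
}

/*
 * Wake all waiters up to 'current' in batches.  A full batch means more
 * waiters may remain, so the loop repeats; *passes counts the iterations.
 */
static size_t
wake_waiters(WaiterQueue *q, unsigned long long current, size_t *passes)
{
	size_t		batch[BATCH_SIZE];
	size_t		n;
	size_t		total = 0;

	*passes = 0;
	do
	{
		n = collect_batch(q, current, batch);
		/* latches for batch[0..n) would be set here, outside the lock */
		total += n;
		(*passes)++;
	} while (n == BATCH_SIZE);

	return total;
}
```

With this shape the body keeps one extra indentation level, but the retry condition sits in a single place at the bottom of the loop instead of being split between a label and a `goto`.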
-------
regards
Yura Sokolov aka funny-falcon
From fa107e15eab3ec2493f0663f03b563d49979e0b5 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Fri, 28 Feb 2025 15:40:18 +0300
Subject: [PATCH v3] Implement WAIT FOR command
WAIT FOR is to be used on standby and specifies waiting for
the specific WAL location to be replayed. This command is useful when
the user makes some data changes on primary and needs a guarantee that
these changes are visible on standby.
The queue of waiters is stored in the shared memory as an LSN-ordered pairing
heap, where the waiter with the nearest LSN stays on the top. During
the replay of WAL, waiters whose LSNs have already been replayed are deleted
from the shared memory pairing heap and woken up by setting their latches.
WAIT FOR needs to wait without any snapshot held. Otherwise, the snapshot
could prevent the replay of WAL records, implying a kind of self-deadlock.
This is why a separate utility command appears to be the most robust
way to implement this functionality. It's not possible to implement this as
a function. Previous experience shows that stored procedures also have
limitations in this aspect.
Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveira
Reviewed-by: Heikki Linnakangas, Kyotaro Horiguchi
---
doc/src/sgml/ref/allfiles.sgml | 1 +
doc/src/sgml/reference.sgml | 1 +
src/backend/access/transam/Makefile | 3 +-
src/backend/access/transam/meson.build | 1 +
src/backend/access/transam/xact.c | 6 +
src/backend/access/transam/xlog.c | 7 +
src/backend/access/transam/xlogrecovery.c | 11 +
src/backend/access/transam/xlogwait.c | 347 ++++++++++++++++++
src/backend/commands/Makefile | 3 +-
src/backend/commands/meson.build | 1 +
src/backend/commands/wait.c | 185 ++++++++++
src/backend/lib/pairingheap.c | 18 +-
src/backend/parser/gram.y | 14 +-
src/backend/storage/ipc/ipci.c | 3 +
src/backend/storage/lmgr/proc.c | 6 +
src/backend/tcop/pquery.c | 12 +-
src/backend/tcop/utility.c | 22 ++
.../utils/activity/wait_event_names.txt | 2 +
src/include/access/xlogwait.h | 89 +++++
src/include/commands/wait.h | 21 ++
src/include/lib/pairingheap.h | 3 +
src/include/nodes/parsenodes.h | 7 +
src/include/parser/kwlist.h | 1 +
src/include/storage/lwlocklist.h | 1 +
src/include/tcop/cmdtaglist.h | 1 +
src/test/recovery/meson.build | 1 +
src/test/recovery/t/045_wait_for_lsn.pl | 217 +++++++++++
src/tools/pgindent/typedefs.list | 4 +
28 files changed, 977 insertions(+), 11 deletions(-)
create mode 100644 src/backend/access/transam/xlogwait.c
create mode 100644 src/backend/commands/wait.c
create mode 100644 src/include/access/xlogwait.h
create mode 100644 src/include/commands/wait.h
create mode 100644 src/test/recovery/t/045_wait_for_lsn.pl
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index f5be638867a..8b585cba751 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index ff85ace83fc..bd14ec00d2d 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 661c55a9db7..a32f473e0a2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,7 +36,8 @@ OBJS = \
xlogreader.o \
xlogrecovery.o \
xlogstats.o \
- xlogutils.o
+ xlogutils.o \
+ xlogwait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index e8ae9b13c8e..74a62ab3eab 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,6 +24,7 @@ backend_sources += files(
'xlogrecovery.c',
'xlogstats.c',
'xlogutils.c',
+ 'xlogwait.c',
)
# used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1b4f21a88d3..e617ae8ead5 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -31,6 +31,7 @@
#include "access/xloginsert.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
@@ -2826,6 +2827,11 @@ AbortTransaction(void)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Clear wait information and command progress indicator */
pgstat_report_wait_end();
pgstat_progress_end_command();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 799fc739e18..b9abb696a5e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -62,6 +62,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
@@ -6219,6 +6220,12 @@ StartupXLOG(void)
UpdateControlFile();
LWLockRelease(ControlFileLock);
+ /*
+ * Wake up all waiters for replay LSN. They need to report an error that
+ * recovery was ended before reaching the target LSN.
+ */
+ WaitLSNWakeup(InvalidXLogRecPtr);
+
/*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 52f53fa12e0..b03a39b510d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,6 +40,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
@@ -1831,6 +1832,16 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for, then walk
+ * over the shared memory pairing heap and set latches to notify
+ * the waiters.
+ */
+ if (waitLSNState &&
+ (XLogRecoveryCtl->lastReplayedEndRecPtr >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
+ WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
new file mode 100644
index 00000000000..a0f0e480a48
--- /dev/null
+++ b/src/backend/access/transam/xlogwait.c
@@ -0,0 +1,347 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.c
+ * Implements waiting for the given replay LSN, which is used in
+ * WAIT FOR lsn '...'
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogwait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+ void *arg);
+
+struct WaitLSNState *waitLSNState = NULL;
+
+/* Report the amount of shared memory space needed for WaitLSNState. */
+Size
+WaitLSNShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitLSNState, procInfos);
+ size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+ return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory. */
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+
+ waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+ WaitLSNShmemSize(),
+ &found);
+ if (!found)
+ {
+ pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX);
+ pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL);
+ memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
+ }
+}
+
+/*
+ * Comparison function for waitLSNState->waitersHeap. Waiting processes are
+ * ordered by LSN, so that the waiter with the smallest LSN is at the top.
+ */
+static int
+waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+ const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
+ const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
+
+ if (aproc->waitLSN < bproc->waitLSN)
+ return 1;
+ else if (aproc->waitLSN > bproc->waitLSN)
+ return -1;
+ else
+ return 0;
+}
+
+/*
+ * Update waitLSNState->minWaitedLSN according to the current state of
+ * waitLSNState->waitersHeap.
+ */
+static void
+updateMinWaitedLSN(void)
+{
+ XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
+
+ if (!pairingheap_is_empty(&waitLSNState->waitersHeap))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
+
+ minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
+ }
+
+ pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN);
+}
+
+/*
+ * Put the current process into the heap of LSN waiters.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ Assert(!procInfo->inHeap);
+
+ procInfo->procno = MyProcNumber;
+ procInfo->waitLSN = lsn;
+
+ pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
+ procInfo->inHeap = true;
+ updateMinWaitedLSN();
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove the current process from the heap of LSN waiters if it's there.
+ */
+static void
+deleteLSNWaiter(void)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ if (!procInfo->inHeap)
+ {
+ LWLockRelease(WaitLSNLock);
+ return;
+ }
+
+ pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode);
+ procInfo->inHeap = false;
+ updateMinWaitedLSN();
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Size of the on-stack array of procs that WaitLSNWakeup() wakes up per
+ * iteration. It should be large enough for a single iteration in most cases.
+ */
+#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
+
+/*
+ * Remove waiters whose LSN has been replayed from the heap and set their
+ * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
+ * and set latches for all waiters.
+ */
+void
+WaitLSNWakeup(XLogRecPtr currentLSN)
+{
+ int i;
+ ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
+ int numWakeUpProcs;
+
+resume:
+ numWakeUpProcs = 0;
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ /*
+ * Iterate the pairing heap of waiting processes till we find LSN not yet
+ * replayed. Record the process numbers to wake up, but to avoid holding
+ * the lock for too long, send the wakeups only after releasing the lock.
+ */
+ while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap);
+ WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
+
+ if (!XLogRecPtrIsInvalid(currentLSN) &&
+ procInfo->waitLSN > currentLSN)
+ break;
+
+ Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
+ wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
+ (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
+ procInfo->inHeap = false;
+
+ if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
+ break;
+ }
+
+ updateMinWaitedLSN();
+
+ LWLockRelease(WaitLSNLock);
+
+ /*
+ * Set latches for processes whose waited LSNs are already replayed. As
+ * these are time-consuming operations, we do this outside of WaitLSNLock.
+ * This is actually fine because procLatch isn't ever freed, so at worst
+ * we can set the wrong process' (or no process') latch.
+ */
+ for (i = 0; i < numWakeUpProcs; i++)
+ SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
+
+ /* Need to recheck if there were more waiters than static array size. */
+ if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
+ goto resume;
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+ /*
+ * We do a fast-path check of the 'inHeap' flag without the lock. This
+ * flag is set to true only by the process itself. So, it's only possible
+ * to get a false positive. But that will be eliminated by a recheck
+ * inside deleteLSNWaiter().
+ */
+ if (waitLSNState->procInfos[MyProcNumber].inHeap)
+ deleteLSNWaiter();
+}
+
+/*
+ * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ */
+WaitLSNResult
+WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
+{
+ XLogRecPtr currentLSN;
+ TimestampTz endtime = 0;
+ int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+ /* Shouldn't be called when shmem isn't initialized */
+ Assert(waitLSNState);
+
+ /* Should have a valid proc number */
+ Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
+
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Recovery is not in progress. Given that we detected this in the
+ * very first check, this procedure was mistakenly called on primary.
+ * However, it's possible that standby was promoted concurrently to
+ * the procedure call, while target LSN is replayed. So, we still
+ * check the last replay LSN before reporting an error.
+ */
+ if (PromoteIsTriggered() && targetLSN <= GetXLogReplayRecPtr(NULL))
+ return WAIT_LSN_RESULT_SUCCESS;
+ return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
+ }
+ else
+ {
+ /* If target LSN is already replayed, exit immediately */
+ if (targetLSN <= GetXLogReplayRecPtr(NULL))
+ return WAIT_LSN_RESULT_SUCCESS;
+ }
+
+ if (timeout > 0)
+ {
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+ wake_events |= WL_TIMEOUT;
+ }
+
+ /*
+ * Add our process to the pairing heap of waiters. It might happen that
+ * target LSN gets replayed before we do. Another check at the beginning
+ * of the loop below prevents the race condition.
+ */
+ addLSNWaiter(targetLSN);
+
+ for (;;)
+ {
+ int rc;
+ long delay_ms = 0;
+
+ /* Recheck that recovery is still in-progress */
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Recovery was ended, but recheck if target LSN was already
+ * replayed. See the comment regarding deleteLSNWaiter() below.
+ */
+ deleteLSNWaiter();
+ currentLSN = GetXLogReplayRecPtr(NULL);
+ if (PromoteIsTriggered() && targetLSN <= currentLSN)
+ return WAIT_LSN_RESULT_SUCCESS;
+ return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
+ }
+ else
+ {
+ /* Check if the waited LSN has been replayed */
+ currentLSN = GetXLogReplayRecPtr(NULL);
+ if (targetLSN <= currentLSN)
+ break;
+ }
+
+ /*
+ * If the timeout value is specified, calculate the number of
+ * milliseconds before the timeout. Exit if the timeout is already
+ * reached.
+ */
+ if (timeout > 0)
+ {
+ delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
+ if (delay_ms <= 0)
+ break;
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ rc = WaitLatch(MyLatch, wake_events, delay_ms,
+ WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (rc & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit"),
+ errcontext("while waiting for LSN replay")));
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
+ /*
+ * Delete our process from the shared memory pairing heap. We might
+ * already be deleted by the startup process. The 'inHeap' flag prevents
+ * us from the double deletion.
+ */
+ deleteLSNWaiter();
+
+ /*
+ * If we didn't reach the target LSN, we must have exited by timeout.
+ */
+ if (targetLSN > currentLSN)
+ return WAIT_LSN_RESULT_TIMEOUT;
+
+ return WAIT_LSN_RESULT_SUCCESS;
+}
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 85cfea6fd71..12459111f7c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -63,6 +63,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index ce8d1ab8bac..b1c60f60ea7 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -52,4 +52,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..a5f44de1303
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,185 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "catalog/pg_collation_d.h"
+#include "commands/wait.h"
+#include "executor/executor.h"
+#include "storage/proc.h"
+#include "utils/builtins.h"
+#include "utils/fmgrprotos.h"
+#include "utils/formatting.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+void
+ExecWaitStmt(WaitStmt *stmt, DestReceiver *dest)
+{
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ int64 timeout = 0;
+ WaitLSNResult waitLSNResult;
+ bool throw = true;
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ const char *result = "<unset>";
+
+ /*
+ * Process the list of parameters.
+ */
+ foreach_node(DefElem, defel, stmt->options)
+ {
+ char *name = str_tolower(defel->defname, strlen(defel->defname),
+ DEFAULT_COLLATION_OID);
+
+ if (strcmp(name, "lsn") == 0)
+ {
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(strVal(defel->arg))));
+ }
+ else if (strcmp(name, "timeout") == 0)
+ {
+ timeout = pg_strtoint64(strVal(defel->arg));
+ }
+ else if (strcmp(name, "throw") == 0)
+ {
+ throw = DatumGetBool(DirectFunctionCall1(boolin,
+ CStringGetDatum(strVal(defel->arg))));
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("wrong wait argument: %s",
+ defel->defname)));
+ }
+ }
+
+ if (XLogRecPtrIsInvalid(lsn))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_PARAMETER),
+ errmsg("\"lsn\" must be specified")));
+
+ if (timeout < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("\"timeout\" must not be negative")));
+
+ /*
+ * We are going to wait for the LSN replay. We should first care that we
+ * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+ * Otherwise, our snapshot could prevent the replay of WAL records
+ * implying a kind of self-deadlock. This is the reason why WAIT FOR is
+ * a separate utility command rather than a function or procedure.
+ *
+ * First, we should check there is no active snapshot. According to
+ * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
+ * processed with a snapshot. Thankfully, we can pop this snapshot,
+ * because PortalRunUtility() can tolerate this.
+ */
+ if (ActiveSnapshotSet())
+ PopActiveSnapshot();
+
+ /*
+ * Second, invalidate a catalog snapshot if any. And we should be done
+ * with the preparation.
+ */
+ InvalidateCatalogSnapshot();
+
+ /* Give up if there is still an active or registered snapshot. */
+ if (HaveRegisteredOrActiveSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("WAIT FOR must be only called without an active or registered snapshot"),
+ errdetail("Make sure WAIT FOR isn't called within a transaction with an isolation level higher than READ COMMITTED, procedure, or a function.")));
+
+ /*
+ * As the result we should hold no snapshot, and correspondingly our xmin
+ * should be unset.
+ */
+ Assert(MyProc->xmin == InvalidTransactionId);
+
+ waitLSNResult = WaitForLSNReplay(lsn, timeout);
+
+ /*
+ * Process the result of WaitForLSNReplay(). Throw appropriate error if
+ * needed.
+ */
+ switch (waitLSNResult)
+ {
+ case WAIT_LSN_RESULT_SUCCESS:
+ /* Nothing to do on success */
+ result = "success";
+ break;
+
+ case WAIT_LSN_RESULT_TIMEOUT:
+ if (throw)
+ ereport(ERROR,
+ (errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))));
+ else
+ result = "timeout";
+ break;
+
+ case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+ if (throw)
+ {
+ if (PromoteIsTriggered())
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))));
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the replay LSN can only be executed during recovery.")));
+ }
+ }
+ else
+ result = "not in recovery";
+ break;
+ }
+
+ /* need a tuple descriptor representing a single TEXT column */
+ tupdesc = WaitStmtResultDesc(stmt);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+ /* Send it */
+ do_text_output_oneline(tstate, result);
+
+ end_tup_output(tstate);
+}
+
+TupleDesc
+WaitStmtResultDesc(WaitStmt *stmt)
+{
+ TupleDesc tupdesc;
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "RESULT STATUS",
+ TEXTOID, -1, 0);
+ return tupdesc;
+}
diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c
index 0aef8a88f1b..fa8431f7946 100644
--- a/src/backend/lib/pairingheap.c
+++ b/src/backend/lib/pairingheap.c
@@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg)
pairingheap *heap;
heap = (pairingheap *) palloc(sizeof(pairingheap));
+ pairingheap_initialize(heap, compare, arg);
+
+ return heap;
+}
+
+/*
+ * pairingheap_initialize
+ *
+ * Same as pairingheap_allocate(), but initializes the pairing heap in-place
+ * rather than allocating a new chunk of memory. Useful to store the pairing
+ * heap in a shared memory.
+ */
+void
+pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare,
+ void *arg)
+{
heap->ph_compare = compare;
heap->ph_arg = arg;
heap->ph_root = NULL;
-
- return heap;
}
/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d99c9355c6..11265ae3383 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -303,7 +303,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -786,7 +786,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1114,6 +1114,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -16341,6 +16342,14 @@ xml_passing_mech:
| BY VALUE_P
;
+WaitStmt:
+ WAIT FOR generic_option_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->options = $3;
+ $$ = (Node *)n;
+ }
+ ;
/*
* Aggregate decoration clauses
@@ -17999,6 +18008,7 @@ unreserved_keyword:
| VIEWS
| VIRTUAL
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 174eed70367..27b447b7a7a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -24,6 +24,7 @@
#include "access/twophase.h"
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -148,6 +149,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, WaitEventCustomShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
+ size = add_size(size, WaitLSNShmemSize());
/* include additional requested shmem from preload libraries */
size = add_size(size, total_addin_request);
@@ -340,6 +342,7 @@ CreateOrAttachShmemStructs(void)
StatsShmemInit();
WaitEventCustomShmemInit();
InjectionPointShmemInit();
+ WaitLSNShmemInit();
}
/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 49204f91a20..dbb613663fa 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -896,6 +897,11 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index dea24453a6c..61cf02c9527 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1194,10 +1194,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
MemoryContextSwitchTo(portal->portalContext);
/*
- * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
- * under us, so don't complain if it's now empty. Otherwise, our snapshot
- * should be the top one; pop it. Note that this could be a different
- * snapshot from the one we made above; see EnsurePortalSnapshotExists.
+ * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot
+ * stack from under us, so don't complain if it's now empty. Otherwise,
+ * our snapshot should be the top one; pop it. Note that this could be a
+ * different snapshot from the one we made above; see
+ * EnsurePortalSnapshotExists.
*/
if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
{
@@ -1792,7 +1793,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
IsA(utilityStmt, ListenStmt) ||
IsA(utilityStmt, NotifyStmt) ||
IsA(utilityStmt, UnlistenStmt) ||
- IsA(utilityStmt, CheckPointStmt))
+ IsA(utilityStmt, CheckPointStmt) ||
+ IsA(utilityStmt, WaitStmt))
return false;
return true;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 25fe3d58016..d23ac3b0f0b 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_PrepareStmt:
case T_UnlistenStmt:
case T_VariableSetStmt:
+ case T_WaitStmt:
{
/*
* These modify only backend-local state, so they're OK to run
@@ -1065,6 +1067,12 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ ExecWaitStmt((WaitStmt *) parsetree, dest);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2067,6 +2075,9 @@ UtilityReturnsTuples(Node *parsetree)
case T_VariableShowStmt:
return true;
+ case T_WaitStmt:
+ return true;
+
default:
return false;
}
@@ -2122,6 +2133,9 @@ UtilityTupleDescriptor(Node *parsetree)
return GetPGVariableResultDesc(n->name);
}
+ case T_WaitStmt:
+ return WaitStmtResultDesc((WaitStmt *) parsetree);
+
default:
return NULL;
}
@@ -3099,6 +3113,10 @@ CreateCommandTag(Node *parsetree)
}
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
/* already-planned queries */
case T_PlannedStmt:
{
@@ -3697,6 +3715,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_DDL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
/* already-planned queries */
case T_PlannedStmt:
{
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index e199f071628..3b282043eca 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -88,6 +88,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_REPLAY "Waiting for a replay of the particular WAL position on the physical standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
@@ -346,6 +347,7 @@ WALSummarizer "Waiting to read or update WAL summarization state."
DSMRegistry "Waiting to read or update the dynamic shared memory registry."
InjectionPoint "Waiting to read or update information related to injection points."
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
+WaitLSN "Waiting to read or update shared Wait-for-LSN state."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
new file mode 100644
index 00000000000..0acc61eba5f
--- /dev/null
+++ b/src/include/access/xlogwait.h
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.h
+ * Declarations for LSN replay waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogwait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOG_WAIT_H
+#define XLOG_WAIT_H
+
+#include "lib/pairingheap.h"
+#include "port/atomics.h"
+#include "postgres.h"
+#include "storage/procnumber.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/*
+ * WaitLSNProcInfo - the shared memory structure representing information
+ * about the single process, which may wait for LSN replay. An item of
+ * waitLSNState->procInfos array.
+ */
+typedef struct WaitLSNProcInfo
+{
+ /* LSN, which this process is waiting for */
+ XLogRecPtr waitLSN;
+
+ /* Process to wake up once the waitLSN is replayed */
+ ProcNumber procno;
+
+ /*
+ * A flag indicating that this item is present in
+ * waitLSNState->waitersHeap
+ */
+ bool inHeap;
+
+ /* A pairing heap node for participation in waitLSNState->waitersHeap */
+ pairingheap_node phNode;
+} WaitLSNProcInfo;
+
+/*
+ * WaitLSNState - the shared memory state for the replay LSN waiting facility.
+ */
+typedef struct WaitLSNState
+{
+ /*
+ * The minimum LSN value some process is waiting for. Used for the
+ * fast-path checking if we need to wake up any waiters after replaying a
+ * WAL record. Could be read lock-less. Update protected by WaitLSNLock.
+ */
+ pg_atomic_uint64 minWaitedLSN;
+
+ /*
+ * A pairing heap of waiting processes ordered by LSN values (least LSN is
+ * on top). Protected by WaitLSNLock.
+ */
+ pairingheap waitersHeap;
+
+ /*
+ * An array with per-process information, indexed by the process number.
+ * Protected by WaitLSNLock.
+ */
+ WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+/*
+ * Result statuses for WaitForLSNReplay().
+ */
+typedef enum
+{
+ WAIT_LSN_RESULT_SUCCESS, /* Target LSN is reached */
+ WAIT_LSN_RESULT_TIMEOUT, /* Timeout occurred */
+ WAIT_LSN_RESULT_NOT_IN_RECOVERY, /* Recovery ended before or during our
+ * wait */
+} WaitLSNResult;
+
+extern PGDLLIMPORT WaitLSNState *waitLSNState;
+
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNWakeup(XLogRecPtr currentLSN);
+extern void WaitLSNCleanup(void);
+extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout);
+
+#endif /* XLOG_WAIT_H */
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..a7fa00ed41e
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,21 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+#include "nodes/parsenodes.h"
+#include "tcop/dest.h"
+
+extern void ExecWaitStmt(WaitStmt *stmt, DestReceiver *dest);
+extern TupleDesc WaitStmtResultDesc(WaitStmt *stmt);
+
+#endif /* WAIT_H */
diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h
index 3c57d3fda1b..567586f2ecf 100644
--- a/src/include/lib/pairingheap.h
+++ b/src/include/lib/pairingheap.h
@@ -77,6 +77,9 @@ typedef struct pairingheap
extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
void *arg);
+extern void pairingheap_initialize(pairingheap *heap,
+ pairingheap_comparator compare,
+ void *arg);
extern void pairingheap_free(pairingheap *heap);
extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
extern pairingheap_node *pairingheap_first(pairingheap *heap);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 0b208f51bdd..1c3baac08a9 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4317,4 +4317,11 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+typedef struct WaitStmt
+{
+ NodeTag type;
+ List *options;
+} WaitStmt;
+
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 40cf090ce61..6d834f25d2d 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -493,6 +493,7 @@ PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("virtual", VIRTUAL, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index cf565452382..a3f66071288 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, WaitLSN)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index d250a714d59..c4606d65043 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 057bcde1434..8a8f1f6c427 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -53,6 +53,7 @@ tests += {
't/042_low_level_backup.pl',
't/043_no_contrecord_switch.pl',
't/044_invalidate_inactive_slots.pl',
+ 't/045_wait_for_lsn.pl',
],
},
}
diff --git a/src/test/recovery/t/045_wait_for_lsn.pl b/src/test/recovery/t/045_wait_for_lsn.pl
new file mode 100644
index 00000000000..79c2c49b9ce
--- /dev/null
+++ b/src/test/recovery/t/045_wait_for_lsn.pl
@@ -0,0 +1,217 @@
+# Checks waiting for LSN replay on standby using
+# the WAIT FOR command.
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf(
+ 'postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# 1. Make sure that WAIT FOR works: add new content to
+# primary and memorize primary's insert LSN, then wait for that LSN to be
+# replayed on standby.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn1}', TIMEOUT '1000000';
+ SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on standby is at least as big as the LSN we
+# observed on the primary before.
+ok((split("\n", $output))[-1] >= 0,
+ "standby reached the same LSN as primary after WAIT FOR");
+
+# 2. Check that new data is visible after calling WAIT FOR
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn2}';
+ SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the count(*) on standby reflects the recent changes on primary
+ok((split("\n", $output))[-1] eq 30,
+ "new data is visible on standby after WAIT FOR");
+
+# 3. Check that waiting for an unreachable LSN triggers the timeout. The
+# unreachable LSN must be far enough ahead that WAL records issued by
+# a concurrent autovacuum cannot reach it.
+my $lsn3 =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $stderr;
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn2}', TIMEOUT '10';");
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${lsn3}', TIMEOUT '1000';",
+ stderr => \$stderr);
+ok( $stderr =~ /timed out while waiting for target LSN/,
+ "get timeout on waiting for unreachable LSN");
+
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn2}', TIMEOUT '10', THROW 'false';]);
+ok($output eq "success",
+ "WAIT FOR returns correct status after successful waiting");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn3}', TIMEOUT '10', THROW 'false';]);
+ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
+
+# 4. Check that WAIT FOR triggers an error if called on primary,
+# within another function, or inside a transaction with an isolation level
+# higher than READ COMMITTED.
+
+$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';",
+ stderr => \$stderr);
+ok( $stderr =~ /recovery is not in progress/,
+ "get an error when running on the primary");
+
+$node_standby->psql(
+ 'postgres',
+ "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 1; WAIT FOR LSN '${lsn3}';",
+ stderr => \$stderr);
+ok( $stderr =~
+ /WAIT FOR must be only called without an active or registered snapshot/,
+ "get an error when running in a transaction with an isolation level higher than READ COMMITTED"
+);
+
+$node_primary->safe_psql(
+ 'postgres', qq[
+CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$
+ BEGIN
+ EXECUTE format('WAIT FOR LSN %L;', target_lsn);
+ END
+\$\$
+LANGUAGE plpgsql;
+]);
+
+$node_primary->wait_for_catchup($node_standby);
+$node_standby->psql(
+ 'postgres',
+ "SELECT pg_wal_replay_wait_wrap('${lsn3}');",
+ stderr => \$stderr);
+ok( $stderr =~
+ /WAIT FOR must be only called without an active or registered snapshot/,
+ "get an error when running within another function");
+
+# 5. Also, check the scenario of multiple LSN waiters. We make 5 background
+# psql sessions, each waiting for a corresponding insertion. When waiting is
+# finished, the stored procedure logs whether as many rows are visible as
+# expected.
+$node_primary->safe_psql(
+ 'postgres', qq[
+CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
+ DECLARE
+ count int;
+ BEGIN
+ SELECT count(*) FROM wait_test INTO count;
+ IF count >= 31 + i THEN
+ RAISE LOG 'count %', i;
+ END IF;
+ END
+\$\$
+LANGUAGE plpgsql;
+]);
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+my @psql_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (${i});");
+ my $lsn =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+ $psql_sessions[$i] = $node_standby->background_psql('postgres');
+ $psql_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR lsn '${lsn}';
+ SELECT log_count(${i});
+ ]);
+}
+my $log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_standby->wait_for_log("count ${i}", $log_offset);
+ $psql_sessions[$i]->quit;
+}
+
+ok(1, 'multiple LSN waiters reported consistent data');
+
+# 6. Check that the standby promotion terminates the wait on LSN. Start
+# waiting for an unreachable LSN, then promote. Check the log for the relevant
+# error message. Also, check that waiting for an already replayed LSN doesn't
+# cause an error even after promotion.
+my $lsn4 =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $lsn5 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $psql_session = $node_standby->background_psql('postgres');
+$psql_session->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${lsn4}';
+]);
+
+# Make sure the standby will be promoted at least at the primary insert LSN we
+# have just observed. Use pg_switch_wal() to force the insert LSN to be
+# written, then wait for the standby to catch up.
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+$node_primary->wait_for_catchup($node_standby);
+
+$log_offset = -s $node_standby->logfile;
+$node_standby->promote;
+$node_standby->wait_for_log('recovery is not in progress', $log_offset);
+
+ok(1, 'got error after standby promote');
+
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
+
+ok(1, 'wait for already replayed LSN exits immediately even after promotion');
+
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn4}', TIMEOUT '10', THROW 'false';]);
+ok($output eq "not in recovery",
+ "WAIT FOR returns correct status after standby promotion");
+
+$node_standby->stop;
+$node_primary->stop;
+
+# If we send \q with $psql_session->quit, the command can be sent to a session
+# that is already closed. So \q is in the initial script; here we only finish IPC::Run.
+$psql_session->{run}->finish;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index fcb968e1ffe..7b6c30c8d4f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3169,7 +3169,11 @@ WaitEventIO
WaitEventIPC
WaitEventSet
WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNResult
+WaitLSNState
WaitPMResult
+WaitStmt
WalCloseMethod
WalCompression
WalInsertClass
--
2.43.0