On 2024-03-15 22:59, Kartyshov Ivan wrote:
On 2024-03-11 13:44, Alexander Korotkov wrote:
I picked the second option and left only the AFTER clause for the
BEGIN statement. I think this should be enough for the beginning.
Thank you for reworking your patch. Here are some fixes I made:
0) autocomplete
1) fewer jumps
2) more description, and added cases to the docs
I think it would be useful to have a stand-alone statement.
Why would you like to see only the AFTER clause for the BEGIN statement?
Rebased and updated the patch.
--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..759e46ec24 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+ AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
</synopsis>
</refsynopsisdiv>
@@ -78,6 +81,50 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>AFTER</literal> <replaceable class="parameter">lsn_value</replaceable></term>
+ <listitem>
+ <para>
+ The <literal>AFTER</literal> clause is used on a standby in
+ <link linkend="streaming-replication">physical streaming replication</link>
+ and specifies waiting for the given WAL location (<acronym>LSN</acronym>)
+ to be replayed before starting the transaction.
+ </para>
+ <para>
+ This option is useful when the user makes data changes on the primary
+ and needs a guarantee that these changes are visible on the standby. The
+ LSN to wait for can be obtained on the primary with the
+ <link linkend="functions-admin-backup"><function>pg_current_wal_insert_lsn</function></link>
+ function after committing the relevant changes.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>WITHIN</literal> <replaceable class="parameter">number_of_milliseconds</replaceable></term>
+ <listitem>
+ <para>
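+ Specifies the timeout, in milliseconds, for the <literal>AFTER</literal>
+ clause. This is especially helpful to avoid waiting forever when the
+ streaming replication connection fails. If <literal>WITHIN</literal> is
+ omitted or the value is zero, the wait has no timeout.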
+ </para>
+ <para>
+ In practice, there are three ways to wait for an LSN:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Wait indefinitely: <literal>BEGIN AFTER lsn;</literal> waits until the
+ changes have been replayed on the standby and then starts the
+ transaction. The wait ends early only on a user cancel request
+ (<literal>SIGINT</literal>) or postmaster death.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Wait with a timeout: <literal>BEGIN AFTER lsn WITHIN 60000;</literal>
+ waits up to 60 seconds for the changes to be replayed and otherwise
+ fails with an error, which prevents freezing on streaming replication
+ connection failures.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Check with only a minimal wait: <literal>BEGIN AFTER lsn WITHIN 1;</literal>
+ is useful to check whether the changes have already been replayed.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </listitem>
+ </varlistentry>
</variablelist>
<para>
@@ -123,6 +170,33 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
<programlisting>
BEGIN;
</programlisting></para>
+
+ <para>
+ To begin a transaction block after the given <acronym>LSN</acronym> has
+ been replayed, canceling the command if that <acronym>LSN</acronym> is not
+ reached within one second:
+<programlisting>
+BEGIN AFTER '0/3F05F791' WITHIN 1000;
+BEGIN
+</programlisting></para>
+
+ <para>
+ To check, with only a minimal wait, whether the given <acronym>LSN</acronym>
+ has been replayed, and start the transaction only if it has:
+<programlisting>
+BEGIN AFTER '0/3F0FF791' WITHIN 1;
+ERROR: canceling waiting for LSN due to timeout
+</programlisting></para>
+
+ <para>
+ To begin a transaction block after the given <acronym>LSN</acronym> has
+ been replayed, with no timeout. The command can then be canceled only by a
+ user request or by postmaster death:
+<programlisting>
+BEGIN AFTER '0/3F0FF791';
+^CCancel request sent
+ERROR: canceling statement due to user request
+</programlisting></para>
+
</refsect1>
<refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..46a3bcf1a8 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+ AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
</synopsis>
</refsynopsisdiv>
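A note on the three waiting modes described in the docs: they differ only in how the sleep interval inside WaitForLSN() is computed. Omitting WITHIN makes the grammar produce a timeout of 0, which means "no deadline, but wake up every 60 seconds to process interrupts", while a positive value becomes a deadline. The standalone sketch below (plain C, not PostgreSQL code; `TimestampUs`, `wait_deadline` and `next_delay_ms` are hypothetical names) mirrors that logic, assuming microsecond timestamps as with TimestampTz. It widens the millisecond value to 64 bits before multiplying, because `millisecs * 1000` in plain `int` arithmetic overflows for timeouts longer than roughly 35 minutes.

```c
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz: microseconds since some epoch. */
typedef int64_t TimestampUs;

/* Wake-up interval used when no timeout is given (WITHIN omitted or 0). */
#define NO_TIMEOUT_POLL_MS 60000L

/* Deadline for a positive timeout; widen to 64 bits before multiplying. */
static TimestampUs
wait_deadline(TimestampUs now, int timeout_ms)
{
    return now + (int64_t) timeout_ms * 1000;
}

/*
 * Next sleep length in milliseconds.  With no timeout we still wake up
 * periodically so that interrupts get processed; otherwise we sleep until
 * the deadline.  A result <= 0 means the timeout has expired.
 */
static long
next_delay_ms(TimestampUs now, TimestampUs deadline, int timeout_ms)
{
    if (timeout_ms <= 0)
        return NO_TIMEOUT_POLL_MS;
    return (long) ((deadline - now) / 1000);
}
```

For example, `next_delay_ms(now, wait_deadline(now, 0), 0)` returns 60000 regardless of the clock, matching the unbounded `BEGIN AFTER lsn;` case.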
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 29c5bec084..89b997ad47 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/waitlsn.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1828,6 +1829,14 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for, set latches
+ * in shared memory array to notify the waiter.
+ */
+ if (waitLSN &&
+ (XLogRecoveryCtl->lastReplayedEndRecPtr >= pg_atomic_read_u64(&waitLSN->minLSN)))
+ WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
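The check added to PerformWalRecovery() is meant to be cheap: the startup process reads minLSN atomically without taking the spinlock, and only once the replayed position has reached it does WaitLSNSetLatches() lock and scan the array. Here is a standalone sketch of that fast path using C11 <stdatomic.h>; the names are hypothetical, and the patch itself uses pg_atomic_read_u64()/pg_atomic_write_u64() on waitLSN->minLSN.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for waitLSN->minLSN; UINT64_MAX means "no waiters". */
static _Atomic uint64_t min_waited_lsn = UINT64_MAX;

/* Waiter side (done under the spinlock in the patch): publish the minimum. */
static void
publish_min_lsn(uint64_t lsn)
{
    atomic_store(&min_waited_lsn, lsn);
}

/*
 * Replay side: unlocked check after each replayed record.  Only when this
 * returns true does the caller need to lock the array and set latches.
 */
static bool
needs_wakeup(uint64_t last_replayed_end)
{
    return last_replayed_end >= atomic_load(&min_waited_lsn);
}
```

With no waiters the sentinel value makes the comparison false for every real LSN, so an idle standby never touches the spinlock.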
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..cede90c3b9 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ waitlsn.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..7549be5dc3 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'waitlsn.c',
)
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000000..ca15afa468
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,278 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ * Implements waiting for the given LSN, which is used in
+ * BEGIN AFTER ... [ WITHIN ... ] clause.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void addLSNWaiter(XLogRecPtr lsn);
+static void deleteLSNWaiter(void);
+
+struct WaitLSNState *waitLSN = NULL;
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Report the amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitLSNState, procInfos);
+ size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+ return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory */
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+
+ waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+ WaitLSNShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&waitLSN->mutex);
+ waitLSN->numWaitedProcs = 0;
+ pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+ }
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+ WaitLSNProcInfo cur;
+ int i;
+
+ SpinLockAcquire(&waitLSN->mutex);
+
+ cur.procnum = MyProcNumber;
+ cur.waitLSN = lsn;
+
+ for (i = 0; i < waitLSN->numWaitedProcs; i++)
+ {
+ if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
+ {
+ WaitLSNProcInfo tmp;
+
+ tmp = waitLSN->procInfos[i];
+ waitLSN->procInfos[i] = cur;
+ cur = tmp;
+ }
+ }
+ waitLSN->procInfos[i] = cur;
+ waitLSN->numWaitedProcs++;
+
+ pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+ SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ */
+static void
+deleteLSNWaiter(void)
+{
+ int i;
+ bool found = false;
+
+ SpinLockAcquire(&waitLSN->mutex);
+
+ for (i = 0; i < waitLSN->numWaitedProcs; i++)
+ {
+ if (waitLSN->procInfos[i].procnum == MyProcNumber)
+ found = true;
+
+ if (found && i < waitLSN->numWaitedProcs - 1)
+ {
+ waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
+ }
+ }
+
+ if (!found)
+ {
+ SpinLockRelease(&waitLSN->mutex);
+ return;
+ }
+ waitLSN->numWaitedProcs--;
+
+ if (waitLSN->numWaitedProcs != 0)
+ pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+ else
+ pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+ SpinLockRelease(&waitLSN->mutex);
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitLSNSetLatches(XLogRecPtr curLSN)
+{
+ uint32 i,
+ numWakeUpProcs;
+
+ SpinLockAcquire(&waitLSN->mutex);
+
+ /*
+ * Set latches for the processes whose awaited LSNs have already been replayed.
+ */
+ for (i = 0; i < waitLSN->numWaitedProcs; i++)
+ {
+ PGPROC *backend;
+
+ if (waitLSN->procInfos[i].waitLSN > curLSN)
+ break;
+
+ backend = GetPGProcByNumber(waitLSN->procInfos[i].procnum);
+ SetLatch(&backend->procLatch);
+ }
+
+ /*
+ * Immediately remove those processes from the shmem array. Otherwise,
+ * shmem array items will be here till corresponding processes wake up and
+ * delete themselves.
+ */
+ numWakeUpProcs = i;
+ for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
+ waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
+ waitLSN->numWaitedProcs -= numWakeUpProcs;
+
+ if (waitLSN->numWaitedProcs != 0)
+ pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+ else
+ pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+ SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+ if (haveShmemItem)
+ deleteLSNWaiter();
+}
+
+/*
+ * Wait on MyLatch until the given LSN is replayed, the postmaster dies, or
+ * the timeout expires. A millisecs value of zero means no timeout.
+ */
+void
+WaitForLSN(XLogRecPtr lsn, int millisecs)
+{
+ XLogRecPtr curLSN;
+ int latch_events;
+ TimestampTz endtime;
+
+ /* Shouldn't be called when shmem isn't initialized */
+ Assert(waitLSN);
+
+ /* Should be only called by a backend */
+ Assert(MyBackendType == B_BACKEND);
+
+ if (!RecoveryInProgress())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for LSN can only be executed during recovery.")));
+
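+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), millisecs);
+
+ latch_events = WL_TIMEOUT | WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+
+ /*
+ * Remove an entry left over from a previous wait that was interrupted by
+ * an error, so that each backend has at most one entry in the array.
+ */
+ WaitLSNCleanup();
+ addLSNWaiter(lsn);
+ haveShmemItem = true;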
+
+ for (;;)
+ {
+ int rc;
+ long delay_ms;
+
+ /* Check if the waited LSN has been replayed */
+ curLSN = GetXLogReplayRecPtr(NULL);
+ if (lsn <= curLSN)
+ break;
+
+ if (millisecs > 0)
+ delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+ else
+ {
+ /* If no timeout is set then wake up in 1 minute for interrupts */
+ delay_ms = 60000;
+ }
+
+ if (delay_ms <= 0)
+ break;
+
+ /*
+ * CHECK_FOR_INTERRUPTS() throws an error on a cancel request. Our entry
+ * then stays in the shared array until it is removed by
+ * WaitLSNSetLatches() or by WaitLSNCleanup() at backend exit.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, delay_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
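+ /*
+ * Remove our entry unless WaitLSNSetLatches() already did. If the LSN was
+ * already replayed before we ever slept, the entry would otherwise linger
+ * until the next WaitLSNSetLatches() call.
+ */
+ deleteLSNWaiter();
+ haveShmemItem = false;
+
+ if (lsn > curLSN)
+ ereport(ERROR,
+ (errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("canceling waiting for LSN due to timeout")));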
+}
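The shared array is intended to stay sorted by ascending LSN, so the smallest awaited LSN, which is the value minLSN must mirror, is always the first element, no matter which position the last insert or removal touched. The standalone sketch below models the three operations (insert, remove own entry, wake and drop the replayed prefix) in plain C without locking; the types and function names are hypothetical stand-ins for WaitLSNState, addLSNWaiter(), deleteLSNWaiter() and WaitLSNSetLatches(), and in the patch all three run under waitLSN->mutex.

```c
#include <assert.h>
#include <stdint.h>

#define MAX_WAITERS 8            /* stand-in for MaxBackends */
#define NO_WAITERS  UINT64_MAX   /* stand-in for PG_UINT64_MAX: nobody waits */

/* Hypothetical stand-ins for WaitLSNProcInfo and WaitLSNState. */
typedef struct
{
    int         procnum;
    uint64_t    lsn;
} Waiter;

typedef struct
{
    int         n;
    uint64_t    min_lsn;
    Waiter      w[MAX_WAITERS];
} WaiterArray;

/* Invariant: min_lsn mirrors the first, i.e. smallest, entry. */
static void
update_min(WaiterArray *a)
{
    a->min_lsn = (a->n > 0) ? a->w[0].lsn : NO_WAITERS;
}

/* Insert while keeping entries sorted ascending by LSN. */
static void
add_waiter(WaiterArray *a, int procnum, uint64_t lsn)
{
    int         i = a->n;

    assert(a->n < MAX_WAITERS);
    /* Shift larger entries one slot right to open a gap. */
    while (i > 0 && a->w[i - 1].lsn > lsn)
    {
        a->w[i] = a->w[i - 1];
        i--;
    }
    a->w[i].procnum = procnum;
    a->w[i].lsn = lsn;
    a->n++;
    update_min(a);
}

/* Remove the entry owned by procnum; a no-op if it is already gone. */
static void
delete_waiter(WaiterArray *a, int procnum)
{
    int         i = 0;

    while (i < a->n && a->w[i].procnum != procnum)
        i++;
    if (i == a->n)
        return;
    for (; i < a->n - 1; i++)
        a->w[i] = a->w[i + 1];
    a->n--;
    update_min(a);
}

/* Drop every waiter whose LSN is <= replayed; returns how many were woken. */
static int
wake_replayed(WaiterArray *a, uint64_t replayed)
{
    int         k = 0;

    /* Sorted order means the replayed waiters form a prefix. */
    while (k < a->n && a->w[k].lsn <= replayed)
        k++;                    /* the patch calls SetLatch() for each here */
    for (int i = 0; i < a->n - k; i++)
        a->w[i] = a->w[i + k];
    a->n -= k;
    update_min(a);
    return k;
}
```

Keeping the array sorted turns the wake-up into a prefix scan that stops at the first LSN beyond the replayed position, at the cost of an O(n) shift on insert and removal, which seems acceptable for an array bounded by MaxBackends.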
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c6e2f679fd..be97f5a870 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -647,6 +647,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <ival> after_lsn_timeout
+
%type <node> json_format_clause
json_format_clause_opt
json_value_expr
@@ -3193,6 +3195,14 @@ hash_partbound:
}
;
+/*
+ * WITHIN timeout optional clause for BEGIN AFTER lsn
+ */
+after_lsn_timeout:
+ WITHIN Iconst { $$ = $2; }
+ | /* EMPTY */ { $$ = 0; }
+ ;
+
/*****************************************************************************
*
* ALTER TYPE
@@ -10949,6 +10959,17 @@ TransactionStmt:
n->location = -1;
$$ = (Node *) n;
}
+ | START TRANSACTION transaction_mode_list_or_empty AFTER Sconst after_lsn_timeout
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+
+ n->kind = TRANS_STMT_START;
+ n->options = $3;
+ n->afterLsn = $5;
+ n->afterLsnTimeout = $6;
+ n->location = -1;
+ $$ = (Node *) n;
+ }
| COMMIT opt_transaction opt_transaction_chain
{
TransactionStmt *n = makeNode(TransactionStmt);
@@ -11053,6 +11074,17 @@ TransactionStmtLegacy:
n->location = -1;
$$ = (Node *) n;
}
+ | BEGIN_P opt_transaction transaction_mode_list_or_empty AFTER Sconst after_lsn_timeout
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+
+ n->kind = TRANS_STMT_BEGIN;
+ n->options = $3;
+ n->afterLsn = $5;
+ n->afterLsnTimeout = $6;
+ n->location = -1;
+ $$ = (Node *) n;
+ }
| END_P opt_transaction opt_transaction_chain
{
TransactionStmt *n = makeNode(TransactionStmt);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418c..5aed90c935 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, WaitEventExtensionShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
+ size = add_size(size, WaitLSNShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
/* Initialize subsystems */
CreateOrAttachShmemStructs();
+ /*
+ * Initialize the shared memory state used for waiting for an LSN
+ */
+ WaitLSNShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 162b1f919d..4b830dc3c8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 83f86a42f7..bf9a685a9a 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -63,8 +64,10 @@
#include "storage/fd.h"
#include "tcop/utility.h"
#include "utils/acl.h"
+#include "utils/fmgrprotos.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -606,6 +609,15 @@ standard_ProcessUtility(PlannedStmt *pstmt,
{
ListCell *lc;
+ if (stmt->afterLsn)
+ {
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(stmt->afterLsn)));
+ WaitForLSN(lsn, stmt->afterLsnTimeout);
+ }
+
BeginTransactionBlock();
foreach(lc, stmt->options)
{
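In standard_ProcessUtility() the string given to AFTER is converted with pg_lsn_in, so it has to use the usual textual LSN form such as '0/3F05F791': two hexadecimal numbers separated by a slash, giving the high and low 32 bits. Below is a standalone sketch of that conversion; `parse_lsn` is a hypothetical name, and the limit of eight hex digits per half is my approximation of what pg_lsn_in accepts, not its actual source.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define HEX_DIGITS "0123456789abcdefABCDEF"
#define MAX_LSN_COMPONENT 8     /* assumed: at most 8 hex digits per half */

/*
 * Hypothetical parser for the textual LSN form "hi/lo": the part before
 * the slash gives the high 32 bits, the part after it the low 32 bits.
 * Returns false on malformed input.
 */
static bool
parse_lsn(const char *str, uint64_t *result)
{
    size_t      len1 = strspn(str, HEX_DIGITS);

    if (len1 < 1 || len1 > MAX_LSN_COMPONENT || str[len1] != '/')
        return false;

    const char *lo_str = str + len1 + 1;
    size_t      len2 = strspn(lo_str, HEX_DIGITS);

    if (len2 < 1 || len2 > MAX_LSN_COMPONENT || lo_str[len2] != '\0')
        return false;

    /* strtoull stops at the '/', so each call sees only its own half. */
    uint64_t    hi = strtoull(str, NULL, 16);
    uint64_t    lo = strtoull(lo_str, NULL, 16);

    *result = (hi << 32) | lo;
    return true;
}
```

So '0/3F05F791' from the docs example is LSN 0x3F05F791, and '1/0' is exactly 2^32.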
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 19db069ee9..28e22cb9d0 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2717,7 +2717,7 @@ psql_completion(const char *text, int start, int end)
/* BEGIN */
else if (Matches("BEGIN"))
- COMPLETE_WITH("WORK", "TRANSACTION", "ISOLATION LEVEL", "READ", "DEFERRABLE", "NOT DEFERRABLE");
+ COMPLETE_WITH("WORK", "TRANSACTION", "ISOLATION LEVEL", "READ", "DEFERRABLE", "NOT DEFERRABLE", "AFTER");
/* END, ABORT */
else if (Matches("END|ABORT"))
COMPLETE_WITH("AND", "WORK", "TRANSACTION");
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000000..ff4d63d5b0
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ * Declarations for LSN waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "access/xlogdefs.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+
+extern void WaitForLSN(XLogRecPtr lsn, int millisecs);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr curLSN);
+extern void WaitLSNCleanup(void);
+
+/* Shared memory structure */
+typedef struct WaitLSNProcInfo
+{
+ int procnum;
+ XLogRecPtr waitLSN;
+} WaitLSNProcInfo;
+
+typedef struct WaitLSNState
+{
+ pg_atomic_uint64 minLSN;
+ slock_t mutex;
+ int numWaitedProcs;
+ WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+#endif /* WAIT_LSN_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index aadaf67f57..c2077b5252 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3527,6 +3527,8 @@ typedef struct TransactionStmt
/* for two-phase-commit related commands */
char *gid pg_node_attr(query_jumble_ignore);
bool chain; /* AND CHAIN option */
+ char *afterLsn; /* target LSN to wait */
+ int afterLsnTimeout; /* LSN waiting timeout */
/* token location, or -1 if unknown */
int location pg_node_attr(query_jumble_location);
} TransactionStmt;
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..a1c2a0b13d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
't/040_standby_failover_slots_sync.pl',
't/041_checkpoint_at_promote.pl',
't/042_low_level_backup.pl',
+ 't/043_begin_after.pl',
],
},
}
diff --git a/src/test/recovery/t/043_begin_after.pl b/src/test/recovery/t/043_begin_after.pl
new file mode 100644
index 0000000000..e7859623b5
--- /dev/null
+++ b/src/test/recovery/t/043_begin_after.pl
@@ -0,0 +1,61 @@
+# Checks waiting for an LSN on a standby with BEGIN AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf(
+ 'postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that AFTER works: add new content to the primary and remember
+# the primary's new LSN, then wait for that LSN on the standby. This shows
+# that AFTER can set up an unbounded wait loop and leave it once the LSN is
+# replayed, without any timeout.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $output = $node_standby->safe_psql(
+ 'postgres', qq[
+ BEGIN AFTER '${lsn1}';
+ SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+ok($output eq 0, "standby reached the same LSN as primary AFTER");
+
+my $lsn2 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn() + 1");
+my $stderr;
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1' WITHIN 1");
+$node_standby->psql(
+ 'postgres',
+ "BEGIN AFTER '$lsn2' WITHIN 1",
+ stderr => \$stderr);
+ok( $stderr =~ /canceling waiting for LSN due to timeout/,
+ "get timeout on waiting for unreachable LSN");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index aa7a25b8f8..7a2ac178bc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3035,6 +3035,8 @@ WaitEventIO
WaitEventIPC
WaitEventSet
WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
WaitPMResult
WalCloseMethod
WalCompression