Here is a new patch for the feature discussed above.
WAIT FOR procedure - waits until a given LSN has been replayed on the standby
==========
Synopsis
==========
SELECT pg_wait_lsn('LSN', timeout) returns boolean
The timeout is given in milliseconds.
With timeout = 0, it waits indefinitely.
With timeout = 1, it effectively just checks whether the LSN has already been replayed.
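To make the timeout rule concrete, here is a minimal sketch of how one sleep slice could be chosen inside the wait loop. The helper name wait_slice_ms and the constant POLL_SLICE_MS are hypothetical and not part of the patch; the sketch only assumes what the patch does, namely that a timeout of zero or less means "no deadline" and that the loop wakes up every 100 ms to check for interrupts.

```c
#define POLL_SLICE_MS 100       /* assumed polling interval, as in the patch */

/*
 * Hypothetical helper: how long to sleep in one loop iteration.
 * timeout_ms   - the user-supplied timeout (<= 0 means wait forever)
 * remaining_ms - milliseconds left until the deadline
 */
long
wait_slice_ms(int timeout_ms, long remaining_ms)
{
    if (timeout_ms <= 0)
        return POLL_SLICE_MS;   /* no deadline: just wake up periodically */
    if (remaining_ms <= 0)
        return 0;               /* deadline already passed */
    return remaining_ms < POLL_SLICE_MS ? remaining_ms : POLL_SLICE_MS;
}
```

With timeout = 1 the first slice is at most 1 ms, which is why that value behaves like a plain check.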
How to use it
==========
Greg Stark wrote:
> That said, I'm not a fan of the specific function names. Remember that
> we have polymorphic functions so you could probably just have an
> option argument:
If you have an example, I will be glad to see it. My searches have
not been fruitful.
Michael Paquier wrote:
> While looking at all the patches proposed, I have noticed that all the
> approaches proposed force a wakeup of the waiters in the redo loop of
> the startup process for each record, before reading the next record.
> It strikes me that there is some interaction with custom resource
> managers here, where it is possible to poke at the waiters not for
> each record, but after reading some specific records. Something
> out-of-core would not be as responsive as the per-record approach,
> still responsive enough that the waiters wait on input for an
> acceptable amount of time, depending on the frequency of the records
> generated by a primary to wake them up. Just something that popped
> into my mind while looking a bit at the threads.
I'll work on this idea to reduce the impact on the redo system.
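For reference, the per-record cost in the attached patch is meant to be a single comparison against the smallest LSN anyone is waiting for; the waiter array is scanned only when that minimum has been reached. Below is a simplified, single-process sketch of that idea. The names WaiterTable, waiter_add and on_record_replayed are hypothetical, there is no locking, and unlike the patch the redo side clears the slots itself.

```c
#include <stdint.h>

#define MAX_WAITERS 8
#define NO_WAITER   UINT64_MAX

typedef struct
{
    uint64_t    waited[MAX_WAITERS];    /* 0 means the slot is unused */
    uint64_t    min_lsn;                /* NO_WAITER when nobody is waiting */
} WaiterTable;

void
waiter_table_init(WaiterTable *t)
{
    for (int i = 0; i < MAX_WAITERS; i++)
        t->waited[i] = 0;
    t->min_lsn = NO_WAITER;
}

/* Register that the backend in "slot" waits for "lsn". */
void
waiter_add(WaiterTable *t, int slot, uint64_t lsn)
{
    t->waited[slot] = lsn;
    if (lsn < t->min_lsn)
        t->min_lsn = lsn;
}

/* Called after each replayed record; returns how many waiters were woken. */
int
on_record_replayed(WaiterTable *t, uint64_t replayed_lsn)
{
    int         woken = 0;

    /* Fast path: nobody is waiting for an LSN this low. */
    if (replayed_lsn < t->min_lsn)
        return 0;

    /* Slow path: wake satisfied waiters and recompute the minimum. */
    t->min_lsn = NO_WAITER;
    for (int i = 0; i < MAX_WAITERS; i++)
    {
        if (t->waited[i] == 0)
            continue;
        if (t->waited[i] <= replayed_lsn)
        {
            t->waited[i] = 0;   /* a real implementation would set a latch */
            woken++;
        }
        else if (t->waited[i] < t->min_lsn)
            t->min_lsn = t->waited[i];
    }
    return woken;
}
```

Waking only after specific record types, as suggested above, would move the call to on_record_replayed out of the per-record path entirely, at the price of slower wakeups.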
On 2023-03-02 13:33, Peter Eisentraut wrote:
> But I wonder how a client is going to get the LSN. How would all of
> this be used by a client?
As I wrote earlier, the main purpose of the feature is to achieve
read-your-writes consistency while using an async replica for reads and
the primary for writes. In that case the LSN of the last modification is
stored inside the application.
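A rough sketch of the application side, under the assumption that the application reads the LSN as text from the primary after each write (for example with pg_current_wal_lsn()) and keeps the highest value per session. SessionLsn, parse_lsn, session_note_write and format_wait_query are hypothetical names for illustration only; no client library is involved.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical per-session state kept by the application. */
typedef struct
{
    uint64_t    last_write_lsn; /* highest LSN seen after a write */
} SessionLsn;

/* Parse the textual pg_lsn form "hi/lo" (two hex numbers) into 64 bits. */
bool
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi, lo;
    char        trailing;

    /* %c catches trailing garbage: exactly two fields must match. */
    if (sscanf(text, "%8X/%8X%c", &hi, &lo, &trailing) != 2)
        return false;
    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/* Remember the newest LSN this session has written. */
void
session_note_write(SessionLsn *s, uint64_t lsn)
{
    if (lsn > s->last_write_lsn)
        s->last_write_lsn = lsn;
}

/* Build the query to run on the replica before reading. */
int
format_wait_query(const SessionLsn *s, int timeout_ms, char *buf, size_t len)
{
    return snprintf(buf, len, "SELECT pg_wait_lsn('%X/%X', %d)",
                    (unsigned int) (s->last_write_lsn >> 32),
                    (unsigned int) s->last_write_lsn,
                    timeout_ms);
}
```

The application would call session_note_write() after each write on the primary and send the string from format_wait_query() to the replica before its next read.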
> I'm tempted to think this could be a protocol-layer facility. Every
> query automatically returns the current LSN, and every query can also
> send along an LSN to wait for, and the client library would just keep
> track of the LSN for (what it thinks of as) the connection. So you
> get some automatic serialization without having to modify your client
> code.
Yes, it sounds very tempting, but I think the community will be against it.
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..422bb1ed82 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1752,6 +1753,15 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+ {
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..3465673f0a
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,275 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements wait lsn, which allows waiting for events such as
+ * LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xact.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */
+ slock_t mutex;
+ /* LSNs that different backends are waiting for */
+ XLogRecPtr lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+ pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+DeleteWaitedLSN(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete;
+
+ SpinLockAcquire(&state->mutex);
+
+ lsn_to_delete = state->lsn[MyBackendId];
+ state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ /* If we are deleting the minimal LSN, then choose the next min_lsn */
+ if (lsn_to_delete != InvalidXLogRecPtr &&
+ lsn_to_delete == pg_atomic_read_u64(&state->min_lsn))
+ {
+ pg_atomic_write_u64(&state->min_lsn, PG_UINT64_MAX);
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->lsn[i] != InvalidXLogRecPtr &&
+ state->lsn[i] < pg_atomic_read_u64(&state->min_lsn))
+ pg_atomic_write_u64(&state->min_lsn, state->lsn[i]);
+ }
+
+ /* If deleting from the end of the array, shorten the array's used part */
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+ }
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+
+ if (backend && state->lsn[i] != 0 &&
+ state->lsn[i] <= cur_lsn)
+ {
+ SetLatch(&backend->procLatch);
+ }
+ }
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+ return pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed,
+ * postmaster dies or timeout happens. Timeout is specified in milliseconds.
+ * Returns true if LSN was reached and false otherwise.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ bool res = false;
+ bool wait_forever = (timeout_ms <= 0);
+
+ /*
+ * In transactions with isolation level repeatable read or higher,
+ * pg_wait_lsn takes a snapshot if it is the first statement in the block,
+ * so the transaction may not see the changes it waited for.
+ */
+
+ if (IsTransactionBlock() && XactIsoLevel != XACT_READ_COMMITTED) {
+ ereport(WARNING,
+ errmsg("pg_wait_lsn may work incorrectly at this isolation level"),
+ errhint("Call pg_wait_lsn before starting the transaction."));
+ }
+
+ endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ /* Check if we already reached the needed LSN */
+ if (cur_lsn >= target_lsn)
+ return true;
+
+ AddWaitedLSN(target_lsn);
+
+ for (;;)
+ {
+ int rc;
+ float8 time_left = 0;
+ long time_left_ms = 0;
+
+ time_left = endtime - GetNowFloat();
+
+ /* Use 100 ms as the default timeout to check for interrupts */
+ if (wait_forever || time_left < 0 || time_left > 0.1)
+ time_left_ms = 100;
+ else
+ time_left_ms = (long) ceil(time_left * 1000.0);
+
+ /* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+ CHECK_FOR_INTERRUPTS();
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ ResetLatch(MyLatch);
+
+ if (rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ if (rc & WL_TIMEOUT)
+ {
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+ /* If the time specified by user has passed, stop waiting */
+ time_left = endtime - GetNowFloat();
+ if (!wait_forever && time_left <= 0.0)
+ break;
+ }
+
+ /* If LSN has been replayed */
+ if (target_lsn <= cur_lsn)
+ break;
+ }
+
+ DeleteWaitedLSN();
+
+ if (cur_lsn < target_lsn)
+ ereport(WARNING,
+ errmsg("LSN was not reached"),
+ errhint("Try to increase wait time."));
+ else
+ res = true;
+
+ return res;
+}
+
+Datum
+pg_wait_lsn(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr trg_lsn = PG_GETARG_LSN(0);
+ int32 delay = PG_GETARG_INT32(1);
+
+ PG_RETURN_BOOL(WaitUtility(trg_lsn, delay));
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..760d760356 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -295,6 +297,11 @@ CreateSharedMemoryAndSemaphores(void)
AsyncShmemInit();
StatsShmemInit();
+ /*
+ * Init array of events for the wait clause in shared memory
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..1dec418992 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -704,6 +705,9 @@ LockErrorCleanup(void)
AbortStrongLockAcquire();
+ /* If wait lsn was interrupted, then stop waiting for that LSN */
+ DeleteWaitedLSN();
+
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
{
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 220ddb8c01..fd731595b2 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -384,8 +384,6 @@ pg_sleep(PG_FUNCTION_ARGS)
* less than the specified time when WaitLatch is terminated early by a
* non-query-canceling signal such as SIGHUP.
*/
-#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
-
endtime = GetNowFloat() + secs;
for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 66b73c3900..04dde03d72 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11927,4 +11927,9 @@
prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
prosrc => 'brin_minmax_multi_summary_send' },
+{ oid => '16387', descr => 'wait for LSN until timeout',
+ proname => 'pg_wait_lsn', prorettype => 'bool', proargtypes => 'pg_lsn int4',
+ proargnames => '{trg_lsn,delay}',
+ prosrc => 'pg_wait_lsn' },
+
]
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..fd21e43416
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+#include "access/xlogdefs.h"
+
+
+extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern void DeleteWaitedLSN(void);
+
+#endif /* WAIT_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index edd59dc432..db2926f965 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -140,4 +140,6 @@ extern int date2isoyearday(int year, int mon, int mday);
extern bool TimestampTimestampTzRequiresRewrite(void);
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
#endif /* TIMESTAMP_H */
diff --git a/src/test/recovery/t/034_waitlsn.pl b/src/test/recovery/t/034_waitlsn.pl
new file mode 100644
index 0000000000..dc9e899671
--- /dev/null
+++ b/src/test/recovery/t/034_waitlsn.pl
@@ -0,0 +1,76 @@
+# Checks wait lsn
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Check that timeouts make us wait for the specified time (2s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+ "SELECT pg_wait_lsn('0/FFFFFFFF', $two_seconds)");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "wait lsn waits for enough time");
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+# Use a 1 ms timeout, which is effectively no wait
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+ "SELECT pg_wait_lsn('$lsn1', 1)");
+
+ok($ret == 0, "zero return value when failed to wait lsn on standby");
+ok($err =~ /WARNING: LSN was not reached/,
+ "correct error message when failed to wait lsn on standby");
+ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that wait lsn works fine and reaches target LSN if given no timeout
+# Wait indefinitely (timeout = 0)
+
+# Add data on primary, memorize primary's last LSN
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+ "SELECT pg_wait_lsn('$lsn2', 0)");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that the replica has replayed at least up to the primary's LSN
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns >= 0,
+ "standby replayed at least up to the primary's LSN after waiting");
+
+$node_standby->stop;
+$node_primary->stop;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 22ea42c16b..6413a661cf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2964,6 +2964,7 @@ WaitEventIPC
WaitEventSet
WaitEventTimeout
WaitPMResult
+WaitState
WalCloseMethod
WalCompression
WalLevel