All rebased and tested
-- 
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..c7460bd9b8 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1752,6 +1753,15 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for, set latches
+			 * in shared memory array to notify the waiter.
+			 */
+			if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+			{
+				WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+			}
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 42cced9ebe..ec6ab7722a 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..2f9be4f9ad
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,326 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements wait lsn, which allows waiting for events such as
+ *	  LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2023, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port/atomics.h"
+#include "storage/backendid.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/*
+ * Shared memory structure.
+ *
+ * lsn[] is indexed by BackendId (FirstBackendId .. MaxBackends; slot 0 is
+ * unused) and holds InvalidXLogRecPtr for backends that are not waiting.
+ * All fields are protected by "mutex", except that min_lsn is additionally
+ * read without the lock by the startup process, so it is accessed through
+ * the pg_atomic API everywhere.
+ */
+typedef struct
+{
+	int			backend_maxid;	/* highest BackendId that may be waiting */
+	pg_atomic_uint64 min_lsn;	/* XLogRecPtr of minimal waited for LSN */
+	slock_t		mutex;
+	/* LSNs that different backends are waiting */
+	XLogRecPtr	lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+	Assert(MyBackendId >= FirstBackendId && MyBackendId <= MaxBackends);
+
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+
+	state->lsn[MyBackendId] = lsn_to_wait;
+
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ *
+ * This is also called from LockErrorCleanup() on every transaction abort,
+ * so it must be cheap, and a no-op when this backend is not waiting or has
+ * no usable BackendId.
+ */
+void
+DeleteWaitedLSN(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+
+	if (state == NULL ||
+		MyBackendId < FirstBackendId || MyBackendId > MaxBackends)
+		return;
+
+	/*
+	 * Only this backend writes its own slot, so it can be tested without the
+	 * lock; this keeps the common "not waiting" abort path lock-free.
+	 */
+	if (state->lsn[MyBackendId] == InvalidXLogRecPtr)
+		return;
+
+	SpinLockAcquire(&state->mutex);
+
+	lsn_to_delete = state->lsn[MyBackendId];
+	state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we are deleting the minimal LSN, then choose the next min_lsn */
+	if (lsn_to_delete == pg_atomic_read_u64(&state->min_lsn))
+	{
+		XLogRecPtr	new_min = PG_UINT64_MAX;
+
+		for (i = FirstBackendId; i <= state->backend_maxid; i++)
+			if (state->lsn[i] != InvalidXLogRecPtr && state->lsn[i] < new_min)
+				new_min = state->lsn[i];
+		pg_atomic_write_u64(&state->min_lsn, new_min);
+	}
+
+	/* If deleting from the end of the array, shorten the array's used part */
+	if (state->backend_maxid == MyBackendId)
+	{
+		int			new_maxid = 0;
+
+		for (i = MyBackendId - 1; i >= FirstBackendId; i--)
+		{
+			if (state->lsn[i] != InvalidXLogRecPtr)
+			{
+				new_maxid = i;
+				break;
+			}
+		}
+		state->backend_maxid = new_maxid;
+	}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, lsn);
+	size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (int i = 0; i < (MaxBackends + 1); i++)
+			state->lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ *
+ * Called by the startup process.  The spinlock is held only to read single
+ * fields: BackendIdGetProc() takes an LWLock and SetLatch() may issue a
+ * system call, neither of which is allowed while holding a spinlock.  A
+ * slot that is reused between the check and SetLatch() merely gets a
+ * spurious wakeup, which latch users must tolerate anyway.
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			backend_maxid;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (int i = FirstBackendId; i <= backend_maxid; i++)
+	{
+		XLogRecPtr	waited_lsn;
+		PGPROC	   *backend;
+
+		SpinLockAcquire(&state->mutex);
+		waited_lsn = state->lsn[i];
+		SpinLockRelease(&state->mutex);
+
+		if (waited_lsn == InvalidXLogRecPtr || waited_lsn > cur_lsn)
+			continue;
+
+		backend = BackendIdGetProc(i);
+		if (backend != NULL)
+			SetLatch(&backend->procLatch);
+	}
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+	return pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed, postmaster dies or
+ * timeout happens.  Timeout is specified in milliseconds; zero or a
+ * negative value means wait forever.
+ * Returns true if LSN was reached and false otherwise.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+	XLogRecPtr	cur_lsn;
+	float8		endtime;
+	bool		wait_forever = (timeout_ms <= 0);
+
+	/*
+	 * In transactions that have isolation level REPEATABLE READ or higher,
+	 * the snapshot may already have been taken (e.g. by the very statement
+	 * calling us), so changes replayed while we wait would stay invisible.
+	 */
+	if (IsTransactionBlock() && IsolationUsesXactSnapshot())
+		ereport(WARNING,
+				errmsg("pg_wait_lsn() may not work correctly at this isolation level"),
+				errhint("Call pg_wait_lsn() before starting the transaction."));
+
+	/* Check if we already reached the needed LSN */
+	cur_lsn = GetXLogReplayRecPtr(NULL);
+	if (cur_lsn >= target_lsn)
+		return true;
+
+	endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+	AddWaitedLSN(target_lsn);
+
+	for (;;)
+	{
+		long		sleep_ms = 100;
+
+		/*
+		 * Re-check before every sleep, including the first: the target may
+		 * have been replayed between the check above and AddWaitedLSN(), in
+		 * which case the startup process will not set our latch for it.
+		 */
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+		if (cur_lsn >= target_lsn)
+			break;
+
+		/*
+		 * Check the deadline on every iteration rather than only on
+		 * WL_TIMEOUT, so unrelated latch wakeups cannot extend the wait.
+		 * Sleep at most 100 ms at a time so interrupts are checked promptly.
+		 */
+		if (!wait_forever)
+		{
+			float8		time_left = endtime - GetNowFloat();
+
+			if (time_left <= 0.0)
+				break;
+			if (time_left < 0.1)
+				sleep_ms = (long) ceil(time_left * 1000.0);
+		}
+
+		/* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/* XXX: a dedicated wait event would describe this wait better */
+		(void) WaitLatch(MyLatch,
+						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						 sleep_ms, WAIT_EVENT_CLIENT_READ);
+		ResetLatch(MyLatch);
+	}
+
+	DeleteWaitedLSN();
+
+	if (cur_lsn < target_lsn)
+	{
+		ereport(WARNING,
+				errmsg("LSN was not reached"),
+				errhint("Try to increase wait time."));
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * SQL-callable pg_wait_lsn(trg_lsn pg_lsn, delay int8) returns bool.
+ *
+ * delay is declared int8 in pg_proc.dat, so it must be fetched with
+ * PG_GETARG_INT64 (int8 is pass-by-reference on some platforms).  Values
+ * above PG_INT32_MAX are clamped; zero or negative means wait forever.
+ */
+Datum
+pg_wait_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	trg_lsn = PG_GETARG_LSN(0);
+	int64		delay = PG_GETARG_INT64(1);
+
+	if (delay > PG_INT32_MAX)
+		delay = PG_INT32_MAX;
+	else if (delay < 0)
+		delay = 0;
+
+	PG_RETURN_BOOL(WaitUtility(trg_lsn, (int) delay));
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..760d760356 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, SyncScanShmemSize());
 	size = add_size(size, AsyncShmemSize());
 	size = add_size(size, StatsShmemSize());
+	size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -295,6 +297,11 @@ CreateSharedMemoryAndSemaphores(void)
 	AsyncShmemInit();
 	StatsShmemInit();
 
+	/*
+	 * Init array of events for the wait clause in shared memory
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index dac921219f..92e354c848 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -704,6 +705,9 @@ LockErrorCleanup(void)
 
 	AbortStrongLockAcquire();
 
+	/* If wait lsn was interrupted, then stop waiting for that LSN */
+	DeleteWaitedLSN();
+
 	/* Nothing to do if we weren't waiting for a lock */
 	if (lockAwaited == NULL)
 	{
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 5d78d6dc06..fbc96db198 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -384,8 +384,6 @@ pg_sleep(PG_FUNCTION_ARGS)
 	 * less than the specified time when WaitLatch is terminated early by a
 	 * non-query-canceling signal such as SIGHUP.
 	 */
-#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
-
 	endtime = GetNowFloat() + secs;
 
 	for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..ebff5cadcd 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12035,6 +12035,11 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+{ oid => '8283', descr => 'wait for LSN until timeout',
+  proname => 'pg_wait_lsn', provolatile => 'v', prorettype => 'bool',
+  proargtypes => 'pg_lsn int8', proargnames => '{trg_lsn,delay}',
+  prosrc => 'pg_wait_lsn' },
+
 { oid => '6291', descr => 'arbitrary value from among input values',
   proname => 'any_value', prokind => 'a', proisstrict => 'f',
   prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..bd52664de9
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2023, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+#include "access/xlogdefs.h"
+
+extern bool WaitUtility(XLogRecPtr target_lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern void DeleteWaitedLSN(void);
+
+#endif							/* WAIT_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index c4dd96c8c9..6d763d1175 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -144,4 +144,6 @@ extern int	date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
 
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
 #endif							/* TIMESTAMP_H */
diff --git a/src/test/recovery/t/034_waitlsn.pl b/src/test/recovery/t/034_waitlsn.pl
new file mode 100644
index 0000000000..dc9e899671
--- /dev/null
+++ b/src/test/recovery/t/034_waitlsn.pl
@@ -0,0 +1,76 @@
+# Checks wait lsn
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Check that timeouts make us wait for the specified time (2s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000;    # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+	"SELECT pg_wait_lsn('0/FFFFFFFF', $two_seconds)");
+my $time_waited = (time() - $start_time) * 1000;    # convert to milliseconds
+ok($time_waited >= $two_seconds, "wait lsn waits for enough time");
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+# Wait for no wait
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+	"SELECT pg_wait_lsn('$lsn1', 1)");
+
+ok($ret == 0, "zero return value when failed to wait lsn on standby");
+ok($err =~ /WARNING:  LSN was not reached/,
+	"correct error message when failed to wait lsn on standby");
+ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that wait lsn works fine and reaches target LSN if given no timeout
+# Wait for infinite
+
+# Add data on primary, memorize primary's last LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+	"SELECT pg_wait_lsn('$lsn2', 0)");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that primary's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+	"standby reached the same LSN as primary before starting transaction");
+
+$node_standby->stop;
+$node_primary->stop;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 260854747b..17e1d17660 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2992,6 +2992,7 @@ WaitEventIPC
 WaitEventSet
 WaitEventTimeout
 WaitPMResult
+WaitState
 WalCloseMethod
 WalCompression
 WalLevel