Hello. At Wed, 18 Nov 2020 15:05:00 +0300, a.pervush...@postgrespro.ru wrote in > I've changed the BEGIN WAIT FOR LSN statement to core functions > pg_waitlsn, pg_waitlsn_infinite and pg_waitlsn_no_wait. > Currently the functions work inside repeatable read transactions, but > waitlsn creates a snapshot if called first in a transaction block, > which can possibly lead the transaction to working incorrectly, so the > function gives a warning.
According to the discussion here, implementing this as functions is not optimal. As a PoC, I made it a procedure. However, I'm not sure this is the correct way to implement a native procedure, but it seems to work as expected. > Usage examples > ========== > select pg_waitlsn(‘LSN’, timeout); > select pg_waitlsn_infinite(‘LSN’); > select pg_waitlsn_no_wait(‘LSN’); The first and second usages are covered by a single procedure. The last function is equivalent to pg_last_wal_replay_lsn(). As a result, the following procedure is provided in the attached patch. pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1) Any opinions, mainly compared to implementation as a command? regards. -- Kyotaro Horiguchi NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 470e113b33..4283b98eb4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -42,6 +42,7 @@ #include "catalog/pg_database.h" #include "commands/progress.h" #include "commands/tablespace.h" +#include "commands/wait.h" #include "common/controldata_utils.h" #include "executor/instrument.h" #include "miscadmin.h" @@ -7463,6 +7464,15 @@ StartupXLOG(void) break; } + /* + * If we replayed an LSN that someone was waiting for, + * set latches in shared memory array to notify the waiter. + */ + if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN()) + { + WaitSetLatch(XLogCtl->lastReplayedEndRecPtr); + } + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, LOG, false); } while (record != NULL); diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fa58afd9d7..c19d49e7a4 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1460,6 +1460,10 @@ LANGUAGE internal STRICT IMMUTABLE PARALLEL SAFE AS 'unicode_is_normalized'; +CREATE OR REPLACE PROCEDURE + pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1) + LANGUAGE internal AS 'pg_waitlsn'; + -- -- The default permissions for functions mean that anyone can execute them. 
-- A number of functions shouldn't be executable by just anyone, but rather diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index e8504f0ae4..2c0bd41336 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -60,6 +60,7 @@ OBJS = \ user.o \ vacuum.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index f9bbe97b50..959e96b7e0 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -23,6 +23,7 @@ #include "access/syncscan.h" #include "access/twophase.h" #include "commands/async.h" +#include "commands/wait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, WaitShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -268,6 +270,11 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); + /* + * Init array of events for the wait clause in shared memory + */ + WaitShmemInit(); + #ifdef EXEC_BACKEND /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index c87ffc6549..2b4d73ba2f 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -38,6 +38,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xact.h" +#include "commands/wait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -713,6 +714,9 @@ LockErrorCleanup(void) AbortStrongLockAcquire(); + /* If waitlsn was interrupted, then stop waiting for that LSN */ + DeleteWaitedLSN(); + /* Nothing to do if we weren't waiting for a lock */ if (lockAwaited == NULL) { diff --git a/src/backend/utils/adt/misc.c 
b/src/backend/utils/adt/misc.c index 4096faff9a..90876da120 100644 --- a/src/backend/utils/adt/misc.c +++ b/src/backend/utils/adt/misc.c @@ -373,8 +373,6 @@ pg_sleep(PG_FUNCTION_ARGS) * less than the specified time when WaitLatch is terminated early by a * non-query-canceling signal such as SIGHUP. */ -#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0) - endtime = GetNowFloat() + secs; for (;;) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b5f52d4e4a..918eaedfd5 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11375,4 +11375,8 @@ proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text', prosrc => 'unicode_is_normalized' }, +{ oid => '9313', descr => 'wait for LSN to be replayed', + proname => 'pg_waitlsn', prokind => 'p',prorettype => 'void', proargtypes => 'pg_lsn int4', + proargnames => '{wait_lsn,timeout}', + prosrc => 'pg_waitlsn' } ] diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 63bf71ac61..6c4ecd704d 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -113,4 +113,6 @@ extern int date2isoyearday(int year, int mon, int mday); extern bool TimestampTimestampTzRequiresRewrite(void); +#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0) + #endif /* TIMESTAMP_H */