Rebase done.
Meanwhile I made some more changes.
Changes
=======
1) WAITLSN is now implemented as an extension called "pg_waitlsn"
2) Call the new hook "lsn_updated_hook" in StartupXLOG (xlog.c) right after each WAL record has been replayed.
3) Corresponding functions:
pg_waitlsn('0/693FF800', 10000) - wait 10 seconds
pg_waitlsn_infinite('0/693FF800') - for infinite wait
pg_waitlsn_no_wait('0/693FF800') - check once, without waiting, whether the LSN has been replayed.
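4) Add two GUCs that tune how much the waiting machinery affects StartupXLOG:
count_waitlsn (waiters are woken only after every count_waitlsn-th replayed record, not after each one)
int count_waitlsn = 10;
interval_waitlsn (interval in milliseconds after which waiters are woken even if count_waitlsn has not been reached)
int interval_waitlsn = 100;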
5) Optimize the loop that sets latches.
How to use it
==========
Master:
1) Make "wal_level = replica"
Slave:
2) Add shared_preload_libraries = 'pg_waitlsn'
hot_standby = on (in postgresql.conf)
3) Create extension pg_waitlsn;
4) On the hot standby you can then wait for an LSN (the backend sleeps, much
like pg_sleep); once the LSN has been replayed on the slave, pg_waitlsn returns:
select pg_waitlsn('LSN' [, timeout in ms]);
select pg_waitlsn_infinite('LSN');
select pg_waitlsn_no_wait('LSN');
#Wait until LSN 0/303EC60 is replayed, or 10 seconds have passed.
select pg_waitlsn('0/303EC60', 10000);
#Or the same without a timeout:
select pg_waitlsn('0/303EC60');
select pg_waitlsn_infinite('0/693FF800');
#To check whether the LSN has already been replayed:
select pg_waitlsn_no_wait('0/693FF800');
Note: pg_waitlsn also stops waiting on postmaster death or on an interrupt
(e.g. query cancel), if either happens before the target LSN is replayed or the timeout expires.
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/contrib/pg_waitlsn/Makefile b/contrib/pg_waitlsn/Makefile
new file mode 100644
index 0000000..49a326c
--- /dev/null
+++ b/contrib/pg_waitlsn/Makefile
@@ -0,0 +1,21 @@
+# contrib/pg_waitlsn/Makefile
+
+MODULE_big = pg_waitlsn
+OBJS = pg_waitlsn.o
+EXTENSION = pg_waitlsn
+DATA = pg_waitlsn--1.0.sql
+
+ifdef USE_PGXS
+# Out-of-tree build.  GNU make only recognizes a function call when the
+# function name directly follows "$(", so no spaces inside the parentheses.
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+# In-tree build under contrib/.
+subdir = contrib/pg_waitlsn
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_waitlsn/pg_waitlsn--1.0.sql b/contrib/pg_waitlsn/pg_waitlsn--1.0.sql
new file mode 100644
index 0000000..8b251f3
--- /dev/null
+++ b/contrib/pg_waitlsn/pg_waitlsn--1.0.sql
@@ -0,0 +1,19 @@
+/* contrib/pg_waitlsn/pg_waitlsn--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_waitlsn" to wait target LSN to been replayed, delay for waiting in miliseconds (default infinity) \quit
+
+-- All functions are VOLATILE (the default): they block and their result
+-- depends on replay progress, so the planner must never constant-fold them
+-- or reuse a result from a cached plan.
+
+-- Wait until lsn is replayed or delay ms pass (0 = no timeout).
+-- Returns true if the LSN has been replayed.
+CREATE FUNCTION pg_waitlsn(lsn pg_lsn, delay int default 0)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'pg_waitlsn'
+LANGUAGE C VOLATILE STRICT;
+
+-- Wait without a timeout until lsn is replayed.
+CREATE FUNCTION pg_waitlsn_infinite(lsn pg_lsn)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'pg_waitlsn_infinite'
+LANGUAGE C VOLATILE STRICT;
+
+-- Report whether lsn has already been replayed, without waiting.
+CREATE FUNCTION pg_waitlsn_no_wait(lsn pg_lsn)
+RETURNS bool
+AS 'MODULE_PATHNAME', 'pg_waitlsn_no_wait'
+LANGUAGE C VOLATILE STRICT;
diff --git a/contrib/pg_waitlsn/pg_waitlsn.c b/contrib/pg_waitlsn/pg_waitlsn.c
new file mode 100644
index 0000000..d210678
--- /dev/null
+++ b/contrib/pg_waitlsn/pg_waitlsn.c
@@ -0,0 +1,299 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_waitlsn
+ *
+ * Portions Copyright (c) 2012-2017, PostgresPro Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/pg_waitlsn/pg_waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/autovacuum.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+PG_MODULE_MAGIC;
+
+static bool pg_waitlsn_internal(XLogRecPtr lsn, uint64_t delay);
+
+/* Previously installed hook values, saved for chaining */
+static lsn_updated_hook_type prev_lsn_updated_hook = NULL;
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+static void wl_lsn_updated_hook(void);
+static uint32 estimate_shmem_size(void);
+
+/* Latch own/disown helpers and the transaction callback that cleans up */
+static void disown_latches_on_abort(XactEvent event, void *arg);
+static void wl_own_latch(void);
+static void wl_disown_latch(void);
+
+/*
+ * GUC variables (registered in _PG_init).  They are read by
+ * wl_lsn_updated_hook(), i.e. in the process running StartupXLOG().
+ * File-local so they cannot clash with symbols of the server or other modules.
+ */
+static int	count_waitlsn = 10;		/* wake waiters every Nth replayed record */
+static int	interval_waitlsn = 100;	/* ... or once this many ms have passed */
+
+/* Process-local bookkeeping of wl_lsn_updated_hook() */
+static TimestampTz time_waitlsn = 0;	/* time of the last wake-up round */
+static int	counter_waitlsn = 0;	/* counts calls of the hook */
+
+void		_PG_init(void);
+
+/* Shared memory structures */
+
+/* One wait slot, indexed by the waiting backend's MyBackendId */
+typedef struct
+{
+	int			pid;			/* PID of the waiting backend, 0 if none */
+	volatile slock_t slock;		/* per-slot spinlock guarding pid */
+	Latch		latch;			/* shared latch the waiter sleeps on */
+} BIDLatch;
+
+typedef struct
+{
+	char		dummy;			/* unused */
+	int			backend_maxid;	/* the hook scans l_arr[0..backend_maxid] */
+	BIDLatch	l_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+/* Set by qs_shmem_startup(); NULL if the library was not preloaded */
+static volatile GlobState *state;
+
+/* Does this backend currently own the latch of its slot? */
+static bool is_latch_owned = false;
+
+/*
+ * Number of latch slots kept in shared memory.
+ *
+ * Slots are indexed by MyBackendId, whose valid values are 1..MaxBackends,
+ * so MaxBackends + 1 entries are needed.  MaxBackends itself is not yet
+ * computed when _PG_init() requests shared memory, so it is rebuilt here from
+ * the PGC_POSTMASTER (hence constant) GUCs it is derived from.
+ * NOTE(review): mirrors InitializeMaxBackends() of PostgreSQL 10; newer
+ * releases add more terms.  wl_own_latch() rejects out-of-range ids, so a
+ * drift causes an ERROR instead of memory corruption.
+ *
+ * Slot 0 never belongs to a backend; its spinlock protects backend_maxid.
+ */
+static int
+wl_slot_count(void)
+{
+	return MaxConnections + autovacuum_max_workers + 1 + max_worker_processes + 1;
+}
+
+/* Bytes of shared memory needed for GlobState plus all latch slots. */
+static uint32
+estimate_shmem_size(void)
+{
+	Size		size;
+
+	size = add_size(offsetof(GlobState, l_arr),
+					mul_size(sizeof(BIDLatch), wl_slot_count()));
+	return (uint32) size;
+}
+
+/*
+ * Take ownership of this backend's latch slot and publish our PID, so that
+ * wl_lsn_updated_hook() will set the latch.
+ */
+static void
+wl_own_latch(void)
+{
+	volatile BIDLatch *slot;
+
+	if (MyBackendId < 1 || MyBackendId >= wl_slot_count())
+		elog(ERROR, "pg_waitlsn: backend id %d is out of range", MyBackendId);
+
+	slot = &state->l_arr[MyBackendId];
+
+	/*
+	 * OwnLatch() can elog(ERROR), so it must never run while a spinlock is
+	 * held.  If an earlier wait ended without disowning (e.g. an error trapped
+	 * by a subtransaction), we still own the latch and must not re-own it.
+	 */
+	if (!is_latch_owned)
+	{
+		OwnLatch(&slot->latch);
+		is_latch_owned = true;
+	}
+
+	SpinLockAcquire(&slot->slock);
+	slot->pid = MyProcPid;
+	SpinLockRelease(&slot->slock);
+
+	/* backend_maxid is shared by all slots, so update it under one lock */
+	SpinLockAcquire(&state->l_arr[0].slock);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+	SpinLockRelease(&state->l_arr[0].slock);
+}
+
+/*
+ * Withdraw from the waiters list and give the latch back; shrink
+ * backend_maxid if we were the highest registered slot.
+ */
+static void
+wl_disown_latch(void)
+{
+	volatile BIDLatch *slot = &state->l_arr[MyBackendId];
+	int			i;
+
+	/* Stop the hook from signalling us before giving the latch up */
+	SpinLockAcquire(&slot->slock);
+	slot->pid = 0;
+	SpinLockRelease(&slot->slock);
+
+	DisownLatch(&slot->latch);
+	is_latch_owned = false;
+
+	SpinLockAcquire(&state->l_arr[0].slock);
+	if (state->backend_maxid == MyBackendId)
+	{
+		int			newmax = 0;
+
+		/* Stay inside the array: the last valid index is count - 1 */
+		for (i = wl_slot_count() - 1; i > 0; i--)
+		{
+			if (state->l_arr[i].pid != 0)
+			{
+				newmax = i;
+				break;
+			}
+		}
+		state->backend_maxid = newmax;
+	}
+	SpinLockRelease(&state->l_arr[0].slock);
+}
+
+/*
+ * Transaction callback: release the latch slot if a wait was cut short.
+ *
+ * pg_waitlsn_internal() disowns the latch before returning normally, so a
+ * latch still owned at the end of a top-level transaction was left behind by
+ * an error.  Release it on every transaction end, not only on abort: an error
+ * trapped by a subtransaction does not fire this callback, and the outer
+ * transaction may then commit.
+ */
+static void
+disown_latches_on_abort(XactEvent event, void *arg)
+{
+	if (is_latch_owned)
+		wl_disown_latch();
+}
+
+/*
+ * shmem_startup_hook: allocate the shared state and, on first creation,
+ * initialize every slot's spinlock and shared latch.
+ */
+static void
+qs_shmem_startup(void)
+{
+	bool		found;
+	int			nslots = wl_slot_count();
+	int			i;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+										  estimate_shmem_size(),
+										  &found);
+	if (!found)
+	{
+		state->backend_maxid = 0;
+		for (i = 0; i < nslots; i++)
+		{
+			state->l_arr[i].pid = 0;
+			SpinLockInit(&state->l_arr[i].slock);
+			InitSharedLatch(&state->l_arr[i].latch);
+		}
+	}
+	LWLockRelease(AddinShmemInitLock);
+
+	if (prev_shmem_startup_hook)
+		prev_shmem_startup_hook();
+}
+
+/*
+ * Module load callback.
+ *
+ * Does nothing unless the library is loaded through shared_preload_libraries.
+ * In that case it reserves shared memory for the latch array, defines the
+ * GUCs, installs the lsn_updated and shmem_startup hooks (saving the previous
+ * values) and registers the transaction callback.
+ */
+void
+_PG_init(void)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	time_waitlsn = GetCurrentTimestamp();
+
+	/*
+	 * Request room for the whole latch array: sizeof(GlobState) covers only
+	 * the header because l_arr is a flexible array member.
+	 */
+	RequestAddinShmemSpace(estimate_shmem_size());
+
+	/*
+	 * NOTE(review): these GUC names have no "pg_waitlsn." prefix.  If the
+	 * server rejects unknown dot-less names while reading postgresql.conf
+	 * (before this library is loaded) -- confirm -- they cannot be set in the
+	 * configuration the startup process reads, and that process is their only
+	 * consumer.  Renaming them changes the user-visible interface, so it is
+	 * left as a follow-up.
+	 */
+
+	/* Define interval_waitlsn */
+	DefineCustomIntVariable(
+		"interval_waitlsn",
+		"Set interval of time (ms) how often LSN will be checked.",
+		"Set interval of time (ms) how often LSN will be checked to "
+		"make less influence on StartupXLOG() process.",
+		&interval_waitlsn,
+		100, 0, INT_MAX,
+		PGC_SUSET,
+		GUC_UNIT_MS,
+		NULL, NULL, NULL);
+
+	/* Define count_waitlsn; the minimum of 1 keeps the hook's modulo safe */
+	DefineCustomIntVariable(
+		"count_waitlsn",
+		"How often LSN will be checked.",
+		"Set count of LSNs that will be passed befor LSN check to "
+		"make less influence on StartupXLOG() process.",
+		&count_waitlsn,
+		10, 1, INT_MAX,
+		PGC_SUSET,
+		GUC_NOT_IN_SAMPLE,
+		NULL, NULL, NULL);
+
+	prev_lsn_updated_hook = lsn_updated_hook;
+	lsn_updated_hook = wl_lsn_updated_hook;
+
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = qs_shmem_startup;
+
+	/*
+	 * Register unconditionally.  Without EXEC_BACKEND, children inherit the
+	 * postmaster's registration.  With EXEC_BACKEND (e.g. Windows), each
+	 * backend runs _PG_init() again with IsUnderPostmaster set, and the old
+	 * "!IsUnderPostmaster" test meant the callback was never registered there.
+	 */
+	RegisterXactCallback(disown_latches_on_abort, NULL);
+}
+
+/*
+ * lsn_updated_hook implementation, called from StartupXLOG() after each
+ * replayed WAL record.
+ *
+ * To limit the overhead on replay, waiters are woken only on every
+ * count_waitlsn-th call, or when more than interval_waitlsn ms have passed
+ * since the last wake-up round.  The previously installed hook, if any, is
+ * always chained.
+ */
+static void
+wl_lsn_updated_hook(void)
+{
+	TimestampTz now = GetCurrentTimestamp();
+
+	if (counter_waitlsn % count_waitlsn == 0 ||
+		TimestampDifferenceExceeds(time_waitlsn, now, interval_waitlsn))
+	{
+		int			maxid = state->backend_maxid;
+		int			i;
+
+		for (i = 0; i <= maxid; i++)
+		{
+			bool		waiting;
+
+			/* Only sample the slot under its spinlock; signal after release */
+			SpinLockAcquire(&state->l_arr[i].slock);
+			waiting = (state->l_arr[i].pid != 0);
+			SpinLockRelease(&state->l_arr[i].slock);
+
+			if (waiting)
+				SetLatch(&state->l_arr[i].latch);
+		}
+		elog(DEBUG2, "WAITLSN - %d / %s", counter_waitlsn, timestamptz_to_str(now));
+		time_waitlsn = now;
+	}
+
+	/*
+	 * Keep the counter within [0, count_waitlsn) so it can never overflow on
+	 * a long-running standby (signed overflow is undefined behavior).
+	 */
+	counter_waitlsn = (counter_waitlsn + 1) % count_waitlsn;
+
+	if (prev_lsn_updated_hook)
+		prev_lsn_updated_hook();
+}
+
+PG_FUNCTION_INFO_V1(pg_waitlsn);
+PG_FUNCTION_INFO_V1(pg_waitlsn_infinite);
+PG_FUNCTION_INFO_V1(pg_waitlsn_no_wait);
+
+/*
+ * pg_waitlsn(lsn pg_lsn, delay int default 0) RETURNS bool
+ *
+ * Wait until lsn has been replayed or delay milliseconds have passed
+ * (0 = no timeout).  Returns true if the LSN has been replayed.
+ */
+Datum
+pg_waitlsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	trg_lsn = PG_GETARG_LSN(0);
+	int32		delay = PG_GETARG_INT32(1);
+
+	/*
+	 * A negative value would be converted to a huge unsigned delay and then
+	 * to a negative millisecond timeout further down.
+	 */
+	if (delay < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("delay must not be negative")));
+
+	PG_RETURN_BOOL(pg_waitlsn_internal(trg_lsn, (uint64_t) delay));
+}
+
+/*
+ * pg_waitlsn_infinite(lsn pg_lsn) RETURNS bool
+ *
+ * Wait without a timeout until lsn has been replayed.
+ */
+Datum
+pg_waitlsn_infinite(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	trg_lsn = PG_GETARG_LSN(0);
+
+	PG_RETURN_BOOL(pg_waitlsn_internal(trg_lsn, 0));
+}
+
+/*
+ * pg_waitlsn_no_wait(lsn pg_lsn) RETURNS bool
+ *
+ * Report whether lsn has already been replayed.  A single comparison is
+ * enough, so no latch slot is taken and nothing sleeps.
+ */
+Datum
+pg_waitlsn_no_wait(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	trg_lsn = PG_GETARG_LSN(0);
+
+	PG_RETURN_BOOL(trg_lsn <= GetXLogReplayRecPtr(NULL));
+}
+
+/*
+ * Wait until trg_lsn has been replayed.
+ *
+ * delay is the timeout in milliseconds; 0 means no timeout.  Besides being
+ * woken by wl_lsn_updated_hook(), the loop re-checks the replay position at
+ * least every interval_waitlsn ms.  The hook is throttled by count_waitlsn and
+ * only runs when a record is replayed, so without this bound a waiter could
+ * sleep through the last records before replay goes idle.
+ *
+ * Returns true if trg_lsn has been replayed when the wait ends, and false on
+ * timeout or postmaster death.  If an error (e.g. query cancel) escapes the
+ * wait, the latch slot is released before the error is re-thrown.
+ */
+static bool
+pg_waitlsn_internal(XLogRecPtr trg_lsn, uint64_t delay)
+{
+	TimestampTz deadline = 0;
+
+	/* state is only set up when loaded via shared_preload_libraries */
+	if (state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_waitlsn must be loaded via shared_preload_libraries")));
+
+	if (delay > 0)
+		deadline = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), (int64) delay);
+
+	wl_own_latch();
+	PG_TRY();
+	{
+		for (;;)
+		{
+			long		timeout = Max(interval_waitlsn, 1);
+
+			ResetLatch(&state->l_arr[MyBackendId].latch);
+
+			/* If the LSN has been replayed */
+			if (trg_lsn <= GetXLogReplayRecPtr(NULL))
+				break;
+
+			/* If the postmaster dies, finish immediately */
+			if (!PostmasterIsAlive())
+				break;
+
+			/* If the delay is over; otherwise sleep no longer than what's left */
+			if (delay > 0)
+			{
+				TimestampTz now = GetCurrentTimestamp();
+				long		remaining;
+
+				if (now >= deadline)
+					break;
+				/* round up so we never wait 0 ms and busy-loop */
+				remaining = (long) ((deadline - now + 999) / 1000);
+				timeout = Min(timeout, remaining);
+			}
+
+			elog(DEBUG2, "WAITLSN %x", MyPgXact->xmin);
+
+			/*
+			 * NOTE(review): clearing xmin while the calling query still holds
+			 * its snapshot keeps this wait from holding back cleanup, but also
+			 * exempts the backend from recovery snapshot conflicts; data read
+			 * later by the same query may then be unsafe.  Confirm intent.
+			 */
+			MyPgXact->xmin = InvalidTransactionId;
+			WaitLatch(&state->l_arr[MyBackendId].latch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					  timeout, PG_WAIT_EXTENSION);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+	PG_CATCH();
+	{
+		wl_disown_latch();
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	wl_disown_latch();
+
+	return trg_lsn <= GetXLogReplayRecPtr(NULL);
+}
diff --git a/contrib/pg_waitlsn/pg_waitlsn.control b/contrib/pg_waitlsn/pg_waitlsn.control
new file mode 100644
index 0000000..7be85d6
--- /dev/null
+++ b/contrib/pg_waitlsn/pg_waitlsn.control
@@ -0,0 +1,5 @@
+# pg_waitlsn extension
+# The library must also be listed in shared_preload_libraries: _PG_init()
+# sets up its hooks, GUCs and shared state only when preloaded.
+comment = 'target LSN waiter for slave replica'
+default_version = '1.0'
+module_pathname = '$libdir/pg_waitlsn'
+relocatable = true
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2dcff7f..c6018e5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -832,6 +832,9 @@ static bool holdingAllLocks = false;
static MemoryContext walDebugCxt = NULL;
#endif
+/*
+ * Hook called in StartupXLOG()'s redo loop after each WAL record has been
+ * replayed, just before the next record is read.  NULL unless an extension
+ * installs one.
+ */
+lsn_updated_hook_type lsn_updated_hook = NULL;
+
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
static bool recoveryStopsBefore(XLogReaderState *record);
@@ -7174,6 +7177,12 @@ StartupXLOG(void)
break;
}
+ /*
+ * Hook after update lastReplayedEndRecPtr
+ */
+ if (lsn_updated_hook != NULL)
+ lsn_updated_hook();
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9f036c7..175023c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -287,6 +287,12 @@ extern void assign_max_wal_size(int newval, void *extra);
extern void assign_checkpoint_completion_target(double newval, void *extra);
/*
+ * Hook called from StartupXLOG() after each WAL record has been replayed,
+ * just before the next record is read.  It takes no arguments; the replay
+ * position can be read with GetXLogReplayRecPtr().  An extension installing
+ * it should save the previous value and call it from its own hook.
+ */
+typedef void (*lsn_updated_hook_type) (void);
+extern PGDLLIMPORT lsn_updated_hook_type lsn_updated_hook;
+
+/*
* Starting/stopping a base backup
*/
extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers