On Sat, Mar 23, 2024 at 11:27 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> How about adding the test in 019_replslot_limit? It is not a direct
> fit but I feel later we can even add 'invalid_timeout' related tests
> in this file which will use last_inactive_time feature.

I'm thinking the other way. Now, the new TAP file 043_replslot_misc.pl
can have last_inactive_time tests, and later invalid_timeout ones too.
This way 019_replslot_limit.pl is not cluttered.

> It is also
> possible that some of the tests added by the 'invalid_timeout' feature
> will obviate the need for some of these tests.

Might be. But I prefer to keep both sets of tests separate but in the
same file, 043_replslot_misc.pl, because we cover some corner cases,
like last_inactive_time being set upon loading the slot from disk.

> Review of v15
> ==============
> 1.
> @@ -1026,7 +1026,8 @@ CREATE VIEW pg_replication_slots AS
>              L.conflicting,
>              L.invalidation_reason,
>              L.failover,
> -            L.synced
> +            L.synced,
> +            L.last_inactive_time
>      FROM pg_get_replication_slots() AS L
>
> As mentioned previously, let's keep these new fields before
> conflicting and after two_phase.

Sorry, I missed that comment (out of a flood of comments really :)).
Now, done that way.

> 2.
> +# Get last_inactive_time value after slot's creation. Note that the
> slot is still
> +# inactive unless it's used by the standby below.
> +my $last_inactive_time_1 = $primary->safe_psql('postgres',
> + qq(SELECT last_inactive_time FROM pg_replication_slots WHERE
> slot_name = '$sb_slot' AND last_inactive_time IS NOT NULL;)
> +);
>
> We should check $last_inactive_time_1 to be a valid value and add a
> similar check for logical slots.

That's taken care of by the type cast we do, right? Isn't that enough?

is( $primary->safe_psql(
        'postgres',
        qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb_slot' AND last_inactive_time IS NOT NULL;]
    ),
    't',
    'last inactive time for an inactive physical slot is updated correctly');

For instance, setting last_inactive_time_1 to an invalid value fails
with the following error:

error running SQL: 'psql:<stdin>:1: ERROR: invalid input syntax for type timestamp with time zone: "foo"
LINE 1: SELECT last_inactive_time > 'foo'::timestamptz FROM pg_repli...

> 3. BTW, why don't we set last_inactive_time for temporary slots
> (RS_TEMPORARY) as well? Don't we even invalidate temporary slots? If
> so, then I think we should set last_inactive_time for those as well
> and later allow them to be invalidated based on timeout parameter.

WFM. Done that way.

Please see the attached v16 patch.

-- 
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

From ce85a48bbbd9de5d6ca0ce849993707cc01d1211 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 23 Mar 2024 07:27:48 +0000
Subject: [PATCH v16] Track last_inactive_time in pg_replication_slots.

Till now, the time at which the replication slot became inactive
is not tracked directly in pg_replication_slots. This commit adds
a new column 'last_inactive_time' for this. It is set to 0 whenever
a slot is made active/acquired and set to current timestamp
whenever the slot is inactive/released or restored from the disk.

The new column will be useful on production servers to debug and
analyze inactive replication slots. It will also help to know the
lifetime of a replication slot - one can know how long a streaming
standby, logical subscriber, or replication slot consumer is down.

The new column will be useful to implement inactive timeout based
replication slot invalidation in a future commit.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/system-views.sgml           |  11 ++
 src/backend/catalog/system_views.sql     |   1 +
 src/backend/replication/slot.c           |  27 +++++
 src/backend/replication/slotfuncs.c      |   7 +-
 src/include/catalog/pg_proc.dat          |   6 +-
 src/include/replication/slot.h           |   3 +
 src/test/recovery/meson.build            |   1 +
 src/test/recovery/t/043_replslot_misc.pl | 127 +++++++++++++++++++++++
 src/test/regress/expected/rules.out      |   3 +-
 9 files changed, 181 insertions(+), 5 deletions(-)
 create mode 100644 src/test/recovery/t/043_replslot_misc.pl

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index b5da476c20..2b36b5fef1 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2523,6 +2523,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_inactive_time</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot became inactive.
+        <literal>NULL</literal> if the slot is currently actively being
+        used.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>conflicting</structfield> <type>bool</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f69b7f5580..bc70ff193e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
+            L.last_inactive_time,
             L.conflicting,
             L.invalidation_reason,
             L.failover,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cdf0c450c5..0f48d6dc7c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->last_inactive_time = 0;
 
 	/*
 	 * Create the slot on disk. We haven't actually marked the slot allocated
@@ -622,6 +623,11 @@ retry:
 	if (SlotIsLogical(s))
 		pgstat_acquire_replslot(s);
 
+	/* The slot is active by now, so reset the last inactive time. */
+	SpinLockAcquire(&s->mutex);
+	s->last_inactive_time = 0;
+	SpinLockRelease(&s->mutex);
+
 	if (am_walsender)
 	{
 		ereport(log_replication_commands ? LOG : DEBUG1,
@@ -645,6 +651,7 @@ ReplicationSlotRelease(void)
 	ReplicationSlot *slot = MyReplicationSlot;
 	char *slotname = NULL; /* keep compiler quiet */
 	bool is_logical = false; /* keep compiler quiet */
+	TimestampTz now;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -679,6 +686,12 @@ ReplicationSlotRelease(void)
 		ReplicationSlotsComputeRequiredXmin(false);
 	}
 
+	/*
+	 * Set the last inactive time after marking slot inactive. We get current
+	 * time beforehand to avoid system call while holding the lock.
+	 */
+	now = GetCurrentTimestamp();
+
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
 		/*
@@ -687,9 +700,16 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		slot->last_inactive_time = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
+	else
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->last_inactive_time = now;
+		SpinLockRelease(&slot->mutex);
+	}
 
 	MyReplicationSlot = NULL;
 
@@ -2342,6 +2362,13 @@ RestoreSlotFromDisk(const char *name)
 		slot->in_use = true;
 		slot->active_pid = 0;
 
+		/*
+		 * We set last inactive time after loading the slot from the disk into
+		 * memory. Whoever acquires the slot i.e. makes the slot active will
+		 * anyway reset it.
+		 */
+		slot->last_inactive_time = GetCurrentTimestamp();
+
 		restored = true;
 		break;
 	}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4232c1e52e..24f5e6d90a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 19
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr currlsn;
 	int slotno;
@@ -410,6 +410,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		if (slot_contents.last_inactive_time > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
+		else
+			nulls[i++] = true;
+
 		cause = slot_contents.data.invalidated;
 
 		if (SlotIsPhysical(&slot_contents))
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 71c74350a0..0d26e5b422 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11133,9 +11133,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7f25a083ee..2f18433ecc 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -201,6 +201,9 @@ typedef struct ReplicationSlot
 	 * forcibly flushed or not.
 	 */
 	XLogRecPtr last_saved_confirmed_flush;
+
+	/* The time at which this slot become inactive */
+	TimestampTz last_inactive_time;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..c8259f99d5 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_replslot_misc.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_replslot_misc.pl b/src/test/recovery/t/043_replslot_misc.pl
new file mode 100644
index 0000000000..86e58691bf
--- /dev/null
+++ b/src/test/recovery/t/043_replslot_misc.pl
@@ -0,0 +1,127 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Replication slot related miscellaneous tests
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# =============================================================================
+# Testcase start: Check last_inactive_time property of streaming standby's slot
+#
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $sb_slot = 'sb_slot';
+$standby->append_conf('postgresql.conf', "primary_slot_name = '$sb_slot'");
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := '$sb_slot');
+]);
+
+# Get last_inactive_time value after slot's creation. Note that the slot is still
+# inactive unless it's used by the standby below.
+my $last_inactive_time = $primary->safe_psql('postgres',
+	qq(SELECT last_inactive_time FROM pg_replication_slots WHERE slot_name = '$sb_slot' AND last_inactive_time IS NOT NULL;)
+);
+
+$standby->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby);
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $primary->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb_slot';]
+	),
+	't',
+	'last inactive time for an active physical slot is NULL');
+
+# Stop the standby to check its last_inactive_time value is updated
+$standby->stop;
+
+# Let's also restart the primary so that the last_inactive_time is set upon
+# loading the slot from disk.
+$primary->restart;
+
+is( $primary->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive physical slot is updated correctly');
+
+# Testcase end: Check last_inactive_time property of streaming standby's slot
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Check last_inactive_time property of logical subscriber's slot
+my $publisher = $primary;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+
+my $lsub_slot = 'lsub_slot';
+$publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot(slot_name := '$lsub_slot', plugin := 'pgoutput');"
+);
+
+# Get last_inactive_time value after slot's creation. Note that the slot is still
+# inactive unless it's used by the subscriber below.
+$last_inactive_time = $primary->safe_psql('postgres',
+	qq(SELECT last_inactive_time FROM pg_replication_slots WHERE slot_name = '$lsub_slot' AND last_inactive_time IS NOT NULL;)
+);
+
+$subscriber->start;
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = '$lsub_slot', create_slot = false)"
+);
+
+# Wait until subscriber has caught up
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $publisher->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub_slot';]
+	),
+	't',
+	'last inactive time for an active logical slot is NULL');
+
+# Stop the subscriber to check its last_inactive_time value is updated
+$subscriber->stop;
+
+# Let's also restart the publisher so that the last_inactive_time is set upon
+# loading the slot from disk.
+$publisher->restart;
+
+is( $publisher->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive logical slot is updated correctly');
+
+# Testcase end: Check last_inactive_time property of logical subscriber's slot
+# =============================================================================
+
+done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 18829ea586..dfcbaec387 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,11 +1473,12 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
+    l.last_inactive_time,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1
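
For readers skimming the patch, here is a rough standalone sketch of the
last_inactive_time state transitions it describes: reset to 0 on acquire,
set to the current time on release or on restore from disk, and reported
as NULL while 0. This is not the patch's code. SketchSlot, SketchTimestamp
and the sketch_* functions are hypothetical names made up for illustration,
timestamps are passed in as plain integers instead of calling
GetCurrentTimestamp(), and locking and slot persistency are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz; 0 is reserved for "slot is active". */
typedef int64_t SketchTimestamp;

/* Hypothetical, stripped-down slot; the real ReplicationSlot has many more fields. */
typedef struct SketchSlot
{
	int			active_pid;
	SketchTimestamp last_inactive_time;
} SketchSlot;

/* Creation: nothing recorded yet, mirroring "last_inactive_time = 0". */
static void
sketch_slot_create(SketchSlot *slot)
{
	slot->active_pid = 0;
	slot->last_inactive_time = 0;
}

/* Acquire: the slot is active now, so the inactive time is reset. */
static void
sketch_slot_acquire(SketchSlot *slot, int pid)
{
	slot->active_pid = pid;
	slot->last_inactive_time = 0;
}

/* Release: record when the slot became inactive. */
static void
sketch_slot_release(SketchSlot *slot, SketchTimestamp now)
{
	assert(now > 0);			/* 0 must stay reserved for "active" */
	slot->active_pid = 0;
	slot->last_inactive_time = now;
}

/* Restore from disk: the slot starts out inactive as of load time. */
static void
sketch_slot_restore(SketchSlot *slot, SketchTimestamp now)
{
	assert(now > 0);
	slot->active_pid = 0;
	slot->last_inactive_time = now;
}

/*
 * View column: returns false (think SQL NULL) while the stored value is 0,
 * otherwise stores the timestamp in *out and returns true.
 */
static bool
sketch_slot_inactive_since(const SketchSlot *slot, SketchTimestamp *out)
{
	if (slot->last_inactive_time > 0)
	{
		*out = slot->last_inactive_time;
		return true;
	}
	return false;
}
```

In this sketch a create followed by a release gives a non-NULL value, which
is consistent with the TAP test expecting last_inactive_time IS NOT NULL
right after slot creation; a later acquire makes it NULL again, and a
restore sets it to the load time.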