Hi,

On 2021-04-06 14:30:29 +0200, Drouvot, Bertrand wrote:
> From 827295f74aff9c627ee722f541a6c7cc6d4133cf Mon Sep 17 00:00:00 2001
> From: bdrouvotAWS <bdrou...@amazon.com>
> Date: Tue, 6 Apr 2021 11:59:23 +0000
> Subject: [PATCH v15 1/5] Allow logical decoding on standby.
>
> Allow a logical slot to be created on standby. Restrict its usage
> or its creation if wal_level on primary is less than logical.
> During slot creation, it's restart_lsn is set to the last replayed
> LSN. Effectively, a logical slot creation on standby waits for an
> xl_running_xact record to arrive from primary. Conflicting slots
> would be handled in next commits.
>
> Andres Freund and Amit Khandekar.

I think more people have worked on this by now... Does this strike you
as an accurate description?

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas


> --- a/src/backend/replication/logical/logical.c
> +++ b/src/backend/replication/logical/logical.c
> @@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
>
>  				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>  				 errmsg("logical decoding requires a database connection")));
>
> -	/* ----
> -	 * TODO: We got to change that someday soon...
> -	 *
> -	 * There's basically three things missing to allow this:
> -	 * 1) We need to be able to correctly and quickly identify the timeline a
> -	 *    LSN belongs to
> -	 * 2) We need to force hot_standby_feedback to be enabled at all times so
> -	 *    the primary cannot remove rows we need.
> -	 * 3) support dropping replication slots referring to a database, in
> -	 *    dbase_redo. There can't be any active ones due to HS recovery
> -	 *    conflicts, so that should be relatively easy.
> -	 * ----
> -	 */
>  	if (RecoveryInProgress())
> -		ereport(ERROR,
> -				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -				 errmsg("logical decoding cannot be used while in recovery")));

Maybe I am just missing something right now, and maybe I'm being a bit
overly pedantic, but I don't immediately see how 0001 is correct without
0002 and 0003? I think it'd be better to first introduce the conflict
information, then check for conflicts, and only after that allow
decoding on standbys?


> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
> index 6f8810e149..6a21cba362 100644
> --- a/src/backend/access/transam/xlog.c
> +++ b/src/backend/access/transam/xlog.c
> @@ -5080,6 +5080,17 @@ LocalProcessControlFile(bool reset)
>  	ReadControlFile();
>  }
>
> +/*
> + * Get the wal_level from the control file. For a standby, this value should be
> + * considered as its active wal_level, because it may be different from what
> + * was originally configured on standby.
> + */
> +WalLevel
> +GetActiveWalLevel(void)
> +{
> +	return ControlFile->wal_level;
> +}
> +

This strikes me as error-prone - there's nothing in the function name
indicating that this should mainly (only?) be used during recovery...


> +	if (SlotIsPhysical(slot))
> +		restart_lsn = GetRedoRecPtr();
> +	else if (RecoveryInProgress())
> +	{
> +		restart_lsn = GetXLogReplayRecPtr(NULL);
> +		/*
> +		 * Replay pointer may point one past the end of the record. If that
> +		 * is a XLOG page boundary, it will not be a valid LSN for the
> +		 * start of a record, so bump it up past the page header.
> +		 */
> +		if (!XRecOffIsValid(restart_lsn))
> +		{
> +			if (restart_lsn % XLOG_BLCKSZ != 0)
> +				elog(ERROR, "invalid replay pointer");
> +
> +			/* For the first page of a segment file, it's a long header */
> +			if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
> +				restart_lsn += SizeOfXLogLongPHD;
> +			else
> +				restart_lsn += SizeOfXLogShortPHD;
> +		}
> +	}

This seems like a layering violation to me. I don't think stuff like
this should be outside of xlog[reader].c, and definitely not in
ReplicationSlotReserveWal().
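
Not that it has to look exactly like this, but something along these
lines, kept next to the other page-header knowledge in xlog[reader].c,
seems cleaner to me - a rough sketch, with an invented function name:

/*
 * Hypothetical helper (name made up): if ptr sits exactly on an XLOG page
 * boundary, advance it past the page header so that it points at a
 * position a record could validly start at.
 */
XLogRecPtr
XLogAdvancePastPageHeader(XLogRecPtr ptr, int wal_segment_size)
{
	if (!XRecOffIsValid(ptr))
	{
		if (ptr % XLOG_BLCKSZ != 0)
			elog(ERROR, "invalid replay pointer");

		/* For the first page of a segment file, it's a long header */
		if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
			ptr += SizeOfXLogLongPHD;
		else
			ptr += SizeOfXLogShortPHD;
	}
	return ptr;
}

ReplicationSlotReserveWal() could then just call that on the result of
GetXLogReplayRecPtr(NULL) without knowing anything about page headers.
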
Relevant discussion (which totally escaped my mind):
https://postgr.es/m/CAJ3gD9csOr0LoYoMK9NnfBk0RZmvHXcJAFWFd2EuL%3DNOfz7PVA%40mail.gmail.com


> +	else
> +		restart_lsn = GetXLogInsertRecPtr();
> +
> +	SpinLockAcquire(&slot->mutex);
> +	slot->data.restart_lsn = restart_lsn;
> +	SpinLockRelease(&slot->mutex);
> +
>  	if (!RecoveryInProgress() && SlotIsLogical(slot))
>  	{
>  		XLogRecPtr	flushptr;
>
> -		/* start at current insert position */
> -		restart_lsn = GetXLogInsertRecPtr();
> -		SpinLockAcquire(&slot->mutex);
> -		slot->data.restart_lsn = restart_lsn;
> -		SpinLockRelease(&slot->mutex);
> -
>  		/* make sure we have enough information to start */
>  		flushptr = LogStandbySnapshot();
>
>  		/* and make sure it's fsynced to disk */
>  		XLogFlush(flushptr);
>  	}
> -	else
> -	{
> -		restart_lsn = GetRedoRecPtr();
> -		SpinLockAcquire(&slot->mutex);
> -		slot->data.restart_lsn = restart_lsn;
> -		SpinLockRelease(&slot->mutex);
> -	}
>
>  	/* prevent WAL removal as fast as possible */
>  	ReplicationSlotsComputeRequiredLSN();

I think I'd move the LogStandbySnapshot() piece out of the entire loop.
There's no reason for logging multiple ones if we then just end up
failing because of the XLogGetLastRemovedSegno() check.


> diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
> index 178d49710a..6c4c26c2fe 100644
> --- a/src/include/access/heapam_xlog.h
> +++ b/src/include/access/heapam_xlog.h
> @@ -239,6 +239,7 @@ typedef struct xl_heap_update
>   */
>  typedef struct xl_heap_clean
>  {
> +	bool		onCatalogTable;
>  	TransactionId latestRemovedXid;
>  	uint16		nredirected;
>  	uint16		ndead;
> @@ -254,6 +255,7 @@ typedef struct xl_heap_clean
>   */
>  typedef struct xl_heap_cleanup_info
>  {
> +	bool		onCatalogTable;
>  	RelFileNode node;
>  	TransactionId latestRemovedXid;
>  } xl_heap_cleanup_info;
> @@ -334,6 +336,7 @@ typedef struct xl_heap_freeze_tuple
>   */
>  typedef struct xl_heap_freeze_page
>  {
> +	bool		onCatalogTable;
>  	TransactionId cutoff_xid;
>  	uint16		ntuples;
>  } xl_heap_freeze_page;
> @@ -348,6 +351,7 @@ typedef struct xl_heap_freeze_page
>   */
>  typedef struct xl_heap_visible
>  {
> +	bool		onCatalogTable;
>  	TransactionId cutoff_xid;
>  	uint8		flags;
>  } xl_heap_visible;

Reminder to self: This needs a WAL version bump.


> diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
> index 9a3a03e520..3405070d63 100644
> --- a/src/include/utils/rel.h
> +++ b/src/include/utils/rel.h
> @@ -16,6 +16,7 @@
>
>  #include "access/tupdesc.h"
>  #include "access/xlog.h"
> +#include "catalog/catalog.h"
>  #include "catalog/pg_class.h"
>  #include "catalog/pg_index.h"
>  #include "catalog/pg_publication.h"

Not clear why this is in this patch?


> diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
> index 5ba776e789..03c5dbea48 100644
> --- a/src/backend/postmaster/pgstat.c
> +++ b/src/backend/postmaster/pgstat.c
> @@ -2928,6 +2928,24 @@ pgstat_send_archiver(const char *xlog, bool failed)
>  	pgstat_send(&msg, sizeof(msg));
>  }
>
> +/* ----------
> + * pgstat_send_droplogicalslot() -
> + *
> + *	Tell the collector about a logical slot being dropped
> + *	due to conflict.
> + * ----------
> + */
> +void
> +pgstat_send_droplogicalslot(Oid dbOid)
> +{
> +	PgStat_MsgRecoveryConflict msg;
> +
> +	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
> +	msg.m_databaseid = dbOid;
> +	msg.m_reason = PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT;
> +	pgstat_send(&msg, sizeof(msg));
> +}

Why do we have this in addition to pgstat_report_replslot_drop()?
ISTM that we should instead add a reason parameter to
pgstat_report_replslot_drop()?


> +/*
> + * Resolve recovery conflicts with logical slots.
> + *
> + * When xid is valid, it means that rows older than xid might have been
> + * removed.

I don't think the past tense is correct - the rows better not be removed
yet on the standby, otherwise we'd potentially do something random in
decoding.


> diff --git a/src/test/recovery/t/024_standby_logical_decoding_xmins.pl b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
> new file mode 100644
> index 0000000000..d654d79526
> --- /dev/null
> +++ b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
> @@ -0,0 +1,272 @@
> +# logical decoding on a standby : ensure xmins are appropriately updated
> +
> +use strict;
> +use warnings;
> +
> +use PostgresNode;
> +use TestLib;
> +use Test::More tests => 23;
> +use RecursiveCopy;
> +use File::Copy;
> +use Time::HiRes qw(usleep);

Several of these don't actually seem to be used?


> +########################
> +# Initialize master node
> +########################

(I'll rename these to primary/replica)


> +$node_master->init(allows_streaming => 1, has_archiving => 1);
> +$node_master->append_conf('postgresql.conf', q{
> +wal_level = 'logical'
> +max_replication_slots = 4
> +max_wal_senders = 4
> +log_min_messages = 'debug2'
> +log_error_verbosity = verbose
> +# very promptly terminate conflicting backends
> +max_standby_streaming_delay = '2s'
> +});

Why is this done on the primary, rather than on the standby?


> +################################
> +# Catalog xmins should advance after standby logical slot fetches the changes.
> +################################
> +
> +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
> +# we hold down xmin.

I don't know what that means.


> +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]);
> +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
> +for my $i (0 .. 2000)
> +{
> +	$node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
> +}

Forking 2000 psql processes is pretty expensive, especially on slower
machines. What is this supposed to test?


> +($ret, $stdout, $stderr) = $node_standby->psql('postgres',
> +	qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
> +is($ret, 0, 'replay of big series succeeded');
> +isnt($stdout, '', 'replayed some rows');

Nothing is being replayed...


> +######################
> +# Upstream oldestXid should not go past downstream catalog_xmin
> +######################
> +
> +# First burn some xids on the master in another DB, so we push the master's
> +# nextXid ahead.
> +foreach my $i (1 .. 100)
> +{
> +	$node_master->safe_psql('postgres', 'SELECT txid_current()');
> +}
> +
> +# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
> +# past our needed xmin. The only way we have visibility into that is to force
> +# a checkpoint.
> +$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
> +foreach my $dbname ('template1', 'postgres', 'postgres', 'template0')
> +{
> +	$node_master->safe_psql($dbname, 'VACUUM FREEZE');
> +}
> +$node_master->safe_psql('postgres', 'CHECKPOINT');
> +IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
> +	or die "pg_controldata failed with $?";
> +my @checkpoint = split('\n', $stdout);
> +my $oldestXid = '';
> +foreach my $line (@checkpoint)
> +{
> +	if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
> +	{
> +		$oldestXid = $1;
> +	}
> +}
> +die 'no oldestXID found in checkpoint' unless $oldestXid;
> +
> +cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'},
> +	'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
> +
> +$node_master->safe_psql('postgres',
> +	"UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
> +

I am thinking of removing this test. It doesn't seem to test anything
really related to the issue at hand, and it seems complicated (needing
to update datallowconn, manually triggering checkpoints, parsing
pg_controldata output).


> +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
> +# given boolean condition to be true to ensure we've reached a quiescent state
> +sub wait_for_xmins
> +{
> +	my ($node, $slotname, $check_expr) = @_;
> +
> +	$node->poll_query_until(
> +		'postgres', qq[
> +		SELECT $check_expr
> +		FROM pg_catalog.pg_replication_slots
> +		WHERE slot_name = '$slotname';
> +	]) or die "Timed out waiting for slot xmins to advance";
> +}
> +
> +# Verify that pg_stat_database_conflicts.confl_logicalslot has been updated
> +sub check_confl_logicalslot
> +{
> +	ok( $node_standby->poll_query_until(
> +		'postgres',
> +		"select (confl_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
> +		'confl_logicalslot updated') or die "Timed out waiting confl_logicalslot to be updated";
> +}
> +

Given that this hardcodes a specific number of conflicting slots etc.,
there doesn't seem to be much point in making this a function...


> +# Acquire one of the standby logical slots created by create_logical_slots()
> +sub make_slot_active
> +{
> +	my $slot_user_handle;
> +
> +	# make sure activeslot is in use
> +	print "starting pg_recvlogical\n";
> +	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
> +
> +	while (!$node_standby->slot('activeslot')->{'active_pid'})
> +	{
> +		usleep(100_000);
> +		print "waiting for slot to become active\n";
> +	}
> +	return $slot_user_handle;
> +}

It's a bad idea to not have timeouts in things like this - if there's a
problem, it'll lead to the test never returning. Things like
poll_query_until() have timeouts to deal with this, but this doesn't.


> +# Check if all the slots on standby are dropped. These include the 'activeslot'
> +# that was acquired by make_slot_active(), and the non-active 'dropslot'.
> +sub check_slots_dropped
> +{
> +	my ($slot_user_handle) = @_;
> +	my $return;
> +
> +	is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
> +	is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
> +
> +	# our client should've terminated in response to the walsender error
> +	eval {
> +		$slot_user_handle->finish;
> +	};
> +	$return = $?;
> +	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n");
> +	if ($return) {
> +		like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
> +		like($stderr, qr/must be dropped/, 'recvlogical error detail');
> +	}

Why do we need to use eval{} for things like checking if a program
finished?


> @@ -297,6 +297,24 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
>     may consume changes from a slot at any given time.
>    </para>
>
> +  <para>
> +   A logical replication slot can also be created on a hot standby. To prevent
> +   <command>VACUUM</command> from removing required rows from the system
> +   catalogs, <varname>hot_standby_feedback</varname> should be set on the
> +   standby. In spite of that, if any required rows get removed, the slot gets
> +   dropped. Existing logical slots on standby also get dropped if wal_level
> +   on primary is reduced to less than 'logical'.
> +  </para>

I think this should add that it's very advisable to use a physical slot
between primary and standby. Otherwise hot_standby_feedback will work,
but only while the connection is alive - as soon as it breaks, a node
gets restarted, ...

Greetings,

Andres Freund