On Mon, 28 Aug 2023 at 18:56, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Thu, Aug 24, 2023 at 11:44 AM vignesh C <vignes...@gmail.com> wrote:
> >
>
> The patch looks mostly good to me. I have made minor changes which are
> as follows: (a) removed the autovacuum = off and
> wal_receiver_status_interval = 0 settings as those don't seem to be
> required for the test; (b) changed a few comments and variable names
> in the code and test.
>
> Shall we change the test file name from always_persist to
> save_logical_slots_shutdown and move it to recovery/t/, as this test is
> about verification after a restart of the server?
That makes sense. The attached v6 version includes those changes. Apart
from this, I have also (a) fixed pgindent issues, (b) fixed perltidy
issues, (c) renamed one variable (flush_lsn_changed to
confirmed_flush_has_changed), and (d) corrected a few comments in the
test file. Thanks to Peter for providing a few offline comments.
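As a quick illustration of the boundary case described in the commit
message below (a sketch only, not part of the patch; it assumes a logical
slot whose confirmed_flush was advanced only via keepalives, and uses the
standard pg_replication_slots view):

    -- 1. While the walsender is advancing confirmed_flush only via
    --    keepalives (the decoded changes were all filtered out), note
    --    the slot's position:
    SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots;

    -- 2. Cleanly restart the server and re-run the query. Without the
    --    patch, confirmed_flush_lsn can be older than in step 1 because
    --    the in-memory advance was never written to the slot's state
    --    file, so the same changes may be decoded again.
    SELECT slot_name, confirmed_flush_lsn FROM pg_replication_slots;

Regards,
Vignesh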
From 7cb9d7c874397cacbb4fdd6c411f1d93570265f8 Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Fri, 14 Apr 2023 13:49:09 +0800
Subject: [PATCH v6] Persist logical slots to disk during a shutdown
 checkpoint if required.

It's entirely possible for a logical slot to have a confirmed_flush LSN
higher than the last value saved on disk while not being marked as dirty.
Currently, it is not a major problem, but a later patch adding support for
the upgrade of slots relies on that value being properly persisted to disk.

It can also help avoid processing the same transactions again in some
boundary cases after a clean shutdown and restart. Say, we process some
transactions for which we didn't send anything downstream (the changes got
filtered) but the confirmed_flush LSN is updated due to keepalives. As we
don't flush the latest value of the confirmed_flush LSN, it may lead to
processing the same changes again.

Author: Julien Rouhaud, Vignesh C, Kuroda Hayato, based on suggestions by
Ashutosh Bapat
Reviewed-by: Amit Kapila, Peter Smith
Discussion: http://postgr.es/m/CAA4eK1JzJagMmb_E8D4au=gyqkxox0afnbm1fbp7sy7t4yw...@mail.gmail.com
Discussion: http://postgr.es/m/tyapr01mb58664c81887b3af2eb6b16e3f5...@tyapr01mb5866.jpnprd01.prod.outlook.com
---
 src/backend/access/transam/xlog.c             |   2 +-
 src/backend/replication/slot.c                |  29 +++--
 src/include/replication/slot.h                |  13 ++-
 src/test/recovery/meson.build                 |   1 +
 .../t/038_save_logical_slots_shutdown.pl      | 101 ++++++++++++++++++
 5 files changed, 133 insertions(+), 13 deletions(-)
 create mode 100644 src/test/recovery/t/038_save_logical_slots_shutdown.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f6f8adc72a..f26c8d18a6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7039,7 +7039,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
-	CheckPointReplicationSlots();
+	CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c4010f..c075f76317 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -109,7 +109,8 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
-static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+						   bool is_shutdown);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -321,6 +322,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk. We haven't actually marked the slot allocated
@@ -783,7 +785,7 @@ ReplicationSlotSave(void)
 	Assert(MyReplicationSlot != NULL);
 
 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
-	SaveSlotToPath(MyReplicationSlot, path, ERROR);
+	SaveSlotToPath(MyReplicationSlot, path, ERROR, false);
 }
 
 /*
@@ -1572,11 +1574,10 @@ restart:
 
 /*
  * Flush all replication slots to disk.
  *
- * This needn't actually be part of a checkpoint, but it's a convenient
- * location.
+ * is_shutdown is true in case of a shutdown checkpoint.
  */
 void
-CheckPointReplicationSlots(void)
+CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
@@ -1601,7 +1602,7 @@ CheckPointReplicationSlots(void)
 
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
-		SaveSlotToPath(s, path, LOG);
+		SaveSlotToPath(s, path, LOG, is_shutdown);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
@@ -1707,7 +1708,7 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 	/* Write the actual state file. */
 	slot->dirty = true;			/* signal that we really need to write */
-	SaveSlotToPath(slot, tmppath, ERROR);
+	SaveSlotToPath(slot, tmppath, ERROR, false);
 
 	/* Rename the directory into place. */
 	if (rename(tmppath, path) != 0)
@@ -1733,22 +1734,26 @@ CreateSlotOnDisk(ReplicationSlot *slot)
  * Shared functionality between saving and creating a replication slot.
  */
 static void
-SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+			   bool is_shutdown)
 {
 	char		tmppath[MAXPGPATH];
 	char		path[MAXPGPATH];
 	int			fd;
 	ReplicationSlotOnDisk cp;
 	bool		was_dirty;
+	bool		confirmed_flush_has_changed;
 
 	/* first check whether there's something to write out */
 	SpinLockAcquire(&slot->mutex);
 	was_dirty = slot->dirty;
 	slot->just_dirtied = false;
+	confirmed_flush_has_changed = (slot->data.confirmed_flush != slot->last_saved_confirmed_flush);
 	SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
+	/* Don't do anything if there's nothing to write. See ReplicationSlot. */
+	if (!was_dirty &&
+		!(is_shutdown && SlotIsLogical(slot) && confirmed_flush_has_changed))
 		return;
 
 	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
@@ -1873,11 +1878,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	/*
 	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
-	 * already.
+	 * already and remember the confirmed_flush LSN value.
	 */
 	SpinLockAcquire(&slot->mutex);
 	if (!slot->just_dirtied)
 		slot->dirty = false;
+	slot->last_saved_confirmed_flush = slot->data.confirmed_flush;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2074,6 +2080,7 @@ RestoreSlotFromDisk(const char *name)
 		/* initialize in memory state */
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
+		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..448fb8cf51 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -178,6 +178,17 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/*
+	 * We won't ensure that the slot is persisted after the confirmed_flush
+	 * LSN is updated as that could lead to frequent writes. However, we need
+	 * to ensure that we do persist the slots at the time of shutdown whose
+	 * confirmed_flush LSN has changed since we last saved the slot to disk.
+	 * This will help in avoiding a retreat of the confirmed_flush LSN after
+	 * restart. This variable is used to track the last saved confirmed_flush
+	 * LSN value.
+	 */
+	XLogRecPtr	last_saved_confirmed_flush;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -241,7 +252,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
-extern void CheckPointReplicationSlots(void);
+extern void CheckPointReplicationSlots(bool is_shutdown);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index e7328e4894..646d6ffde4 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -43,6 +43,7 @@ tests += {
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
+      't/038_save_logical_slots_shutdown.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/038_save_logical_slots_shutdown.pl b/src/test/recovery/t/038_save_logical_slots_shutdown.pl
new file mode 100644
index 0000000000..6e114e9b29
--- /dev/null
+++ b/src/test/recovery/t/038_save_logical_slots_shutdown.pl
@@ -0,0 +1,101 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test logical replication slots are always persisted to disk during a shutdown
+# checkpoint.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+sub compare_confirmed_flush
+{
+	my ($node, $confirmed_flush_from_log) = @_;
+
+	# Fetch the latest checkpoint location from the control file
+	my ($stdout, $stderr) =
+	  run_command([ 'pg_controldata', $node->data_dir ]);
+	my @control_data = split("\n", $stdout);
+	my $latest_checkpoint = undef;
+	foreach (@control_data)
+	{
+		if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg)
+		{
+			$latest_checkpoint = $1;
+			last;
+		}
+	}
+	die "Latest checkpoint location not found in control file\n"
+	  unless defined($latest_checkpoint);
+
+	# Is it the same as the value read from the log?
+	ok( $latest_checkpoint eq $confirmed_flush_from_log,
+		"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
+	);
+
+	return;
+}
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
+$node_publisher->init(allows_streaming => 'logical');
+# Avoid checkpoints during the test; otherwise, the latest checkpoint location
+# will change.
+$node_publisher->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+});
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+my $offset = -s $node_publisher->logfile;
+
+# Restart the publisher to ensure that the slot will be persisted if required
+$node_publisher->restart();
+
+# Wait until the walsender creates the decoding context
+$node_publisher->wait_for_log(
+	qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./,
+	$offset);
+
+# Extract confirmed_flush from the logfile
+my $log_contents = slurp_file($node_publisher->logfile, $offset);
+$log_contents =~
+  qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./
+  or die "could not get confirmed_flush_lsn";
+
+# Ensure that the slot's confirmed_flush LSN is the same as the
+# latest_checkpoint location.
+compare_confirmed_flush($node_publisher, $1);
+
+done_testing();
-- 
2.34.1