Hi hackers,
In the logical decoding on standby thread [1], Andres proposed 2 new tests
(that I did
not find the time to complete before the finish line):
- Test that we can subscribe to the standby (with the publication created on
the primary)
- Verify that invalidated logical slots do not lead to retaining WAL
Please find those 2 missing tests in the patch proposal attached.
A few words about them:
1) Regarding the subscription test:
It modifies wait_for_catchup() to take into account the case where the
requesting
node is in recovery mode. Indeed, without that change,
wait_for_subscription_sync() was
failing with:
"
error running SQL: 'psql:<stdin>:1: ERROR: recovery is in progress
HINT: WAL control functions cannot be executed during recovery.'
while running 'psql -XAtq -d port=61441 host=/tmp/45dt3wqs2p dbname='postgres'
-f - -v ON_ERROR_STOP=1' with sql 'SELECT pg_current_wal_lsn()'
"
2) Regarding the WAL file not retained test:
As it's not possible to execute pg_switch_wal() and friends on a standby, this is
done on the primary. Also, checking that the WAL file (linked to the restart_lsn
of an invalidated slot) has been removed is done directly at the OS/directory level.
The attached patch also removes:
"
-log_min_messages = 'debug2'
-log_error_verbosity = verbose
"
as also discussed in [1].
I'm not sure whether adding those 2 tests should be considered an open item.
I'd be happy to add that open item if we think it makes sense, but it looks
like I don't have the privileges
to edit https://wiki.postgresql.org/wiki/PostgreSQL_16_Open_Items
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
[1]:
https://www.postgresql.org/message-id/6d801661-e21b-7326-be1b-f90d904da66a%40gmail.com
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 6f7f4e5de4..819667d42a 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2644,7 +2644,16 @@ sub wait_for_catchup
}
if (!defined($target_lsn))
{
- $target_lsn = $self->lsn('write');
+ my $isrecovery = $self->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+ chomp($isrecovery);
+ if ($isrecovery eq 't')
+ {
+ $target_lsn = $self->lsn('replay');
+ }
+ else
+ {
+ $target_lsn = $self->lsn('write');
+ }
}
print "Waiting for replication conn "
. $standby_name . "'s "
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 94a8384c31..09bc417356 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -9,13 +9,19 @@ use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
+use Time::HiRes qw(usleep);
-my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+my ($stdin, $stdout, $stderr,
+ $cascading_stdout, $cascading_stderr, $subscriber_stdin,
+ $subscriber_stdout, $subscriber_stderr, $ret,
+ $handle, $slot);
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
my $node_cascading_standby =
PostgreSQL::Test::Cluster->new('cascading_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $psql_timeout = IPC::Run::timer(2 * $default_timeout);
my $res;
# Name for the physical slot on primary
@@ -235,8 +241,6 @@ $node_primary->append_conf('postgresql.conf', q{
wal_level = 'logical'
max_replication_slots = 4
max_wal_senders = 4
-log_min_messages = 'debug2'
-log_error_verbosity = verbose
});
$node_primary->dump_info;
$node_primary->start;
@@ -269,7 +273,8 @@ $node_standby->init_from_backup(
has_streaming => 1,
has_restoring => 1);
$node_standby->append_conf('postgresql.conf',
- qq[primary_slot_name = '$primary_slotname']);
+ qq[primary_slot_name = '$primary_slotname'
+ max_replication_slots = 5]);
$node_standby->start;
$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->safe_psql('testdb', qq[SELECT * FROM
pg_create_physical_replication_slot('$standby_physical_slotname');]);
@@ -287,6 +292,27 @@ $node_cascading_standby->append_conf('postgresql.conf',
$node_cascading_standby->start;
$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+#######################
+# Initialize subscriber node
+#######################
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'max_replication_slots = 4');
+$node_subscriber->start;
+
+my %psql_subscriber = (
+ 'subscriber_stdin' => '',
+ 'subscriber_stdout' => '',
+ 'subscriber_stderr' => '');
+$psql_subscriber{run} = IPC::Run::start(
+ [ 'psql', '-XA', '-f', '-', '-d', $node_subscriber->connstr('postgres') ],
+ '<',
+ \$psql_subscriber{subscriber_stdin},
+ '>',
+ \$psql_subscriber{subscriber_stdout},
+ '2>',
+ \$psql_subscriber{subscriber_stderr},
+ $psql_timeout);
+
##################################################
# Test that logical decoding on the standby
# behaves correctly.
@@ -367,6 +393,58 @@ is( $node_primary->psql(
3,
'replaying logical slot from another database fails');
+##################################################
+# Test that we can subscribe on the standby with the publication
+# created on the primary.
+##################################################
+
+# Create a table on the primary
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+
+# Create a table (same structure) on the subscriber node
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+
+# Create a publication on the primary
+$node_primary->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub for table tab_rep");
+
+# Subscribe on the standby
+my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
+$psql_subscriber{subscriber_stdin} .=
+ qq[CREATE SUBSCRIPTION tap_sub CONNECTION '$standby_connstr' PUBLICATION tap_pub;];
+$psql_subscriber{subscriber_stdin} .= "\n";
+
+$psql_subscriber{run}->pump_nb();
+
+# Speed up the subscription creation
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+
+# Explicitly shut down psql instance gracefully - to avoid hangs
+# or worse on windows
+$psql_subscriber{subscriber_stdin} .= "\\q\n";
+$psql_subscriber{run}->finish;
+
+# Insert some rows on the primary
+$node_primary->safe_psql('postgres',
+ qq[INSERT INTO tab_rep select generate_series(1,10);]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# To speed up the wait_for_subscription_sync
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
+
+# Check that the subscriber can see the rows inserted in the primary
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'check replicated inserts after subscription on standby');
+
+# We do not need the subscriber anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->stop;
+
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 1: hot_standby_feedback off and vacuum FULL
@@ -410,9 +488,82 @@ $node_standby->restart;
check_slots_conflicting_status(1);
##################################################
-# Verify that invalidated logical slots do not lead to retaining WAL
+# Verify that invalidated logical slots do not lead to retaining WAL.
##################################################
-# XXXXX TODO
+
+# Get the restart_lsn from an invalidated slot
+my $restart_lsn = $node_standby->safe_psql('postgres',
+ "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflicting is true;"
+);
+
+chomp($restart_lsn);
+
+# Get the WAL file name associated to this lsn on the primary
+my $walfile_name = $node_primary->safe_psql('postgres',
+ "SELECT pg_walfile_name('$restart_lsn')");
+
+chomp($walfile_name);
+
+# Check the WAL file is still on the primary
+ok(-f $node_primary->data_dir . '/pg_wal/' . $walfile_name,
+ "WAL file still on the primary");
+
+# Get the number of WAL files on the standby
+my $nb_standby_files = $node_standby->safe_psql('postgres',
+ "SELECT COUNT(*) FROM pg_ls_dir('pg_wal')");
+
+chomp($nb_standby_files);
+
+# Switch WAL files on the primary
+my @c = (1 .. $nb_standby_files);
+
+$node_primary->safe_psql('postgres', "create table retain_test(a int)");
+
+for (@c)
+{
+ $node_primary->safe_psql(
+ 'postgres', "SELECT pg_switch_wal();
+ insert into retain_test values("
+ . $_ . ");");
+}
+
+# Ask for a checkpoint
+$node_primary->safe_psql('postgres', 'checkpoint;');
+
+# Check that the WAL file has not been retained on the primary
+ok(!-f $node_primary->data_dir . '/pg_wal/' . $walfile_name,
+ "WAL file not on the primary anymore");
+
+# Wait for the standby to catch up
+$node_primary->wait_for_catchup($node_standby);
+
+# Generate another WAL switch, more activity and a checkpoint
+$node_primary->safe_psql(
+ 'postgres', "SELECT pg_switch_wal();
+ insert into retain_test values(1);");
+$node_primary->safe_psql('postgres', 'checkpoint;');
+
+# Wait for the standby to catch up
+$node_primary->wait_for_catchup($node_standby);
+
+# Verify that the wal file has not been retained on the standby
+my $standby_walfile = $node_standby->data_dir . '/pg_wal/' . $walfile_name;
+
+# We can not test if the WAL file still exists immediately.
+# We need to let some time to the standby to actually "remove" it.
+my $i = 0;
+while (1)
+{
+ last if !-f $standby_walfile;
+ if ($i++ == 10 * $default_timeout)
+ {
+ die
+ "could not determine if WAL file has been retained or not, can't continue";
+ }
+ usleep(100_000);
+}
+
+ok(1, "invalidated logical slots do not lead to retaining WAL");
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots