Hi,
On 4/7/23 8:24 PM, Drouvot, Bertrand wrote:
Hi,
On 4/7/23 5:47 PM, Andres Freund wrote:
Hi,
- write a test that invalidated logical slots do not lead to retaining WAL
I'm not sure how to do that since pg_switch_wal() and friends can't be executed
on
a standby.
You can do it on the primary and wait for the records to have been applied.
Thanks, will give it a try in a couple of hours.
I looked at it but I think we'd also need things like pg_walfile_name() on the
standby but is not allowed.
Is this patchset sufficient to subscribe to a publication on a physical
standby, assuming the publication is created on the primary? If so, we should
have at least a minimal test. If not, we should note that restriction
explicitly.
I gave it a try and it does work.
"
node3 subscribes to node2 (standby).
Insert done in node1 (primary) where the publication is created => node3 see
the changes.
"
I started to create the TAP test but currently stuck as the "create
subscription" waits for a checkpoint/pg_log_standby_snapshot() on the primary.
So, trying to make use of things like:
"my %psql_subscriber = ('stdin' => '', 'stdout' => '');
$psql_subscriber{run} =
$node_subscriber->background_psql('postgres', \$psql_subscriber{stdin},
\$psql_subscriber{stdout},
$psql_timeout);
$psql_subscriber{stdout} = '';
"
But in vain so far...
please find attached sub_in_progress.patch that "should work" but "does not"
because
the wait_for_subscription_sync() call produces:
"
error running SQL: 'psql:<stdin>:1: ERROR: recovery is in progress
HINT: WAL control functions cannot be executed during recovery.'
while running 'psql -XAtq -d port=61441 host=/tmp/45dt3wqs2p dbname='postgres'
-f - -v ON_ERROR_STOP=1' with sql 'SELECT pg_current_wal_lsn()'
"
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl
b/src/test/recovery/t/035_standby_logical_decoding.pl
index 561dcd33c3..c3c0e718c8 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -8,14 +8,18 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
-my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret,
$handle, $slot);
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr,
$subscriber_stdin, $subscriber_stdout, $subscriber_stderr, $ret, $handle,
$slot);
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
my $node_cascading_standby =
PostgreSQL::Test::Cluster->new('cascading_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $psql_timeout =
+ IPC::Run::timer(2 * $PostgreSQL::Test::Utils::timeout_default);
my $res;
+
# Name for the physical slot on primary
my $primary_slotname = 'primary_physical';
my $standby_physical_slotname = 'standby_physical';
@@ -263,6 +267,7 @@ $node_standby->init_from_backup(
has_restoring => 1);
$node_standby->append_conf('postgresql.conf',
qq[primary_slot_name = '$primary_slotname']);
+$node_standby->append_conf('postgresql.conf', 'max_replication_slots = 6');
$node_standby->start;
$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->safe_psql('testdb', qq[SELECT * FROM
pg_create_physical_replication_slot('$standby_physical_slotname');]);
@@ -280,6 +285,20 @@ $node_cascading_standby->append_conf('postgresql.conf',
$node_cascading_standby->start;
$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+#######################
+# Initialize subscriber node
+#######################
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'max_replication_slots = 4');
+$node_subscriber->start;
+
+my %psql_subscriber = ('subscriber_stdin' => '', 'subscriber_stdout' => '');
+$psql_subscriber{run} =
+ $node_subscriber->background_psql('postgres',
\$psql_subscriber{subscriber_stdin},
+ \$psql_subscriber{subscriber_stdout},
+ $psql_timeout);
+$psql_subscriber{subscriber_stdout} = '';
+
##################################################
# Test that logical decoding on the standby
# behaves correctly.
@@ -360,6 +379,43 @@ is( $node_primary->psql(
3,
'replaying logical slot from another database fails');
+##################################################
+# Test that we can subscribe on the standby with the publication
+# created on the primary.
+##################################################
+
+# Create a table on the primary
+$node_primary->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary
key)");
+
+# Create a table (same structure) on the subscriber node
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary
key)");
+
+# Create a publication on the primary
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_primary->safe_psql('postgres', "ALTER PUBLICATION tap_pub ADD TABLE
tab_rep");
+
+my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
+
+# Subscribe on the standby
+$psql_subscriber{stdin} .= qq[
+ CREATE SUBSCRIPTION
tap_sub CONNECTION '$standby_connstr' PUBLICATION tap_pub;
+ ];
+$psql_subscriber{run}->pump_nb;
+$node_primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+$psql_subscriber{run}->finish;
+
+$node_primary->safe_psql('postgres', qq[INSERT INTO tab_rep select
generate_series(1,10);]);
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Wait for sync to finish
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM
tab_rep");
+is($result, qq(10),
+ 'check replicated inserts after subscription on standby');
+
+$node_subscriber->stop;
+
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 1: hot_standby_feedback off and vacuum FULL
@@ -402,7 +458,6 @@ check_slots_conflicting_status(1);
##################################################
# Verify that invalidated logical slots do not lead to retaining WAL
##################################################
-# XXXXX TODO
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots