Hi,

On 4/7/23 8:24 PM, Drouvot, Bertrand wrote:
Hi,

On 4/7/23 5:47 PM, Andres Freund wrote:
Hi,

- write a test that invalidated logical slots do not lead to retaining WAL

I'm not sure how to do that since pg_switch_wal() and friends can't be executed
on a standby.

You can do it on the primary and wait for the records to have been applied.


Thanks, will give it a try in a couple of hours.

I looked at it, but I think we'd also need things like pg_walfile_name() on the
standby, which is not allowed.

Is this patchset sufficient to subscribe to a publication on a physical
standby, assuming the publication is created on the primary? If so, we should
have at least a minimal test. If not, we should note that restriction
explicitly.

I gave it a try and it does work.

"
node3 subscribes to node2 (standby).
Insert done in node1 (primary) where the publication is created => node3 sees
the changes.
"

I started to create the TAP test but am currently stuck, as the "create
subscription" waits for a checkpoint/pg_log_standby_snapshot() on the primary.

So, trying to make use of things like:

"my %psql_subscriber = ('stdin' => '', 'stdout' => '');
$psql_subscriber{run} =
   $node_subscriber->background_psql('postgres', \$psql_subscriber{stdin},
     \$psql_subscriber{stdout},
     $psql_timeout);
$psql_subscriber{stdout} = '';
"

But in vain so far...


Please find attached sub_in_progress.patch that "should work" but "does not"
because the wait_for_subscription_sync() call produces:

"
error running SQL: 'psql:<stdin>:1: ERROR:  recovery is in progress
HINT:  WAL control functions cannot be executed during recovery.'
while running 'psql -XAtq -d port=61441 host=/tmp/45dt3wqs2p dbname='postgres' 
-f - -v ON_ERROR_STOP=1' with sql 'SELECT pg_current_wal_lsn()'
"

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl 
b/src/test/recovery/t/035_standby_logical_decoding.pl
index 561dcd33c3..c3c0e718c8 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -8,14 +8,18 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
-my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, 
$handle, $slot);
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, 
$subscriber_stdin, $subscriber_stdout, $subscriber_stderr, $ret, $handle, 
$slot);
 
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
 my $node_standby = PostgreSQL::Test::Cluster->new('standby');
 my $node_cascading_standby = 
PostgreSQL::Test::Cluster->new('cascading_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $psql_timeout =
+  IPC::Run::timer(2 * $PostgreSQL::Test::Utils::timeout_default);
 my $res;
 
+
 # Name for the physical slot on primary
 my $primary_slotname = 'primary_physical';
 my $standby_physical_slotname = 'standby_physical';
@@ -263,6 +267,7 @@ $node_standby->init_from_backup(
        has_restoring => 1);
 $node_standby->append_conf('postgresql.conf',
        qq[primary_slot_name = '$primary_slotname']);
+$node_standby->append_conf('postgresql.conf', 'max_replication_slots = 6');
 $node_standby->start;
 $node_primary->wait_for_replay_catchup($node_standby);
 $node_standby->safe_psql('testdb', qq[SELECT * FROM 
pg_create_physical_replication_slot('$standby_physical_slotname');]);
@@ -280,6 +285,20 @@ $node_cascading_standby->append_conf('postgresql.conf',
 $node_cascading_standby->start;
 $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
 
+#######################
+# Initialize subscriber node
+#######################
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'max_replication_slots = 4');
+$node_subscriber->start;
+
+my %psql_subscriber = ('subscriber_stdin' => '', 'subscriber_stdout' => '');
+$psql_subscriber{run} =
+  $node_subscriber->background_psql('postgres', 
\$psql_subscriber{subscriber_stdin},
+    \$psql_subscriber{subscriber_stdout},
+    $psql_timeout);
+$psql_subscriber{subscriber_stdout} = '';
+
 ##################################################
 # Test that logical decoding on the standby
 # behaves correctly.
@@ -360,6 +379,43 @@ is( $node_primary->psql(
     3,
     'replaying logical slot from another database fails');
 
+##################################################
+# Test that we can subscribe on the standby with the publication
+# created on the primary.
+##################################################
+
+# Create a table on the primary
+$node_primary->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary 
key)");
+
+# Create a table (same structure) on the subscriber node
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary 
key)");
+
+# Create a publication on the primary
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_primary->safe_psql('postgres', "ALTER PUBLICATION tap_pub ADD TABLE 
tab_rep");
+
+my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
+
+# Subscribe on the standby
+$psql_subscriber{stdin} .= qq[
+                                                       CREATE SUBSCRIPTION 
tap_sub CONNECTION '$standby_connstr' PUBLICATION tap_pub;
+                                                       ];
+$psql_subscriber{run}->pump_nb;
+$node_primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+$psql_subscriber{run}->finish;
+
+$node_primary->safe_psql('postgres', qq[INSERT INTO tab_rep select 
generate_series(1,10);]);
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Wait for sync to finish
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM 
tab_rep");
+is($result, qq(10),
+    'check replicated inserts after subscription on standby');
+
+$node_subscriber->stop;
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL
@@ -402,7 +458,6 @@ check_slots_conflicting_status(1);
 ##################################################
 # Verify that invalidated logical slots do not lead to retaining WAL
 ##################################################
-# XXXXX TODO
 
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots

Reply via email to