On Mon, Apr 28, 2025 at 7:33 PM Zhijie Hou (Fujitsu) wrote:
>
> Thanks for reviewing. Here is V3 patch that addressed it.
>
> BTW, I also did some tests to confirm the catalog_xmin could still be
> ahead in some cases, and here is an example:
>
> 1. Create a failover replication slot named 'logicalslot' on primary
>    and acquire it in the walsender.
>
> 2. Log two standby snapshots on primary. Before logging each one, call
>    txid_current() to assign an xid, so that each standby snapshot holds
>    a new xid in its oldestrunningXid field:
>    - txid_current();
>    - `0/3000420` - `running_xacts` (no running transactions,
>      oldestrunningXid = 755)
>    - txid_current();
>    - `0/3000488` - `running_xacts` (no running transactions,
>      oldestrunningXid = 756)
>
> 3. The walsender sets `0/3000420` as the `candidate_restart_lsn` and 755 as
>    `candidate_catalog_xmin`, skipping the second `running_xacts` because
>    `candidate_restart_lsn`/`candidate_catalog_xmin` is already valid.
>
> 4. After receiving a reply from the apply worker, the walsender assigns
>    `0/3000420` to `restart_lsn` and `755` to `catalog_xmin`. At this point,
>    the replication slot 'logicalslot' has `restart_lsn` set to `0/3000420`
>    and `catalog_xmin` set to `755`.
>
> 5. On the standby, execute `pg_sync_replication_slots()` to synchronize
>    'logicalslot'.
>
> 6. During synchronization, with the initial `restart_lsn` at `0/3000420`,
>    the sync slot reaches a consistent point at this position. As a result,
>    it does not update `candidate_restart_lsn` and `candidate_catalog_xmin`
>    at the consistent point (refer to `SnapBuildProcessRunningXacts()`).
>
> 7. The sync process identifies the second standby snapshot at `0/3000488`
>    and uses its LSN as `candidate_restart_lsn` and its oldestrunningXid
>    `756` as `candidate_catalog_xmin`, eventually assigning these values to
>    `restart_lsn` and `catalog_xmin`.
>
> 8. Now, the synced slot holds `restart_lsn` at `0/3000488` and
>    `catalog_xmin` at `756`, both of which are ahead of the remote slot on
>    the primary server.
>
> Attaching a script to reproduce the same.
>
> Note that, to reproduce this reliably, it is better to increase the value
> of LOG_SNAPSHOT_INTERVAL_MS in bgwriter.c to avoid unexpected
> xl_running_xacts logging.
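
For anyone checking the outcome by hand, the "ahead" condition in step 8 amounts to a numeric comparison of two LSNs. The following is only a minimal sketch, assuming the textual pg_lsn form of two hexadecimal halves separated by a slash; the helper names lsn_to_int and lsn_is_ahead are hypothetical and are not part of the attached script.

```shell
#!/bin/bash
# Hypothetical helpers for illustration only; not part of the attached script.

# Convert a textual LSN such as "0/3000420" (high 32 bits / low 32 bits,
# both hexadecimal) into a single integer so two LSNs can be compared.
lsn_to_int() {
    local hi=${1%%/*} lo=${1##*/}
    echo $(( (16#$hi << 32) | 16#$lo ))
}

# Succeed (exit status 0) if the first LSN is strictly ahead of the second.
lsn_is_ahead() {
    [ "$(lsn_to_int "$1")" -gt "$(lsn_to_int "$2")" ]
}

# Values taken from the example: synced slot vs. remote slot on the primary.
if lsn_is_ahead "0/3000488" "0/3000420"; then
    echo "synced restart_lsn is ahead of the remote slot"
fi
```

In practice the two arguments would come from querying restart_lsn in pg_replication_slots on the standby and on the primary, respectively.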

In addition to the above steps, for those interested in reproducing the
specific scenario where two_phase_at advances past the synced
confirmed_flush, I'm attaching a new script. It reproduces the issue once
the attached injection point patch has been applied.

Best Regards,
Hou zj

From a4a314b7041779d6e5725a9d66b0135715d393f4 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.f...@cn.fujitsu.com>
Date: Tue, 29 Apr 2025 13:56:25 +0800
Subject: [PATCH] injection point

---
 src/backend/replication/logical/logical.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a8d2e024d34..afe194fb398 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -41,6 +41,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 
@@ -603,6 +604,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
 	}
 
+	INJECTION_POINT("initialized-two-phase-at");
+
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
 	ereport(LOG,
-- 
2.30.0.windows.2

#!/bin/bash

port_primary=5432
port_secondary=5433
port_third=5434
port_secondary_2=5436
port_secondary_3=5437
port_subscriber=5434
port_subscriber2=5435

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_third_
pg_ctl stop -D data_secondary_2
pg_ctl stop -D data_secondary_3
pg_ctl stop -D data_subscriber
pg_ctl stop -D data_subscriber2

rm -rf data_* *log
rm -rf /home/houzj/archivedir/*

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#log_replication_commands = 'on'
#max_slot_wal_keep_size = 64kB

max_wal_senders=550
max_worker_processes=1000
max_replication_slots=550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

#log_min_messages = 'debug2'

#archive_mode = on
#archive_command = 'cp %p /home/houzj/archivedir/%f'
#restore_command = 'cp /home/houzj/archivedir/%f %p'

max_prepared_transactions = 10
EOF

pg_ctl -D data_primary start -w -l primary.log

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

psql -d postgres -p $port_primary -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p $port_primary

cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

# Create an empty standby.signal so the server starts as a standby.
cat << EOF >> data_secondary/standby.signal
EOF

pg_ctl -D data_secondary start -w

# Run in the background: slot creation on the standby waits for a
# running_xacts record from the primary.
psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &

sleep 1

psql -d postgres -p $port_primary -c "CHECKPOINT;"

echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber

checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

max_prepared_transactions = 10
EOF

pg_ctl start -D data_subscriber

psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); CREATE PUBLICATION pub FOR TABLE tab1;"
psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int);"

# Create the failover slot 'logicalslot' with two_phase disabled; the
# subscription stays disabled until the standby snapshots are logged.
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = false, two_phase=false, failover=true, enabled=false)"

psql -d postgres -p $port_primary -c "SELECT pg_current_wal_lsn();"

# Assign a new xid before each standby snapshot so that every running_xacts
# record carries a different oldestrunningXid.
psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"

psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"

psql -d postgres -p $port_primary -c "SELECT txid_current();"
psql -d postgres -p $port_primary -c "SELECT pg_log_standby_snapshot();"

sleep 1

psql -d postgres -p $port_subscriber -c "ALTER SUBSCRIPTION sub ENABLE;"

psql -d postgres -p $port_primary -c "SELECT restart_lsn FROM pg_replication_slots;"

sleep 1

# First sync: copy 'logicalslot' to the standby.
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

psql -d postgres -p $port_primary -c "BEGIN; SELECT pg_logical_emit_message(true, 'test', 'test'); PREPARE TRANSACTION 'slotsync_twophase_prepared';"

sleep 1

# Make the walsender wait right after two_phase_at is initialized.
psql -d postgres -p $port_primary -c "create extension injection_points;SELECT injection_points_attach('initialized-two-phase-at', 'wait');"

psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

sleep 1

# Enable two_phase and restart the subscription so the slot's two_phase_at
# gets set while the walsender is held at the injection point.
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase =on); alter subscription sub enable ;"

sleep 1

# Second sync: runs while the walsender is still waiting.
psql -d postgres -p $port_secondary -c "select pg_sync_replication_slots();"

exit