On Sat, May 10, 2025 at 4:59 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, May 6, 2025 at 4:52 PM Zhijie Hou (Fujitsu)
> <houzj.f...@fujitsu.com> wrote:
> >
> > On Mon, May 5, 2025 at 6:59 PM Amit Kapila wrote:
> > >
> > >
> > > Yes, this is possible. Here is my theory as to how it can happen in the
> > > current case. In the failed test, after the primary has prepared a
> > > transaction, the transaction won't be replicated to the subscriber as
> > > two_phase was not enabled for the slot. However, subsequent keepalive
> > > messages can send the latest WAL location to the subscriber and get the
> > > confirmation of the same from the subscriber without its origin being
> > > moved. Now, after we restart the apply worker (due to disable/enable for a
> > > subscription), it will use the previous origin_lsn to temporarily move
> > > back the confirmed flush LSN as explained in one of the previous emails in
> > > another thread [1]. During this temporary movement of confirm flush LSN,
> > > the slotsync worker fetches the two_phase_at and confirm_flush_lsn values,
> > > leading to the assertion failure. We see this issue intermittently because
> > > it depends on the timing of slotsync worker's request to fetch the slot's
> > > value.
> >
> > Based on this theory, I can reproduce the BF failure in the 040 tap-test on
> > HEAD after applying the 0001 patch. This is achieved by using the injection
> > point to stop the walsender from sending a keepalive before receiving the
> > old origin position from the apply worker, ensuring the confirmed_flush
> > consistently moves backward before slotsync.
> >
> > Additionally, I've reproduced the duplicate data issue on HEAD without
> > slotsync using the attached script (after applying the injection point
> > patch). This issue arises if we immediately disable the subscription after
> > the confirm_flush_lsn moves backward, preventing the walsender from
> > advancing the confirm_flush_lsn.
> >
>
> Script contents:
> psql -d postgres -p $port_primary -c "create extension
> injection_points;SELECT injection_points_attach('process-replies',
> 'wait');"
>
> psql -d postgres -p $port_subscriber -c "alter subscription sub set
> (two_phase =on); alter subscription sub enable ;"
>
> sleep 1
>
> psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
>
> I think what you said in the above paragraph is happening here. How
> can walsender move the confirm_flush_lsn backwards when it is
> waiting due to the injection point? I think I am missing something
> here. It would be good if you could add a few comments to your
> scripts.
>

Please see my comments in the attached (updated) script. The test case to
reproduce the issue on HEAD is the same; I have only added comments that
elaborate on the flow which moves confirmed_flush backward.
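For anyone checking the end state by hand: the two interesting values (two_phase_at and confirmed_flush_lsn) are most informative when read at different steps of the script, so comparing them in a single SQL query is not always convenient. A minimal sketch of a shell-side comparison, assuming LSNs in PostgreSQL's usual X/Y hex text form; lsn_to_dec and lsn_lt are hypothetical helpers, not part of PostgreSQL or of the attached script:

```shell
#!/bin/bash
# Hypothetical helpers, not part of PostgreSQL or the attached script.
# They compare LSNs printed in the usual "X/Y" hex text form.
# Assumes LSNs below 8000000/0, so values fit in signed 64-bit shell
# arithmetic.

# Convert an LSN such as "0/1A2B3C4" to a decimal integer.
lsn_to_dec() {
    local lsn="$1"
    local hi="${lsn%%/*}"   # before the slash: high 32 bits
    local lo="${lsn##*/}"   # after the slash: low 32 bits
    echo $(( (0x$hi << 32) | 0x$lo ))
}

# Exit status 0 if LSN $1 is strictly less than LSN $2.
lsn_lt() {
    [ "$(lsn_to_dec "$1")" -lt "$(lsn_to_dec "$2")" ]
}
```

Each value can be captured with psql -At against pg_replication_slots for slot 'logicalslot' (assuming the build exposes the two_phase_at column there) and passed to lsn_lt. When both values are available in one query, the pg_lsn type's own < operator does the same comparison directly in SQL.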

thanks
Shveta
#!/bin/bash

# The idea of the script is to enable two_phase after preparing a transaction
# and then attempt to move confirmed_lsn backward (prior to two_phase_at),
# resulting in the PREPARE being replayed twice.
# The script achieves this by stopping the walsender with an injection point
# when needed.


port_primary=5433
port_secondary=5434
port_subscriber=5435

echo '=========='
echo '=Clean up='
echo '=========='

pg_ctl stop -D data_primary
pg_ctl stop -D data_secondary
pg_ctl stop -D data_subscriber

rm -rf data_* *log

echo '======================='
echo '=Set up primary server='
echo '======================='

initdb -D data_primary

cat << EOF >> data_primary/postgresql.conf
wal_level = logical
port = $port_primary
#standby_slot_names = 'physical'
#max_slot_wal_keep_size = 64kB

max_wal_senders = 550
max_replication_slots = 550
log_replication_commands = 'on'
checkpoint_timeout = 1d
shared_buffers = 6GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s

log_min_messages = 'debug1'

max_prepared_transactions = 10
EOF


pg_ctl -D data_primary start -w -l primary.log 

psql -d postgres -p $port_primary -c "SELECT * FROM pg_create_physical_replication_slot('physical');"

echo '========================='
echo '=Set up secondary server='
echo '========================='

psql -d postgres -p $port_primary -c "CHECKPOINT;"

pg_basebackup -D data_secondary -p $port_primary

cat << EOF >> data_secondary/postgresql.conf
port = $port_secondary
primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgres'
#primary_conninfo = 'port=$port_primary application_name=secondary dbname=postgreis'
primary_slot_name = 'physical'
hot_standby = on 
hot_standby_feedback = on
#sync_replication_slots = on
#standby_slot_names = ''
EOF

cat << EOF >> data_secondary/standby.signal
EOF

pg_ctl -D data_secondary start -w -l sec.log 


psql -d postgres -p $port_secondary -c "SELECT 'init' FROM pg_create_logical_replication_slot('stuck', 'pgoutput');" &

sleep 1

psql -d postgres -p $port_primary -c "CHECKPOINT;"

echo '==================='
echo '=Set up subscriber='
echo '==================='

initdb -D data_subscriber

cat << EOF >> data_subscriber/postgresql.conf
port = $port_subscriber


checkpoint_timeout = 1h
shared_buffers = '8GB'
wal_buffers = '1GB'
max_connections = '5000'
max_wal_size = 20GB
min_wal_size = 10GB
max_wal_senders = 100
max_replication_slots = 101
autovacuum = off

wal_sender_timeout = 6000s
wal_receiver_timeout = 6000s


wal_receiver_status_interval = 1
max_prepared_transactions = 10
log_min_messages = 'debug1'
EOF

pg_ctl -D data_subscriber start -w -l sub.log

psql -d postgres -p $port_primary -c "CREATE TABLE tab1(a int); INSERT INTO tab1 VALUES(1); CREATE PUBLICATION pub FOR TABLE tab1;"

psql -d postgres -p $port_subscriber -c "CREATE TABLE tab1(a int);"
psql -d postgres -p $port_subscriber -c "CREATE SUBSCRIPTION sub CONNECTION 'dbname=postgres port=$port_primary' PUBLICATION pub WITH (slot_name='logicalslot', create_slot=true, copy_data = true, two_phase=false, failover=true, enabled=true)"

sleep 2 

# PREPARE the transaction with two_phase disabled.
# It will not be transmitted to the subscriber because two_phase is disabled.
psql -d postgres -p $port_primary -c "INSERT INTO tab1 VALUES(1);"
psql -d postgres -p $port_primary -c "BEGIN; SELECT pg_logical_emit_message(true, 'test', 'test'); PREPARE TRANSACTION 'slotsync_twophase_prepared';"

sleep 1

# By this time, the walsender should have sent a keepalive message to the
# subscriber, making the subscriber acknowledge that LSN. This advances
# confirmed_flush on the publisher without advancing origin_lsn on the
# subscriber.


# Now enable two_phase, but only "after" attaching the injection point in the
# walsender, to stop it from sending further keepalive messages that might
# advance confirmed_lsn.
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"
sleep 1 

psql -d postgres -p $port_primary -c "create extension injection_points;SELECT injection_points_attach('process-replies', 'wait');"

# Enabling the subscription with two_phase=on restarts replication from the
# old origin_lsn. two_phase_at is set to the (advanced) confirmed_flush in
# StartLogicalReplication-->CreateDecodingContext, and then the walsender gets
# stuck waiting on the injection point in WalSndLoop.
#
# Note that because the walsender is stopped by the injection point before
# ProcessRepliesIfAny(), it cannot yet set confirmed_flush to origin_lsn. It
# also cannot send any further keepalive messages.
psql -d postgres -p $port_subscriber -c "alter subscription sub set (two_phase =on); alter subscription sub enable ;"

sleep 2 

# Disable the subscription, trigger COMMIT PREPARED on the publisher, and
# meanwhile also release the wait in the walsender, so that the changes are
# processed once the subscription is enabled again.
psql -d postgres -p $port_subscriber -c "alter subscription sub disable;"

# As soon as the walsender's injection point is released here, it will reach
# ProcessRepliesIfAny(), set confirmed_flush to the origin_lsn sent by the
# previous 'alter subscription sub enable' command, and then exit.
#
# Now the state is:
# -- two_phase_at is set to the advanced confirmed_lsn.
# -- confirmed_lsn is set to the origin_lsn sent by the first 'subscription
#    enable', resulting in confirmed_lsn < two_phase_at.
psql -d postgres -p $port_primary -c "SELECT injection_points_wakeup('process-replies');SELECT injection_points_detach('process-replies');"
sleep 1

psql -d postgres -p $port_primary -c "COMMIT PREPARED 'slotsync_twophase_prepared';"

# Now, with this enable, the walsender will restart with
# confirmed_flush < two_phase_at. In such a case, since a prepared transaction
# exists before two_phase_at, after re-enabling the subscription the walsender
# will replicate that prepared transaction when decoding the PREPARE record,
# and replicate it again when decoding the COMMIT PREPARED record, resulting
# in an error on the subscriber:
# ERROR: transaction identifier "pg_gid_16387_755" is already in use.
psql -d postgres -p $port_subscriber -c "alter subscription sub enable;"

sleep 1

exit
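The last comment in the script says the PREPARE is sent twice once confirmed_flush < two_phase_at. A toy model of the two decisions involved may make the required ordering easier to see. This is a simplified sketch under stated assumptions, not the decoder's real code: LSNs are plain integers, the function names are hypothetical, and the two conditions are paraphrased from the script comments (the PREPARE is sent when decoded if two_phase is on and it lies at or after the restart point; at COMMIT PREPARED, the prepare is sent first if it lies before two_phase_at).

```shell
#!/bin/bash
# Toy model (hypothetical names, not PostgreSQL code) of the two decisions
# described in the script comments. LSNs are plain integers.

# Decision 1: when the PREPARE record is decoded, it is sent if two_phase is
# on and the record is at or after the point decoding restarts from.
sends_at_prepare() {
    local prepare_lsn="$1" restart_point="$2" two_phase="$3"
    [ "$two_phase" = on ] && [ "$prepare_lsn" -ge "$restart_point" ]
}

# Decision 2: when COMMIT PREPARED is decoded, the prepare is sent first if
# it lies before two_phase_at, assuming it was not sent at PREPARE time.
sends_at_commit_prepared() {
    local prepare_lsn="$1" two_phase_at="$2"
    [ "$prepare_lsn" -lt "$two_phase_at" ]
}

# Print how many times the PREPARE reaches the subscriber after a restart
# from confirmed_flush with two_phase enabled.
count_prepares_sent() {
    local prepare_lsn="$1" confirmed_flush="$2" two_phase_at="$3"
    local n=0
    if sends_at_prepare "$prepare_lsn" "$confirmed_flush" on; then
        n=$((n + 1))
    fi
    if sends_at_commit_prepared "$prepare_lsn" "$two_phase_at"; then
        n=$((n + 1))
    fi
    echo "$n"
}
```

With illustrative numbers matching the failing sequence (restart point below the PREPARE, PREPARE below two_phase_at), count_prepares_sent 150 100 200 prints 2. If confirmed_flush had stayed at two_phase_at (count_prepares_sent 150 200 200), it prints 1, and the prepare is sent only at COMMIT PREPARED time.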
