Hi everyone, 

When using logical replication with the pgoutput plugin on PG 16, we do the 
following: 
1) SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, 
null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'false') 
2) Get LSN of last row (Commit) 
3) SELECT * FROM pg_replication_slot_advance('test_slot_v1', <Commit LSN>); 
4) Repeat. 
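
For reference, steps 1 to 4 amount to roughly the loop body below. This is only
a sketch: `peek`, `advance` and `handle` are hypothetical callables standing in
for the two SQL calls above and for whatever consumes the rows, and each row is
assumed to be an (lsn, xid, data) tuple as returned by
pg_logical_slot_peek_binary_changes().

```python
from typing import Callable, Iterable, List, Optional, Tuple

# (lsn, xid, data), mirroring the columns returned by the peek function
Row = Tuple[str, int, bytes]


def poll_once(
    peek: Callable[[], Iterable[Row]],      # hypothetical: runs the peek query
    advance: Callable[[str], None],         # hypothetical: runs slot_advance
    handle: Callable[[Row], None],          # hypothetical: consumes one row
) -> Optional[str]:
    """Run steps 1-3 once; return the LSN the slot was advanced to, or None."""
    rows: List[Row] = list(peek())          # step 1: peek at pending changes
    if not rows:
        return None                         # nothing to consume yet
    for row in rows:
        handle(row)
    last_lsn = rows[-1][0]                  # step 2: LSN of the last row
    advance(last_lsn)                       # step 3: advance the slot to it
    return last_lsn
```

Step 4 is then just calling poll_once() again, on whatever schedule suits the
consumer.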


This works perfectly fine when streaming = false. With streaming turned on, 
the expectation is that the same thing happens, except that the LSN passed to 
pg_replication_slot_advance() is that of a Stream End record, and that the next 
call to pg_logical_slot_peek_binary_changes() returns the subsequent Stream 
Start record. Instead, the stream starts over from the beginning of the 
transaction (the first Stream Start record). Observe: 


*** Demo starts *** 
*** Initially there are no changes, peek() returns nothing: *** 



=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, 
null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') 
WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44'); 
lsn | xid | data 
-----+-----+------ 
(0 rows) 


*** Slot status: *** 


=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM 
pg_replication_slots; 
slot_name | active | restart_lsn | confirmed_flush_lsn 
--------------+--------+-------------+--------------------- 
test_slot_v1 | f | 2/98CE060 | 2/98CE060 
(1 rows) 




*** Now make some changes (delete then insert a bunch of records) and call peek() *** 
*** The predicate filters out Delete and Insert records, leaving Stream Start (\x53 = S), *** 
*** Relation (\x52 = R), Stream End (\x45 = E), and Stream Commit (\x63 = c) *** 


abinitio=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', 
null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 
'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44'); 
lsn | xid | data 
------------+------+------ 
2/A222A20 | 1112 | \x530000045801 
2/A222A20 | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff 
2/C141BE8 | 1112 | \x45 
2/C141C28 | 1112 | \x530000045800 
2/DF598D8 | 1112 | \x45 
2/DF59950 | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8 
2/DF59950 | 1114 | \x530000045a01 
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff 
2/108918D0 | 1114 | \x45 
2/108918D0 | 1114 | \x530000045a00 
2/131E1310 | 1114 | \x45 
2/131E1310 | 1114 | \x530000045a00 
2/137D7768 | 1114 | \x45 
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96 
(14 rows) 


*** It was a peek() so the status is unchanged: *** 


=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM 
pg_replication_slots; 
slot_name | active | restart_lsn | confirmed_flush_lsn 
--------------+--------+-------------+--------------------- 
test_slot_v1 | f | 2/98CE060 | 2/98CE060 
(1 rows) 


*** Now advance the slot to the first Stream End record: *** 


=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/C141BE8'); 
slot_name | end_lsn 
--------------+----------- 
test_slot_v1 | 2/C141BE8 
(1 row) 


*** confirmed_flush_lsn is updated as expected: *** 


=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM 
pg_replication_slots; 
slot_name | active | restart_lsn | confirmed_flush_lsn 
--------------+--------+-------------+--------------------- 
test_slot_v1 | f | 2/9B09D10 | 2/C141BE8 
(1 rows) 


*** Now peek() again. It is starting from earlier than confirmed_flush_lsn: *** 


=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, 
null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') 
WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44'); 
lsn | xid | data 
------------+------+------ 
2/A222A20 | 1112 | \x530000045801 
2/A222A20 | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff 
2/C141BE8 | 1112 | \x45 
2/C141C28 | 1112 | \x530000045800 
2/DF598D8 | 1112 | \x45 
2/DF59950 | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8 
2/DF59950 | 1114 | \x530000045a01 
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff 
2/108918D0 | 1114 | \x45 
2/108918D0 | 1114 | \x530000045a00 
2/131E1310 | 1114 | \x45 
2/131E1310 | 1114 | \x530000045a00 
2/137D7768 | 1114 | \x45 
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96 
(14 rows) 
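
To make the "earlier than confirmed_flush_lsn" claim easy to check, here is a
small sketch that compares two LSNs numerically. It assumes the usual textual
form of an LSN, two hexadecimal numbers separated by a slash (high 32 bits,
then low 32 bits); parse_lsn() is a helper name made up for this illustration.

```python
def parse_lsn(text: str) -> int:
    """Convert a textual LSN such as '2/C141BE8' to a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


confirmed_flush = parse_lsn("2/C141BE8")   # value shown in pg_replication_slots
first_row = parse_lsn("2/A222A20")         # lsn of the first row peek() returned

# True when peek() started before the confirmed position
started_early = first_row < confirmed_flush
```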


*** Next advance to the Stream Commit record: *** 


=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/DF59950'); 
slot_name | end_lsn 
--------------+----------- 
test_slot_v1 | 2/DF59950 
(1 row) 


*** This time the peek() starts from the correct LSN: *** 


=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, 
null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') 
WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44'); 
lsn | xid | data 
------------+------+------ 
2/DF59950 | 1114 | \x530000045a01 
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff 
2/108918D0 | 1114 | \x45 
2/108918D0 | 1114 | \x530000045a00 
2/131E1310 | 1114 | \x45 
2/131E1310 | 1114 | \x530000045a00 
2/137D7768 | 1114 | \x45 
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96 
(8 rows) 


*** End of demo *** 
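
For anyone who wants to read the data column of the demo by hand, the sketch
below decodes the two message kinds that matter here. It assumes the pgoutput
layouts as I understand them from the logical replication message format
documentation: Stream Start is 'S', a 32-bit xid and a one-byte first-segment
flag; Stream Commit is 'c', a 32-bit xid, a one-byte flags field, then three
64-bit values (commit LSN, end LSN, commit timestamp). The documentation calls
the 'E' message Stream Stop; it is the one referred to as Stream End above.
decode() and format_lsn() are names made up for this example.

```python
import struct

MESSAGE_TYPES = {
    b"S": "Stream Start",
    b"E": "Stream Stop",     # referred to as "Stream End" in the demo
    b"c": "Stream Commit",
    b"R": "Relation",
    b"I": "Insert",
    b"D": "Delete",
}


def format_lsn(value: int) -> str:
    """Render a 64-bit LSN in the X/Y hexadecimal form used by psql."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def decode(data: bytes) -> dict:
    """Decode the header fields of a Stream Start or Stream Commit message."""
    kind = MESSAGE_TYPES.get(data[:1], "Other")
    msg = {"type": kind}
    if kind == "Stream Start":
        # big-endian: Int32 xid, Int8 first-segment flag
        xid, first = struct.unpack_from(">IB", data, 1)
        msg.update(xid=xid, first_segment=bool(first))
    elif kind == "Stream Commit":
        # big-endian: Int32 xid, Int8 flags, Int64 commit LSN,
        # Int64 end LSN, Int64 commit timestamp (ignored here)
        xid, _flags, commit_lsn, end_lsn, _ts = struct.unpack_from(">IBQQq", data, 1)
        msg.update(
            xid=xid,
            commit_lsn=format_lsn(commit_lsn),
            end_lsn=format_lsn(end_lsn),
        )
    return msg


start = decode(bytes.fromhex("530000045801"))
commit = decode(bytes.fromhex(
    "630000045800000000020df59918000000020df599500002aca72900f8a8"))
```

Applied to the rows above, the first Stream Start decodes to xid 1112 with the
first-segment flag set, and the end LSN inside the first Stream Commit is
2/DF59950, the same value as in the lsn column of that row.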


The question is whether this is by design or a bug, and if it is by design, 
maybe someone can explain how this is meant to be used, because it's not 
clear. It will work eventually if the upto_nchanges argument is NULL, because 
when the transaction completes we get a Stream Commit record and can advance, 
but in the meantime we'll have ingested a lot of duplicate records that we now 
have to deal with. And if upto_nchanges is not NULL we're stuck: peek() only 
returns one or more Stream blocks until the number of returned rows exceeds 
upto_nchanges, and then returns the same blocks over and over again forever, 
because we cannot advance and never see the Stream Commit record. 


Thank you. 


Guillaume. 
