Hi everyone,
When using logical replication with the pgoutput plugin on PG 16, we do the following:

1) SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null, 'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'false');
2) Get the LSN of the last row (the Commit record).
3) SELECT * FROM pg_replication_slot_advance('test_slot_v1', <Commit LSN>);
4) Repeat.

This works perfectly fine when streaming = false. When streaming is turned on, the expectation is that the same thing happens, except that the LSN passed to pg_replication_slot_advance() is that of a Stream End record. On the next call to pg_logical_slot_peek_binary_changes() we should then get the subsequent Stream Start record. Instead, the stream starts over from the beginning of the transaction (its first Stream Start record). Observe:

*** Demo starts ***

*** Initially there are no changes, so peek() returns nothing: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null, 'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1, 1) NOT IN ('\x49', '\x44');
 lsn | xid | data
-----+-----+------
(0 rows)

*** Slot status: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/98CE060   | 2/98CE060
(1 row)

*** Now make some changes (delete, then insert a bunch of records) and call peek(). ***
*** The predicate filters out Delete and Insert records, leaving Stream Start (\x53 = S), ***
*** Relation (\x52 = R), Stream End (\x45 = E, "Stream Stop" in the protocol docs), and Stream Commit (\x63 = c). ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null, 'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1, 1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/A222A20  | 1112 | \x530000045801
 2/A222A20  | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/C141BE8  | 1112 | \x45
 2/C141C28  | 1112 | \x530000045800
 2/DF598D8  | 1112 | \x45
 2/DF59950  | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)

*** It was a peek(), so the slot status is unchanged: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/98CE060   | 2/98CE060
(1 row)

*** Now advance the slot to the first Stream End record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/C141BE8');
  slot_name   |  end_lsn
--------------+-----------
 test_slot_v1 | 2/C141BE8
(1 row)

*** confirmed_flush_lsn is updated as expected: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/9B09D10   | 2/C141BE8
(1 row)

*** Now peek() again. It starts from earlier than confirmed_flush_lsn: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null, 'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1, 1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/A222A20  | 1112 | \x530000045801
 2/A222A20  | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/C141BE8  | 1112 | \x45
 2/C141C28  | 1112 | \x530000045800
 2/DF598D8  | 1112 | \x45
 2/DF59950  | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)

*** Next, advance to the Stream Commit record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/DF59950');
  slot_name   |  end_lsn
--------------+-----------
 test_slot_v1 | 2/DF59950
(1 row)

*** This time peek() starts from the correct LSN: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null, 'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1, 1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(8 rows)

*** End of demo ***

The question is whether this is by design or a bug. If it is by design, could someone explain how this is meant to be used? It is not clear to us.

It does work eventually if the upto_nchanges argument is NULL: once the transaction completes we get a Stream Commit record and can advance past it. In the meantime, however, we will have ingested a lot of duplicate records that we now have to deal with.

If upto_nchanges is not NULL, we are stuck: peek() only returns one or more Stream blocks, stopping once the number of returned rows exceeds upto_nchanges, and then returns the same blocks over and over again forever, because we cannot advance and never see the Stream Commit record.

Thank you.
Guillaume
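For reference, the stream messages in the demo above can be decoded with a small standard-library Python sketch, based on the pgoutput message formats documented for protocol version 4 (Stream Start, Stream Stop, Stream Commit). It also picks the LSN of the last Commit or Stream Commit row from a peek() batch, i.e. the only point the demo shows to be safe to advance to. The helper names (lsn_to_int, int_to_lsn, decode_message, last_commit_lsn) and the (lsn, xid, data) row shape are hypothetical; fetching the rows from the server is left out.

```python
import struct
from typing import Iterable, Optional, Tuple

# Hypothetical row shape: (lsn, xid, data) as returned by
# pg_logical_slot_peek_binary_changes(); fetching rows is out of scope here.
Row = Tuple[str, int, bytes]


def lsn_to_int(lsn: str) -> int:
    """Parse a textual pg_lsn such as '2/C141BE8' into a 64-bit integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def int_to_lsn(value: int) -> str:
    """Format a 64-bit LSN the way PostgreSQL prints it (%X/%X)."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def decode_message(data: bytes) -> dict:
    """Decode the pgoutput stream messages that appear in the demo."""
    tag = data[:1]
    if tag == b"S":
        # Stream Start: Int32 xid, Int8 flag (1 = first segment for this xid).
        xid, first = struct.unpack_from(">IB", data, 1)
        return {"type": "stream_start", "xid": xid, "first_segment": first == 1}
    if tag == b"E":
        # Stream Stop (the "Stream End" record in the demo): no payload.
        return {"type": "stream_stop"}
    if tag == b"c":
        # Stream Commit: Int32 xid, Int8 flags (unused), Int64 commit LSN,
        # Int64 end LSN, Int64 commit timestamp (microseconds since 2000-01-01).
        xid, _flags, commit, end, ts = struct.unpack_from(">IBQQq", data, 1)
        return {
            "type": "stream_commit",
            "xid": xid,
            "commit_lsn": int_to_lsn(commit),
            "end_lsn": int_to_lsn(end),
            "commit_time_us": ts,
        }
    # Relation, Insert, Delete, etc. are not decoded in this sketch.
    return {"type": "other", "tag": tag.decode("ascii")}


def last_commit_lsn(rows: Iterable[Row]) -> Optional[str]:
    """Return the lsn of the last Commit ('C') or Stream Commit ('c') row.

    Returns None when the batch holds no commit (for example when
    upto_nchanges cut it off before the Stream Commit), in which case the
    slot cannot be advanced and the same stream blocks come back again.
    """
    result = None
    for lsn, _xid, data in rows:
        if data[:1] in (b"C", b"c"):
            result = lsn
    return result
```

Decoding the Stream Commit row for xid 1112 gives commit LSN 2/DF59918 and end LSN 2/DF59950, which is the value passed to the second pg_replication_slot_advance() call. Comparing lsn_to_int('2/A222A20') with lsn_to_int('2/C141BE8') also confirms that the second peek() returns rows below confirmed_flush_lsn. Advancing only to commit LSNs avoids the restart, but not the duplicates described above.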