On Tuesday, September 10, 2024 2:45 PM shveta malik <shveta.ma...@gmail.com> wrote:
> > ---
> > THE DESIGN
> > ---
> >
> > To achieve the above, we plan to allow the logical walsender to
> > maintain and advance the slot.xmin to protect the data in the user
> > table and to introduce a new logical standby feedback message. This
> > message reports a WAL position that has been replayed on the logical
> > standby *AND* for which the changes occurring on the logical standby
> > before that WAL position have also been replayed to the walsender's
> > node (where the walsender is running). After receiving the new
> > feedback message, the walsender will advance the slot.xmin based on
> > the flush info, similar to the advancement of catalog_xmin.
> > Currently, the effective_xmin/xmin of a logical slot is unused during
> > logical replication, so I think it's safe and won't cause side
> > effects to reuse the xmin for this feature.
> >
> > We have introduced a new subscription option
> > (feedback_slots='slot1,...'), where these slots will be used to check
> > condition (b): the transactions on logical standbys occurring before
> > the replay of Node A's DELETE are replayed on Node A as well.
> > Therefore, on Node B, users should specify the slots corresponding to
> > Node A in this option. The apply worker will get the oldest confirmed
> > flush LSN among the specified slots and send that LSN as a feedback
> > message to the walsender. -- I also thought of making it automatic,
> > e.g. letting the apply worker select the slots acquired by the
> > walsenders which connect to the same remote server (e.g. if the apply
> > worker's connection info or some other flag is the same as the
> > walsender's connection info). But it seems tricky, because if some
> > slots are inactive, which means the walsenders are not there, the
> > apply worker could not find the correct slots to check unless we save
> > the host along with the slot's persistent data.
> >
> > The new feedback message is sent only if feedback_slots is not NULL.
> > If the slots in feedback_slots are removed, a final message
> > containing InvalidXLogRecPtr will be sent to inform the walsender to
> > forget about the slot.xmin.
> >
> > To detect update_deleted conflicts during update operations, if the
> > target row cannot be found, we perform an additional scan of the
> > table using SnapshotAny. This scan aims to locate the most recently
> > deleted row that matches the old column values from the remote update
> > operation and has not yet been removed by VACUUM. If any such tuples
> > are found, we report the update_deleted conflict along with the
> > origin and transaction information that deleted the tuple.
> >
> > Please refer to the attached POC patch set which implements the above
> > design. The patch set is split into several parts to make the initial
> > review easier. Please note that the patches are interdependent and
> > cannot work independently.
> >
> > Thanks a lot to Kuroda-San and Amit for the off-list discussion.
> >
> > Suggestions and comments are highly appreciated!
>
> Thank you, Hou-San, for explaining the design.
> But to make it easier to understand, would you be able to explain the
> sequence/timeline of the *new* actions performed by the walsender and
> the apply processes for the given example, along with the new
> feedback_slot config needed?
>
> Node A: (Procs: walsenderA, applyA)
> T1: INSERT INTO t (id, value) VALUES (1,1);   ts=10.00 AM
> T2: DELETE FROM t WHERE id = 1;               ts=10.02 AM
>
> Node B: (Procs: walsenderB, applyB)
> T3: UPDATE t SET value = 2 WHERE id = 1;      ts=10.01 AM
Thanks for reviewing! Let me elaborate further on the example:

On Node A, feedback_slots should include the logical slot that is used to
replicate changes from Node A to Node B. On Node B, feedback_slots should
include the logical slot that replicates changes from Node B to Node A.
(A rough sketch of such a configuration for this example is at [1] below.)

Assume the slot.xmin on Node A has been initialized to a valid number (740)
before the following flow:

Node A executed T1.                                       - 10.00 AM
T1 was replicated and applied on Node B.                  - 10.0001 AM
Node B executed T3.                                       - 10.01 AM
Node A executed T2 (741).                                 - 10.02 AM
T2 was replicated and applied on Node B (delete_missing). - 10.03 AM
T3 was replicated and applied on Node A
  (new action: detect update_deleted).                    - 10.04 AM
(new action) The apply worker on Node B has confirmed that T2 has been
  applied locally and that the transactions before T2 (e.g., T3) have
  been replicated and applied to Node A (i.e.,
  feedback_slot.confirmed_flush_lsn >= LSN of the locally replayed T2),
  thus it sends the new feedback message to Node A.       - 10.05 AM
(new action) The walsender on Node A receives the message and advances
  the slot.xmin.                                          - 10.06 AM

Then, after the slot.xmin is advanced to a number greater than 741, VACUUM
would be able to remove the dead tuple on Node A (see [2] below for a way
to observe this).

Best Regards,
Hou zj
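
[1] A rough sketch of the configuration for the example above, assuming a
publication/subscription pair in each direction with default slot names.
The publication, subscription and slot names are made up, and the
feedback_slots syntax is only the one proposed in the POC patch, so it may
change:

-- On Node A: subscribe to Node B. By default this creates a slot named
-- "sub_b_to_a" on Node B, which replicates changes from Node B to Node A.
CREATE SUBSCRIPTION sub_b_to_a
    CONNECTION 'host=nodeB dbname=postgres'
    PUBLICATION pub_b;

-- On Node B: subscribe to Node A, and list the slot on Node B that
-- replicates changes from Node B to Node A (the slot of sub_b_to_a) in
-- feedback_slots. Node A's subscription would be configured analogously
-- with the slot that replicates changes from Node A to Node B.
CREATE SUBSCRIPTION sub_a_to_b
    CONNECTION 'host=nodeA dbname=postgres'
    PUBLICATION pub_a
    WITH (feedback_slots = 'sub_b_to_a');

-- The apply worker for sub_a_to_b then takes the oldest confirmed flush
-- LSN among the feedback slots, conceptually something like:
SELECT min(confirmed_flush_lsn)
FROM pg_replication_slots
WHERE slot_name IN ('sub_b_to_a');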
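
[2] One way to observe the xmin advancement on Node A in this example (the
slot name is the made-up one from [1]; 740 and 741 are the transaction ids
used above):

-- On Node A. While the slot's xmin is still 740, the tuple deleted by T2
-- (xid 741) is kept around so that the update_deleted check can find it.
SELECT slot_name, xmin
FROM pg_replication_slots
WHERE slot_name = 'sub_a_to_b';

-- After the walsender has processed the new feedback message (10.06 AM),
-- the same query should show an xmin greater than 741, and VACUUM can
-- then remove the dead tuple:
VACUUM t;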