On Mon, Dec 9, 2024 at 9:09 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Tue, Nov 26, 2024 at 3:03 AM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > > > I've attached a new version patch that incorporates all comments I got so > > far. > > > > Review comments:
Thank you for reviewing the patch! > =============== > 1. > + * The given transaction is marked as streamed if appropriate and the caller > + * requested it by passing 'mark_txn_streaming' as true. > + * > * 'txn_prepared' indicates that we have decoded the transaction at prepare > * time. > */ > static void > -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, > bool txn_prepared) > +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, > bool txn_prepared, > + bool mark_txn_streaming) > { > ... > } > + else if (mark_txn_streaming && (rbtxn_is_toptxn(txn) || > (txn->nentries_mem != 0))) > + { > + /* > + * Mark the transaction as streamed, if appropriate. > > The comments related to the above changes don't clarify in which cases > the 'mark_txn_streaming' should be set. Before this patch, it was > clear from the comments and code about the cases where we would decide > to mark it as streamed. I think we can rename it to txn_streaming for consistency with txn_prepared. I've changed the comment for that. > > 2. > + /* > + * Mark the transaction as aborted so we ignore future changes of this > + * transaction. > > /so we ignore/so we can ignore/ Fixed. > > 3. > * Helper function for ReorderBufferProcessTXN to handle the concurrent > - * abort of the streaming transaction. This resets the TXN such that it > - * can be used to stream the remaining data of transaction being processed. > - * This can happen when the subtransaction is aborted and we still want to > - * continue processing the main or other subtransactions data. > + * abort of the streaming (prepared) transaction. > ... > > In the above comment, "... streaming (prepared)...", you added > prepared to imply that this function handles concurrent abort for both > in-progress and prepared transactions. Am I correct? If so, the > current change makes it less clear. If you see the comments at its > caller, they are clearer. I think we don't need this change as the patch doesn't change what this function does and what the caller would expect. So removed. > > 4. > + /* > + * Remember if the transaction is already aborted so we can detect when > + * the transaction is concurrently aborted during the replay. > + */ > + already_aborted = rbtxn_is_aborted(txn); > + > ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, > txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); > > @@ -2832,10 +2918,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, > TransactionId xid, > * when rollback prepared is decoded and sent, the downstream should be > * able to rollback such a xact. See comments atop DecodePrepare. > * > - * Note, for the concurrent_abort + streaming case a stream_prepare was > + * Note, for the concurrent abort + streaming case a stream_prepare was > * already sent within the ReorderBufferReplay call above. > */ > - if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) > + if (!already_aborted && rbtxn_is_aborted(txn) && !rbtxn_is_streamed(txn)) > rb->prepare(rb, txn, txn->final_lsn); > > It is not clear from the comments how the 'already_aborted' is > handled. I think after this patch we would have already truncated all > its changes. If so, why do we need to try to replay the changes of > such a xact? I used ReorderBufferReplay() for convenience; it sends begin_prepare() and prepare() appropriately, handles streaming-prepared transactions, and updates statistics etc. But as you pointed out, it would not be necessary to set up a historical snapshot etc. I agree that we don't need to try replaying such aborted transactions but I'd like to confirm we don't really need to execute invalidation messages evein in aborted transactions. > > 5. > +/* > + * Check the transaction status by looking CLOG and discard all changes if > + * the transaction is aborted. The transaction status is cached in > + * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the > + * next call. Return true if the transaction is aborted, otherwise return > + * false. > + * > + * When the 'debug_logical_replication_streaming' is set to "immediate", we > + * don't check the transaction status, meaning the caller will always process > + * this transaction. > + */ > +static bool > +ReorderBufferTruncateTXNIfAborted(ReorderBuffer *rb, ReorderBufferTXN *txn) > +{ > > I think this function is being invoked to mark a sub-transaction as > aborted. It is better to explain in comments how it interacts with > sub-transactions, why it is okay to mark them as aborted, and how the > other parts of the system interact with it. This function can be called for top-level transactions and subtransactions. IIUC there is no main difference between calling it for top-level transaction and subtransaction. What interaction with subtransactions are you concerned about? I've attached the updated patch. Regards, -- Masahiko Sawada Amazon Web Services: https://aws.amazon.com
v11-0001-Skip-logical-decoding-of-already-aborted-transac.patch
Description: Binary data