On Mon, Dec 9, 2024 at 9:09 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Nov 26, 2024 at 3:03 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > I've attached a new version patch that incorporates all comments I got so 
> > far.
> >
>
> Review comments:

Thank you for reviewing the patch!

> ===============
> 1.
> + * The given transaction is marked as streamed if appropriate and the caller
> + * requested it by passing 'mark_txn_streaming' as true.
> + *
>   * 'txn_prepared' indicates that we have decoded the transaction at prepare
>   * time.
>   */
>  static void
> -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> bool txn_prepared)
> +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> bool txn_prepared,
> + bool mark_txn_streaming)
>  {
> ...
>   }
> + else if (mark_txn_streaming && (rbtxn_is_toptxn(txn) ||
> (txn->nentries_mem != 0)))
> + {
> + /*
> + * Mark the transaction as streamed, if appropriate.
>
> The comments related to the above changes don't clarify in which cases
> the 'mark_txn_streaming' should be set. Before this patch, it was
> clear from the comments and code about the cases where we would decide
> to mark it as streamed.

I think we can rename it to txn_streaming for consistency with
txn_prepared. I've also updated the comment to explain when the caller
should set it.

>
> 2.
> + /*
> + * Mark the transaction as aborted so we ignore future changes of this
> + * transaction.
>
> /so we ignore/so we can ignore/

Fixed.

>
> 3.
> * Helper function for ReorderBufferProcessTXN to handle the concurrent
> - * abort of the streaming transaction.  This resets the TXN such that it
> - * can be used to stream the remaining data of transaction being processed.
> - * This can happen when the subtransaction is aborted and we still want to
> - * continue processing the main or other subtransactions data.
> + * abort of the streaming (prepared) transaction.
> ...
>
> In the above comment, "... streaming (prepared)...", you added
> prepared to imply that this function handles concurrent abort for both
> in-progress and prepared transactions. Am I correct? If so, the
> current change makes it less clear. If you see the comments at its
> caller, they are clearer.

I don't think we need this change, as the patch doesn't alter what this
function does or what the caller expects. I've removed it.

>
> 4.
> + /*
> + * Remember if the transaction is already aborted so we can detect when
> + * the transaction is concurrently aborted during the replay.
> + */
> + already_aborted = rbtxn_is_aborted(txn);
> +
>   ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
>   txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
>
> @@ -2832,10 +2918,10 @@ ReorderBufferPrepare(ReorderBuffer *rb,
> TransactionId xid,
>   * when rollback prepared is decoded and sent, the downstream should be
>   * able to rollback such a xact. See comments atop DecodePrepare.
>   *
> - * Note, for the concurrent_abort + streaming case a stream_prepare was
> + * Note, for the concurrent abort + streaming case a stream_prepare was
>   * already sent within the ReorderBufferReplay call above.
>   */
> - if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
> + if (!already_aborted && rbtxn_is_aborted(txn) && !rbtxn_is_streamed(txn))
>   rb->prepare(rb, txn, txn->final_lsn);
>
> It is not clear from the comments how the 'already_aborted' is
> handled. I think after this patch we would have already truncated all
> its changes. If so, why do we need to try to replay the changes of
> such a xact?

I used ReorderBufferReplay() for convenience: it sends begin_prepare()
and prepare() appropriately, handles streaming-prepared transactions,
updates statistics, and so on. But as you pointed out, setting up a
historical snapshot and the like would not be necessary. I agree that
we don't need to try replaying such aborted transactions, but I'd like
to confirm that we really don't need to execute invalidation messages
even in aborted transactions.

>
> 5.
> +/*
> + * Check the transaction status by looking CLOG and discard all changes if
> + * the transaction is aborted. The transaction status is cached in
> + * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
> + * next call. Return true if the transaction is aborted, otherwise return
> + * false.
> + *
> + * When the 'debug_logical_replication_streaming' is set to "immediate", we
> + * don't check the transaction status, meaning the caller will always process
> + * this transaction.
> + */
> +static bool
> +ReorderBufferTruncateTXNIfAborted(ReorderBuffer *rb, ReorderBufferTXN *txn)
> +{
>
> I think this function is being invoked to mark a sub-transaction as
> aborted. It is better to explain in comments how it interacts with
> sub-transactions, why it is okay to mark them as aborted, and how the
> other parts of the system interact with it.

This function can be called for both top-level transactions and
subtransactions. IIUC there is no substantial difference between calling
it for a top-level transaction and calling it for a subtransaction. What
interaction with subtransactions are you concerned about?

I've attached the updated patch.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

Attachment: v11-0001-Skip-logical-decoding-of-already-aborted-transac.patch
