On Sat, Nov 26, 2022 at 12:15 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Fri, Nov 25, 2022 at 5:38 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> > >
> > > During DecodeCommit() for skipping a transaction we use ReadRecPtr to
> > > check whether to skip this transaction or not. Whereas in
> > > ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
> > > stream or not. Generally it will not create a problem but if the
> > > commit record itself is adding some changes to the transaction(e.g.
> > > snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
> > > EndRecPtr then streaming will decide to stream the transaction where
> > > as DecodeCommit will decide to skip it. And for handling this case in
> > > ReorderBufferForget() we call stream_abort().
> > >
> >
> > The other cases are probably where we don't have FilterByOrigin or
> > dbid check, for example, XLOG_HEAP2_NEW_CID/XLOG_XACT_INVALIDATIONS.
> > We anyway actually don't send anything for such cases except empty
> > start/stop messages. Can we add some flag to txn which says that there
> > is at least one change like DML that we want to stream?
> >
>
> We can probably think of using txn_flags for this purpose.
In the attached patch, I have used txn_flags to record whether a transaction has any streamable change; a transaction will not be selected for streaming unless it has at least one streamable change.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From f330d37f6ac1930cde1d2773dcd568b9a35454c9 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.ku...@enterprisedb.com>
Date: Fri, 25 Nov 2022 13:11:44 +0530
Subject: [PATCH v2] Fix thinko in when to stream a transaction

During DecodeCommit(), we use ReadRecPtr to decide whether to skip a
transaction, whereas ReorderBufferCanStartStreaming() uses EndRecPtr to
decide whether to stream it. Usually this makes no difference, but if
the commit record itself adds changes to the transaction (e.g. a
snapshot) and start_decoding_at lies between ReadRecPtr and EndRecPtr,
the streaming logic decides to stream the transaction while
DecodeCommit() decides to skip it. To handle this case,
ReorderBufferForget() calls stream_abort() to abort any changes that
were already streamed. Ideally, a transaction that we are going to skip
should never be streamed in the first place, so there should be no need
to send a stream abort for it. Fix this by using ReadRecPtr in
ReorderBufferCanStartStreaming() as well.

DecodeCommit() also skips a transaction if its dbid differs from the
slot's dbid or if it is filtered out by origin. So, in corner cases, we
might stream a transaction that DecodeCommit() later skips.

To fix that, do not select a transaction for streaming unless it has at
least one streamable change. A transaction with a streamable change can
safely be selected for streaming, because it will not be skipped by
DecodeCommit().
---
 src/backend/replication/logical/reorderbuffer.c | 34 +++++++++++++++++++++----
 src/include/replication/reorderbuffer.h         | 23 +++++++++++------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..e1a031d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -793,6 +793,30 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * If a streamable change is being queued, get the top-level transaction
+	 * and mark it as having a streamable change.  This is required for
+	 * streaming in-progress transactions: an in-progress transaction will
+	 * not be selected for streaming unless it has at least one streamable
+	 * change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3502,7 +3525,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3919,7 +3943,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..9766c9f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE     0x0080
+#define RBTXN_HAS_CATALOG_CHANGES   0x0001
+#define RBTXN_IS_SUBXACT            0x0002
+#define RBTXN_IS_SERIALIZED         0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR   0x0008
+#define RBTXN_IS_STREAMED           0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE    0x0020
+#define RBTXN_PREPARE               0x0040
+#define RBTXN_SKIPPED_PREPARE       0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain a streamable change? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1