On Sat, Nov 26, 2022 at 12:15 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Fri, Nov 25, 2022 at 5:38 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Fri, Nov 25, 2022 at 1:35 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> > >
> > > During DecodeCommit() for skipping a transaction we use ReadRecPtr to
> > > check whether to skip this transaction or not.  Whereas in
> > > ReorderBufferCanStartStreaming() we use EndRecPtr to check whether to
> > > stream or not. Generally it will not create a problem but if the
> > > commit record itself is adding some changes to the transaction(e.g.
> > > snapshot) and if the "start_decoding_at" is in between ReadRecPtr and
> > > EndRecPtr then streaming will decide to stream the transaction where
> > > as DecodeCommit will decide to skip it.  And for handling this case in
> > > ReorderBufferForget() we call stream_abort().
> > >
> >
> > The other cases are probably where we don't have FilterByOrigin or
> > dbid check, for example, XLOG_HEAP2_NEW_CID/XLOG_XACT_INVALIDATIONS.
> > We anyway actually don't send anything for such cases except empty
> > start/stop messages. Can we add some flag to txn which says that there
> > is at least one change like DML that we want to stream?
> >
>
> We can probably think of using txn_flags for this purpose.

In the attached patch I have used txn_flags to record whether a
transaction has any streamable change; a transaction will not be
selected for streaming unless it has at least one streamable change.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From f330d37f6ac1930cde1d2773dcd568b9a35454c9 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.ku...@enterprisedb.com>
Date: Fri, 25 Nov 2022 13:11:44 +0530
Subject: [PATCH v2] Fix thinko in when to stream a transaction

During DecodeCommit(), we use ReadRecPtr to check whether to skip a
transaction, whereas in ReorderBufferCanStartStreaming() we use EndRecPtr
to check whether to stream it.  Generally this does not create a problem,
but if the commit record itself adds some changes to the transaction (e.g.
a snapshot) and start_decoding_at lies between ReadRecPtr and EndRecPtr, then
streaming will decide to stream the transaction whereas DecodeCommit will
decide to skip it.  To handle this case, ReorderBufferForget() calls
stream_abort() in order to abort any streamed changes.  Ideally, if we are
going to skip a transaction we should never stream it, and hence there is
no need to send a stream abort for such a transaction when it is skipped.

Along with that, we also skip the transaction if its dbid is not the same as
the slot's dbid or if it is filtered out by origin id.  So in corner cases it
is possible that we stream a transaction that is later skipped in DecodeCommit.

To fix that, do not select a transaction for streaming unless it has at least
one streamable change; if it has a streamable change, we can safely select it
for streaming because it will not be skipped by DecodeCommit.
---
 src/backend/replication/logical/reorderbuffer.c | 34 +++++++++++++++++++++----
 src/include/replication/reorderbuffer.h         | 23 +++++++++++------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 31f7381..e1a031d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -793,6 +793,30 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 		return;
 	}
 
+	/*
+	 * If a streamable change is being queued, get the top-level transaction
+	 * and mark it as having a streamable change.  This is required for
+	 * streaming in-progress transactions: an in-progress transaction will
+	 * not be selected for streaming unless it has at least one streamable
+	 * change.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+		change->action == REORDER_BUFFER_CHANGE_DELETE ||
+		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+		change->action == REORDER_BUFFER_CHANGE_TRUNCATE)
+	{
+		ReorderBufferTXN *toptxn;
+
+		/* get the top transaction */
+		if (txn->toptxn != NULL)
+			toptxn = txn->toptxn;
+		else
+			toptxn = txn;
+
+		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+	}
+
 	change->lsn = lsn;
 	change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
-	/* For streamed transactions notify the remote node about the abort. */
-	if (rbtxn_is_streamed(txn))
-		rb->stream_abort(rb, txn, lsn);
+	/* the transaction which is being skipped shouldn't have been streamed */
+	Assert(!rbtxn_is_streamed(txn));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
@@ -3502,7 +3525,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+			rbtxn_has_streamable_change(txn))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3919,7 +3943,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	 * restarting.
 	 */
 	if (ReorderBufferCanStream(rb) &&
-		!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+		!SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
 		return true;
 
 	return false;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b23d8cc..9766c9f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE	  0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 	0x0001
+#define RBTXN_IS_SUBXACT          	0x0002
+#define RBTXN_IS_SERIALIZED       	0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 	0x0008
+#define RBTXN_IS_STREAMED         	0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE  	0x0020
+#define RBTXN_PREPARE             	0x0040
+#define RBTXN_SKIPPED_PREPARE	  	0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain a streamable change? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

Reply via email to