On Tue, May 25, 2021 at 5:46 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Tue, May 25, 2021 at 4:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > Your patch will fix the reported scenario but I don't like the way
> > multi_insert flag is used to detect incomplete tuple. One problem
> > could be that even when there are no toast inserts, it won't allow to
> > stream unless we get the last tuple of multi insert WAL. How about
> > changing the code such that when we are clearing the toast flag, we
> > additionally check 'clear_toast_afterwards' flag?
>
> Yes, that can be done, I will fix this in the next version of the patch.

I have fixed this as per the suggestion and, as per the off-list discussion,
I have merged the TOAST and SPEC insert flags into a single
PARTIAL_CHANGE flag.
I have also added a test case for this.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 37bd2166bc5036613ec2666ac06ae8106170550b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 25 May 2021 14:51:45 +0530
Subject: [PATCH v2] Fix bug while streaming the multi-insert toast changes

While processing the multi-insert we cannot clean the toast until
we get the last insert of the multi-insert.  So mark the transaction
incomplete for streaming if we get any multi-insert change, and keep
it set until we get the last tuple of the multi-insert.
---
 contrib/test_decoding/expected/stream.out       | 41 +++++++++++++++++++++++++
 contrib/test_decoding/sql/stream.sql            |  5 +++
 src/backend/replication/logical/reorderbuffer.c | 36 ++++++++++------------
 src/include/replication/reorderbuffer.h         | 27 ++++------------
 4 files changed, 68 insertions(+), 41 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index e1c3bc8..1e93955 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -82,6 +82,47 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  committing streamed transaction
 (13 rows)
 
+-- streaming test for toast with multi-insert
+SELECT 'psql -At -c "copy stream_test to stdout" ' || current_database() AS copy_command \gset
+COPY stream_test FROM program :'copy_command';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                   data                   
+------------------------------------------
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(33 rows)
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index ce86c81..cad028b 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -26,5 +26,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
+-- streaming test for toast with multi-insert
+SELECT 'psql -At -c "copy stream_test to stdout" ' || current_database() AS copy_command \gset
+COPY stream_test FROM program :'copy_command';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b0ab91c..4401608 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -705,31 +705,27 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		toptxn = txn;
 
 	/*
-	 * Set the toast insert bit whenever we get toast insert to indicate a
-	 * partial change and clear it when we get the insert or update on main
-	 * table (Both update and insert will do the insert in the toast table).
+	 * If this is a toast change then set a bit to indicate a partial change
+	 * and clear it when we get the insert or update on main table (Both update
+	 * and insert will do the insert in the toast table) and
+	 * clear_toast_afterwards is set.
 	 */
 	if (toast_insert)
-		toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
-	else if (rbtxn_has_toast_insert(toptxn) &&
-			 IsInsertOrUpdate(change->action))
-		toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
+		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
+	else if (rbtxn_has_partial_change(toptxn) &&
+			 IsInsertOrUpdate(change->action) &&
+			 change->data.tp.clear_toast_afterwards)
+		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
 
 	/*
-	 * Set the spec insert bit whenever we get the speculative insert to
+	 * Set the partial change bit whenever we get the speculative insert to
 	 * indicate the partial change and clear the same on speculative confirm.
 	 */
 	if (IsSpecInsert(change->action))
-		toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
-	else if (IsSpecConfirm(change->action))
-	{
-		/*
-		 * Speculative confirm change must be preceded by speculative
-		 * insertion.
-		 */
-		Assert(rbtxn_has_spec_insert(toptxn));
-		toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
-	}
+		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
+	else if (rbtxn_has_partial_change(toptxn) &&
+			 IsSpecConfirm(change->action))
+		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
 
 	/*
 	 * Stream the transaction if it is serialized before and the changes are
@@ -741,7 +737,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 * changes.  Delaying such transactions would increase apply lag for them.
 	 */
 	if (ReorderBufferCanStartStreaming(rb) &&
-		!(rbtxn_has_incomplete_tuple(toptxn)) &&
+		!(rbtxn_has_partial_change(toptxn)) &&
 		rbtxn_is_serialized(txn))
 		ReorderBufferStreamTXN(rb, toptxn);
 }
@@ -3399,7 +3395,7 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
 		Assert(txn->base_snapshot != NULL);
 
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
+			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 53cdfa5..ced9f3e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -172,10 +172,9 @@ typedef struct ReorderBufferChange
 #define RBTXN_IS_SERIALIZED       0x0004
 #define RBTXN_IS_SERIALIZED_CLEAR 0x0008
 #define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_TOAST_INSERT    0x0020
-#define RBTXN_HAS_SPEC_INSERT     0x0040
-#define RBTXN_PREPARE             0x0080
-#define RBTXN_SKIPPED_PREPARE	  0x0100
+#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
+#define RBTXN_PREPARE             0x0040
+#define RBTXN_SKIPPED_PREPARE	  0x0080
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -201,24 +200,10 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
 )
 
-/* This transaction's changes has toast insert, without main table insert. */
-#define rbtxn_has_toast_insert(txn) \
+/* Does this transaction contain a partial (incomplete) change? */
+#define rbtxn_has_partial_change(txn) \
 ( \
-	((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
-)
-/*
- * This transaction's changes has speculative insert, without speculative
- * confirm.
- */
-#define rbtxn_has_spec_insert(txn) \
-( \
-	((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
-)
-
-/* Check whether this transaction has an incomplete change. */
-#define rbtxn_has_incomplete_tuple(txn) \
-( \
-	rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
+	((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
 /*
-- 
1.8.3.1

Reply via email to