From 0b42953b81592de97c04cc4cedbda1a5f541266b Mon Sep 17 00:00:00 2001
From: Shi Yu <shiy.fnst@fujitsu.com>
Date: Mon, 24 Oct 2022 15:52:14 +0800
Subject: [PATCH v2] Allow streaming every change without waiting until
 logical_decoding_work_mem is exceeded.

Add a new GUC force_stream_mode. When it is set to on, changes are sent
to the output plugin immediately in streaming mode. Otherwise, changes
are not sent until logical_decoding_work_mem is exceeded.
---
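A minimal usage sketch of the new GUC, mirroring the test added to
contrib/test_decoding by this patch (it assumes wal_level = logical and reuses
the stream_test table and regression_slot names from sql/stream.sql): with
force_stream_mode enabled, even a tiny transaction is streamed to the output
plugin right away instead of waiting for logical_decoding_work_mem.

    CREATE TABLE stream_test(data text);
    SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot',
                                                          'test_decoding');

    -- Stream every change immediately, regardless of memory usage.
    SET force_stream_mode = on;
    INSERT INTO stream_test SELECT repeat('a', 10) || g.i
      FROM generate_series(1, 5) g(i);
    SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
        'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');

    -- Back to the default: stream or spill only once
    -- logical_decoding_work_mem is exceeded.
    RESET force_stream_mode;
    SELECT pg_drop_replication_slot('regression_slot');
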
 contrib/test_decoding/expected/stream.out     | 13 +++
 contrib/test_decoding/sql/stream.sql          |  8 ++
 doc/src/sgml/config.sgml                      | 16 ++++
 .../replication/logical/reorderbuffer.c       | 82 ++++++++++++-------
 src/backend/utils/misc/guc_tables.c           | 10 +++
 src/include/replication/reorderbuffer.h       |  1 +
 6 files changed, 100 insertions(+), 30 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 0f21dcb8e0..b0bb467b2b 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -106,6 +106,19 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  committing streamed transaction
 (17 rows)
 
+-- streaming test with force_stream_mode
+SET force_stream_mode=on;
+TRUNCATE table stream_test;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                   data                   
+------------------------------------------
+ opening a streamed block for transaction
+ streaming truncate for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(4 rows)
+
+RESET force_stream_mode;
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 4feec62972..a5a63e71fa 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -44,5 +44,13 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
+-- streaming test with force_stream_mode
+SET force_stream_mode=on;
+
+TRUNCATE table stream_test;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+RESET force_stream_mode;
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8e4145979d..a4654f232e 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11574,6 +11574,22 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-force-stream-mode" xreflabel="force_stream_mode">
+      <term><varname>force_stream_mode</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>force_stream_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies whether to force sending changes to the output plugin
+        immediately in streaming mode. If set to <literal>off</literal> (the
+        default), changes are kept in memory and are not streamed until
+        <varname>logical_decoding_work_mem</varname> is exceeded.
+       </para>
+      </listitem>
+     </varlistentry>
+
     </variablelist>
   </sect1>
   <sect1 id="runtime-config-short">
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b567b8b59e..d676aca529 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -209,6 +209,12 @@ typedef struct ReorderBufferDiskChange
 int			logical_decoding_work_mem;
 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 
+/*
+ * Whether to send changes to the output plugin immediately in streaming mode.
+ * When it is off, wait until logical_decoding_work_mem is exceeded.
+ */
+bool		force_stream_mode;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -3540,7 +3546,9 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
- * disk until we reach under the memory limit.
+ * disk or send to output plugin until we reach under the memory limit.
+ *
+ * If force_stream_mode is enabled, send all streamable changes.
  *
  * XXX At this point we select the transactions until we reach under the memory
  * limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,50 +3560,64 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
 
-	/* bail out if we haven't exceeded the memory limit */
-	if (rb->size < logical_decoding_work_mem * 1024L)
-		return;
-
 	/*
-	 * Loop until we reach under the memory limit.  One might think that just
-	 * by evicting the largest (sub)transaction we will come under the memory
-	 * limit based on assumption that the selected transaction is at least as
-	 * large as the most recent change (which caused us to go over the memory
-	 * limit). However, that is not true because a user can reduce the
-	 * logical_decoding_work_mem to a smaller value before the most recent
-	 * change.
+	 * If possible, evict streamable transactions from memory by streaming.
 	 */
-	while (rb->size >= logical_decoding_work_mem * 1024L)
+	if (ReorderBufferCanStartStreaming(rb))
 	{
 		/*
-		 * Pick the largest transaction (or subtransaction) and evict it from
-		 * memory by streaming, if possible.  Otherwise, spill to disk.
+		 * If force_stream_mode is off, loop until we reach under the memory
+		 * limit. Otherwise loop until there's no streamable top transaction.
+		 * One might think that just by evicting the largest transaction we
+		 * will come under the memory limit based on assumption that the
+		 * selected transaction is at least as large as the most recent change
+		 * (which caused us to go over the memory limit). However, that is not
+		 * true because a user can reduce the logical_decoding_work_mem to a
+		 * smaller value or enable force_stream_mode before the most recent
+		 * change.
 		 */
-		if (ReorderBufferCanStartStreaming(rb) &&
-			(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
+		while (rb->size >= logical_decoding_work_mem * 1024L ||
+			   (force_stream_mode && rb->size > 0))
 		{
-			/* we know there has to be one, because the size is not zero */
+			txn = ReorderBufferLargestStreamableTopTXN(rb);
+			if (txn == NULL)
+				break;
+
 			Assert(txn && !txn->toptxn);
 			Assert(txn->total_size > 0);
 			Assert(rb->size >= txn->total_size);
 
 			ReorderBufferStreamTXN(rb, txn);
-		}
-		else
-		{
+
 			/*
-			 * Pick the largest transaction (or subtransaction) and evict it
-			 * from memory by serializing it to disk.
+			 * After eviction, the transaction should have no entries in memory,
+			 * and should use 0 bytes for changes.
 			 */
-			txn = ReorderBufferLargestTXN(rb);
+			Assert(txn->size == 0);
+			Assert(txn->nentries_mem == 0);
+		}
+	}
+
+	/*
+	 * Evict transactions from memory by serializing to disk. Loop until we
+	 * reach under the memory limit. One might think that just by evicting the
+	 * largest (sub)transaction we will come under the memory limit, but this is
+	 * not sufficient. See comments above.
+	 */
+	while (rb->size >= logical_decoding_work_mem * 1024L)
+	{
+		/*
+		 * Pick the largest transaction (or subtransaction) and evict it
+		 * from memory by serializing it to disk.
+		 */
+		txn = ReorderBufferLargestTXN(rb);
 
-			/* we know there has to be one, because the size is not zero */
-			Assert(txn);
-			Assert(txn->size > 0);
-			Assert(rb->size >= txn->size);
+		/* we know there has to be one, because the size is not zero */
+		Assert(txn);
+		Assert(txn->size > 0);
+		Assert(rb->size >= txn->size);
 
-			ReorderBufferSerializeTXN(rb, txn);
-		}
+		ReorderBufferSerializeTXN(rb, txn);
 
 		/*
 		 * After eviction, the transaction should have no entries in memory,
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 1bf14eec66..015a88abb3 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -1247,6 +1247,16 @@ struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"force_stream_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("Forces sending changes to the output plugin immediately if streaming is supported, without waiting until logical_decoding_work_mem is exceeded."),
+			NULL,
+			GUC_NOT_IN_SAMPLE
+		},
+		&force_stream_mode,
+		false,
+		NULL, NULL, NULL
+	},
 
 	{
 		{"log_duration", PGC_SUSET, LOGGING_WHAT,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index c700b55b1c..c7c950b8ac 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -18,6 +18,7 @@
 #include "utils/timestamp.h"
 
 extern PGDLLIMPORT int logical_decoding_work_mem;
+extern PGDLLIMPORT bool force_stream_mode;
 
 /* an individual tuple, stored in one chunk of memory */
 typedef struct ReorderBufferTupleBuf
-- 
2.24.0.windows.2

