From 09af8bb92e13d3f466e53eb58198f6c6a99d5ddd Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Wed, 16 Mar 2022 02:26:22 -0400
Subject: [PATCH v25 2/2] Skip empty streamed transactions for logical
 replication.

This patch postpones the START STREAM message while streaming large in-progress
transactions until the first change. While processing the STOP STREAM
message, if there was no other change for that transaction, do not send
the STOP STREAM message.
Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 191 ++++++++++++++++++++++++----
 1 file changed, 168 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b0a98f2..221ef8d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -67,6 +67,8 @@ static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 										   TimestampTz prepare_time);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
+static void pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+									   ReorderBufferTXN *txn);
 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn);
 static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
@@ -169,12 +171,15 @@ typedef struct RelationSyncEntry
 /*
  * Maintain a per-transaction level variable to track whether the
  * transaction has sent BEGIN. BEGIN is only sent when the first
- * change in a transaction is processed. This makes it possible
- * to skip transactions that are empty.
+ * change in a transaction is processed. Similarly while streaming
+ * transactions, STREAM_START is only sent with the first change.
+ * This makes it possible to skip transactions that are empty.
  */
 typedef struct PGOutputTxnData
 {
    bool sent_begin_txn;    /* flag indicating whether BEGIN has been sent */
+   bool sent_stream_start; /* flag indicating if stream start has been sent */
+   bool sent_first_stream;   /* flag indicating if any stream has been sent */
 } PGOutputTxnData;
 
 /* Map used to remember which relation schemas we sent. */
@@ -661,9 +666,18 @@ maybe_send_schema(LogicalDecodingContext *ctx,
    /* set up txndata */
    txndata = toptxn->output_plugin_private;
 
-	/* Send BEGIN if we haven't yet */
-	if (txndata && !txndata->sent_begin_txn)
+	if (in_streaming)
+	{
+		/* If streaming, send STREAM START if we haven't yet */
+		if (txndata && !txndata->sent_stream_start)
+			pgoutput_send_stream_start(ctx, toptxn);
+	}
+	else
+	{
+		/* If not streaming, send BEGIN if we haven't yet */
+		if (txndata && !txndata->sent_begin_txn)
 			pgoutput_send_begin(ctx, toptxn);
+	}
 
 	/*
 	 * Send the schema.  If the changes will be published using an ancestor's
@@ -1288,9 +1302,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									 &action))
 				break;
 
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+			if (in_streaming)
+			{
+				/* If streaming, send STREAM START if we haven't yet */
+				if (txndata && !txndata->sent_stream_start)
+					pgoutput_send_stream_start(ctx, txn);
+			}
+			else
+			{
+				/* If not streaming, send BEGIN if we haven't yet */
+				if (txndata && !txndata->sent_begin_txn)
+					pgoutput_send_begin(ctx, txn);
+			}
 
 			/*
 			 * Schema should be sent using the original relation because it
@@ -1342,9 +1365,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									 relentry, &action))
 				break;
 
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+			if (in_streaming)
+			{
+				/* If streaming, send STREAM START if we haven't yet */
+				if (txndata && !txndata->sent_stream_start)
+					pgoutput_send_stream_start(ctx, txn);
+			}
+			else
+			{
+				/* If not streaming, send BEGIN if we haven't yet */
+				if (txndata && !txndata->sent_begin_txn)
+					pgoutput_send_begin(ctx, txn);
+			}
 
 			maybe_send_schema(ctx, change, relation, relentry);
 
@@ -1404,9 +1436,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 										 relentry, &action))
 					break;
 
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
+				if (in_streaming)
+				{
+					/* If streaming, send STREAM START if we haven't yet */
+					if (txndata && !txndata->sent_stream_start)
+						pgoutput_send_stream_start(ctx, txn);
+				}
+				else
+				{
+					/* If not streaming, send BEGIN if we haven't yet */
+					if (txndata && !txndata->sent_begin_txn)
+						pgoutput_send_begin(ctx, txn);
+				}
 
 				maybe_send_schema(ctx, change, relation, relentry);
 
@@ -1484,9 +1525,18 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		txndata = (PGOutputTxnData *) txn->output_plugin_private;
 
-		/* Send BEGIN if we haven't yet */
-		if (txndata && !txndata->sent_begin_txn)
+		if (in_streaming)
+		{
+			/* If streaming, send STREAM START if we haven't yet */
+			if (txndata && !txndata->sent_stream_start)
+			pgoutput_send_stream_start(ctx, txn);
+		}
+		else
+		{
+			/* If not streaming, send BEGIN if we haven't yet */
+			if (txndata && !txndata->sent_begin_txn)
 			pgoutput_send_begin(ctx, txn);
+		}
 
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
@@ -1524,13 +1574,22 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * Output BEGIN if we haven't yet.
 	 * Avoid for non-transactional messages.
 	 */
-	if (!in_streaming && transactional)
+	if (in_streaming || transactional)
 	{
 		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 
-		/* Send BEGIN if we haven't yet */
-		if (txndata && !txndata->sent_begin_txn)
+		if (in_streaming)
+		{
+			/* If streaming, send STREAM START if we haven't yet */
+			if (txndata && !txndata->sent_stream_start)
+			pgoutput_send_stream_start(ctx, txn);
+		}
+		else
+		{
+			/* If not streaming, send BEGIN if we haven't yet */
+			if (txndata && !txndata->sent_begin_txn)
 			pgoutput_send_begin(ctx, txn);
+		}
 	}
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -1615,28 +1674,60 @@ static void
 pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
-	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData *txndata = txn->output_plugin_private;
 
 	/* we can't nest streaming of transactions */
 	Assert(!in_streaming);
 
 	/*
+	 * Don't actually send stream start here, instead set a flag that indicates
+	 * that stream start hasn't been sent and wait for the first actual change
+	 * for this stream to be sent and then send stream start. This is done
+	 * to avoid sending empty streams without any changes.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->sent_stream_start = false;
+	in_streaming = true;
+}
+
+/*
+ * Actually send START STREAM
+ */
+static void
+pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn)
+{
+	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+
+	/*
 	 * If we already sent the first stream for this transaction then don't
 	 * send the origin id in the subsequent streams.
 	 */
-	if (rbtxn_is_streamed(txn))
+	if (txndata->sent_first_stream)
 		send_replication_origin = false;
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
-	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+	logicalrep_write_stream_start(ctx->out, txn->xid, !txndata->sent_first_stream);
 
 	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
 					 send_replication_origin);
 
 	OutputPluginWrite(ctx, true);
 
-	/* we're streaming a chunk of transaction now */
-	in_streaming = true;
+	/*
+	 * Set the flags that indicate that changes were sent as part of
+	 * the transaction and the stream.
+	 */
+	txndata->sent_begin_txn = txndata->sent_stream_start = true;
+	txndata->sent_first_stream = true;
 }
 
 /*
@@ -1646,9 +1737,18 @@ static void
 pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
 					 ReorderBufferTXN *txn)
 {
+	PGOutputTxnData *data = txn->output_plugin_private;
+
 	/* we should be streaming a trasanction */
 	Assert(in_streaming);
 
+	if (!data->sent_stream_start)
+	{
+		in_streaming = false;
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream stop");
+		return;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_stop(ctx->out);
 	OutputPluginWrite(ctx, true);
@@ -1667,6 +1767,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 					  XLogRecPtr abort_lsn)
 {
 	ReorderBufferTXN *toptxn;
+	PGOutputTxnData  *txndata;
+	bool sent_first_stream;
 
 	/*
 	 * The abort should happen outside streaming block, even for streamed
@@ -1676,6 +1778,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 
 	/* determine the toplevel transaction */
 	toptxn = (txn->toptxn) ? txn->toptxn : txn;
+	txndata = toptxn->output_plugin_private;
+	sent_first_stream = txndata->sent_first_stream;
+
+	if (txn->toptxn == NULL)
+	{
+		pfree(txndata);
+		txn->output_plugin_private = NULL;
+	}
+
+	if (!sent_first_stream)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
+		return;
+	}
 
 	Assert(rbtxn_is_streamed(toptxn));
 
@@ -1695,6 +1811,9 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn,
 					   XLogRecPtr commit_lsn)
 {
+	PGOutputTxnData *txndata = txn->output_plugin_private;
+	bool			sent_first_stream = txndata->sent_first_stream;
+
 	/*
 	 * The commit should happen outside streaming block, even for streamed
 	 * transactions. The transaction has to be marked as streamed, though.
@@ -1702,6 +1821,20 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+
+	/*
+	 * If no changes were part of this transaction then drop the commit
+	 * but send the update progress.
+	 */
+	if (!sent_first_stream)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
+		OutputPluginUpdateProgress(ctx, true);
+		return;
+	}
+
 	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -1721,8 +1854,20 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 							ReorderBufferTXN *txn,
 							XLogRecPtr prepare_lsn)
 {
+	PGOutputTxnData *txndata = txn->output_plugin_private;
+	bool			sent_begin_txn = txndata->sent_begin_txn;
+
 	Assert(rbtxn_is_streamed(txn));
 
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+
+	if (!sent_begin_txn)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+		return;
+	}
+
 	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
-- 
1.8.3.1

