From 11cf3de4232e02d38850aa3433cd5ec024f571a6 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 10 Nov 2020 14:07:30 +0530
Subject: [PATCH v1] Fix skip-empty-xacts handling in streaming mode

In streaming mode a transaction can be decoded in multiple streams, and
those streams can be interleaved.  Because of that, we cannot remember a
transaction's write status in the logical decoding context: that status
might get changed while decoding some other transaction.  So we need to
keep it in the reorder buffer txn instead.  Along with that, we also
support a new option to skip empty streams.
---
 .../test_decoding/expected/concurrent_stream.out   |   5 +-
 contrib/test_decoding/specs/concurrent_stream.spec |   8 +-
 contrib/test_decoding/test_decoding.c              | 100 +++++++++++++++++----
 src/include/replication/reorderbuffer.h            |   5 ++
 4 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
index e731d13..6f8b217 100644
--- a/contrib/test_decoding/expected/concurrent_stream.out
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
 step s0_begin: BEGIN;
 step s0_ddl: CREATE TABLE stream_test1(data text);
 step s1_ddl: CREATE TABLE stream_test(data text);
 step s1_begin: BEGIN;
 step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
 step s1_commit: COMMIT;
 step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 data           
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
index ad9fde9..8d24ca1 100644
--- a/contrib/test_decoding/specs/concurrent_stream.spec
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -8,7 +8,7 @@ setup
 
   -- consume DDL
   SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 60000) g';
 }
 
 teardown
@@ -23,6 +23,10 @@ setup { SET synchronous_commit=on; }
 step "s0_begin" { BEGIN; }
 step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl"   {CREATE TABLE stream_test2(data text);}
+
 # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
 # the currently running s0_ddl and we want to test that s0_ddl should not get
 # streamed when user asked to skip-empty-xacts.
@@ -34,4 +38,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
 step "s1_commit" { COMMIT; }
 step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
 
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..921306c 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,16 @@ typedef struct
 	bool		include_xids;
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
-	bool		xact_wrote_changes;
+	bool		skip_empty_streams;	
 	bool		only_local;
 } TestDecodingData;
 
+typedef struct
+{
+	bool		xact_wrote_changes;
+	bool		stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							  bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -135,6 +141,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_xids = true;
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
+	data->skip_empty_streams = false;
 	data->only_local = false;
 
 	ctx->output_plugin_private = data;
@@ -194,6 +201,24 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
+
+			/* enabling skip-empty-xacts also enables skip-empty-streams */
+			if (data->skip_empty_xacts)
+				data->skip_empty_streams = true;
+		}
+		else if (strcmp(elem->defname, "skip-empty-streams") == 0)
+		{
+			if (elem->arg == NULL)
+				data->skip_empty_streams = true;
+			else if (!parse_bool(strVal(elem->arg), &data->skip_empty_streams))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+			if (!data->skip_empty_xacts && data->skip_empty_streams)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("the skip-empty-streams can not be true if skip-empty-xacts is false")));
 		}
 		else if (strcmp(elem->defname, "only-local") == 0)
 		{
@@ -255,8 +280,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata =
+		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+	txndata->xact_wrote_changes = false;
+	txn->output_plugin_private = txndata;
 
-	data->xact_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
 
@@ -280,8 +309,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +472,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
@@ -527,17 +559,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	MemoryContext old;
 	int			i;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
@@ -592,10 +626,26 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	data->xact_wrote_changes = false;
-	if (data->skip_empty_xacts)
+	/*
+	 * If this is the first stream for the txn then allocate the txn plugin
+	 * data and set the xact_wrote_changes to false.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+		txndata->xact_wrote_changes = false;
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->stream_wrote_changes = false;
+	if (data->skip_empty_streams)
+	{
+		Assert(data->skip_empty_xacts);
 		return;
+	}
 	pg_output_stream_start(ctx, data, txn, true);
 }
 
@@ -615,8 +665,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_streams && !txndata->stream_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -633,8 +684,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   XLogRecPtr abort_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	if (txn->toptxn == NULL)
+	{
+		Assert(txn->output_plugin_private != NULL);
+		pfree(txndata);
+		txn->output_plugin_private = false;
+	}
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +712,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = false;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +747,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	/* output stream start if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_streams && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -734,12 +801,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_streams && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..bd9dd7e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
 	/* If we have detected concurrent abort then ignore future changes. */
 	bool		concurrent_abort;
+
+	/*
+	 * Private data pointer of the output plugin.
+	 */
+	void	   *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
1.8.3.1

