On Wed, 28 Jun 2023 at 19:26, Ashutosh Bapat
<ashutosh.bapat....@gmail.com> wrote:
>
> Hi Vignesh,
> Thanks for working on this.
>
> On Wed, Jun 28, 2023 at 4:52 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > Here is a patch having the fix for the same. I have not added any
> > tests as the existing tests cover this scenario. The same issue is
> > present in back branches too.
>
> Interesting, we have a test for this scenario and it accepts erroneous
> output :).
>
> > v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_master.patch
> > can be applied on master, PG15 and PG14,
> > v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_PG13.patch
> > patch can be applied on PG13, PG12 and PG11.
> > Thoughts?
>
> I noticed this when looking at Tomas's patches for logical decoding of
> sequences. The code block you have added is repeated in
> pg_decode_change() and pg_decode_truncate(). It might be better to
> push the conditions in pg_output_begin() itself so that any future
> callsite of pg_output_begin() automatically takes care of these
> conditions.

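Thanks for the comments. Attached is an updated (v2) patch that moves these
conditions into pg_output_begin() itself, as suggested, so that every
caller of pg_output_begin() is handled the same way.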

Regards,
Vignesh
From 130715a6dd808ed88af8c894d7e27b8637bc2c3a Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 28 Jun 2023 14:01:22 +0530
Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the
 first change in the transaction.

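When skip-empty-xacts is enabled, test_decoding defers emitting BEGIN
until the first change of the transaction is decoded. pg_decode_message
did not do this, so a transactional message was output without the
surrounding BEGIN/COMMIT.

Call pg_output_begin in pg_decode_message if it is the first change in
the transaction, and move the skip-empty-xacts checks into
pg_output_begin so that all of its callers share the same logic.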
---
 contrib/test_decoding/expected/messages.out | 10 +++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       | 31 +++++++++++++--------
 3 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 93c948856e..86c29de40b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -211,15 +211,21 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	TestDecodingData *data = ctx->output_plugin_private;
 
 	data->xact_wrote_changes = false;
-	if (data->skip_empty_xacts)
-		return;
 
 	pg_output_begin(ctx, data, txn, true);
 }
 
 static void
-pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data,
+				ReorderBufferTXN *txn, bool last_write)
 {
+	/*
+	 * If asked to skip empty transactions, we'll emit BEGIN at the point
+	 * where the first operation is received for this transaction.
+	 */
+	if (!(last_write ^ data->skip_empty_xacts) || data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, last_write);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
@@ -403,10 +409,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	data = ctx->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	data->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
@@ -487,10 +490,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	data = ctx->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	data->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
@@ -534,6 +534,15 @@ pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 				  const char *prefix, Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional)
+	{
+		pg_output_begin(ctx, data, txn, false);
+		data->xact_wrote_changes = true;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

From 385e5bee5b42fa62d66fea9af7676a48993e0118 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Thu, 29 Jun 2023 08:35:09 +0530
Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the 
 first change in the transaction.

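When skip-empty-xacts is enabled, test_decoding defers emitting BEGIN
until the first change of the transaction is decoded. pg_decode_message
did not do this, so a transactional message was output without the
surrounding BEGIN/COMMIT.

Call pg_output_begin in pg_decode_message if it is the first change in
the transaction, and move the skip-empty-xacts checks into
pg_output_begin so that all of its callers share the same logic.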
---
 contrib/test_decoding/expected/messages.out | 10 ++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       | 37 +++++++++++++--------
 3 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index de1b692658..6c32ca6558 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -299,15 +299,21 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	txndata->xact_wrote_changes = false;
 	txn->output_plugin_private = txndata;
 
-	if (data->skip_empty_xacts)
-		return;
-
 	pg_output_begin(ctx, data, txn, true);
 }
 
 static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+	/*
+	 * If asked to skip empty transactions, we'll emit BEGIN at the point
+	 * where the first operation is received for this transaction.
+	 */
+	if (!(last_write ^ data->skip_empty_xacts) || txndata->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, last_write);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
@@ -355,9 +361,6 @@ pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	txndata->xact_wrote_changes = false;
 	txn->output_plugin_private = txndata;
 
-	if (data->skip_empty_xacts)
-		return;
-
 	pg_output_begin(ctx, data, txn, true);
 }
 
@@ -604,10 +607,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
@@ -690,10 +690,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
@@ -737,6 +734,18 @@ pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 				  const char *prefix, Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata;
+
+	txndata = transactional ? txn->output_plugin_private : NULL;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional)
+	{
+		pg_output_begin(ctx, data, txn, false);
+		txndata->xact_wrote_changes = true;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

From b4543ffb251c99a3f0d4d6b807690c4b89f87a80 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Thu, 29 Jun 2023 09:19:11 +0530
Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the 
 first change in the transaction.

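When skip-empty-xacts is enabled, test_decoding defers emitting BEGIN
until the first change of the transaction is decoded. pg_decode_message
did not do this, so a transactional message was output without the
surrounding BEGIN/COMMIT.

Call pg_output_begin in pg_decode_message if it is the first change in
the transaction, and move the skip-empty-xacts checks into
pg_output_begin so that all of its callers share the same logic.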
---
 contrib/test_decoding/expected/messages.out | 10 ++++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       | 26 +++++++++++++++------
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index a1a7c2ae0c..6bd26a68b7 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -204,15 +204,21 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	TestDecodingData *data = ctx->output_plugin_private;
 
 	data->xact_wrote_changes = false;
-	if (data->skip_empty_xacts)
-		return;
 
 	pg_output_begin(ctx, data, txn, true);
 }
 
 static void
-pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data,
+				ReorderBufferTXN *txn, bool last_write)
 {
+	/*
+	 * If asked to skip empty transactions, we'll emit BEGIN at the point
+	 * where the first operation is received for this transaction.
+	 */
+	if (!(last_write ^ data->skip_empty_xacts) || data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, last_write);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
@@ -403,10 +409,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	data = ctx->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	data->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
@@ -481,6 +484,15 @@ pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 				  const char *prefix, Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional)
+	{
+		pg_output_begin(ctx, data, txn, false);
+		data->xact_wrote_changes = true;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

From 1458a65217322b95541e11134bc0e342ccdaa3bd Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 28 Jun 2023 14:01:22 +0530
Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the
 first change in the transaction.

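When skip-empty-xacts is enabled, test_decoding defers emitting BEGIN
until the first change of the transaction is decoded. pg_decode_message
did not do this, so a transactional message was output without the
surrounding BEGIN/COMMIT.

Call pg_output_begin in pg_decode_message if it is the first change in
the transaction, and move the skip-empty-xacts checks into
pg_output_begin so that all of its callers share the same logic.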
---
 contrib/test_decoding/expected/messages.out | 10 ++++-
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       | 46 +++++++++++----------
 3 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 12d1d0505d..b1ea2cecbf 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -293,19 +293,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	txndata->xact_wrote_changes = false;
 	txn->output_plugin_private = txndata;
 
+	pg_output_begin(ctx, data, txn, true);
+}
+
+static void
+pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data,
+				ReorderBufferTXN *txn, bool last_write)
+{
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+
 	/*
 	 * If asked to skip empty transactions, we'll emit BEGIN at the point
 	 * where the first operation is received for this transaction.
 	 */
-	if (data->skip_empty_xacts)
+	if (!(last_write ^ data->skip_empty_xacts) || txndata->xact_wrote_changes)
 		return;
 
-	pg_output_begin(ctx, data, txn, true);
-}
-
-static void
-pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
-{
 	OutputPluginPrepareWrite(ctx, last_write);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
@@ -353,13 +356,6 @@ pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	txndata->xact_wrote_changes = false;
 	txn->output_plugin_private = txndata;
 
-	/*
-	 * If asked to skip empty transactions, we'll emit BEGIN at the point
-	 * where the first operation is received for this transaction.
-	 */
-	if (data->skip_empty_xacts)
-		return;
-
 	pg_output_begin(ctx, data, txn, true);
 }
 
@@ -610,10 +606,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
@@ -696,10 +689,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
-	{
-		pg_output_begin(ctx, data, txn, false);
-	}
+	pg_output_begin(ctx, data, txn, false);
 	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
@@ -743,6 +733,18 @@ pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 				  const char *prefix, Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata;
+
+	txndata = transactional ? txn->output_plugin_private : NULL;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional)
+	{
+		pg_output_begin(ctx, data, txn, false);
+		txndata->xact_wrote_changes = true;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

Reply via email to