On Wed, 28 Jun 2023 at 19:26, Ashutosh Bapat <ashutosh.bapat....@gmail.com> wrote:
>
> Hi Vignesh,
> Thanks for working on this.
>
> On Wed, Jun 28, 2023 at 4:52 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > Here is a patch having the fix for the same. I have not added any
> > tests as the existing tests cover this scenario. The same issue is
> > present in back branches too.
>
> Interesting, we have a test for this scenario and it accepts erroneous
> output :).
>
> > v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_master.patch
> > can be applied on master, PG15 and PG14,
> > v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_PG13.patch
> > patch can be applied on PG13, PG12 and PG11.
> > Thoughts?
>
> I noticed this when looking at Tomas's patches for logical decoding of
> sequences. The code block you have added is repeated in
> pg_decode_change() and pg_decode_truncate(). It might be better to
> push the conditions in pg_output_begin() itself so that any future
> callsite of pg_output_begin() automatically takes care of these
> conditions.
Thanks for the comments. Here is an updated patch that handles the above issue.

Regards,
Vignesh
From 130715a6dd808ed88af8c894d7e27b8637bc2c3a Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Wed, 28 Jun 2023 14:01:22 +0530 Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the first change in the transaction. Call pg_output_begin in pg_decode_message if it is the first change in the transaction. --- contrib/test_decoding/expected/messages.out | 10 +++++-- contrib/test_decoding/sql/messages.sql | 2 +- contrib/test_decoding/test_decoding.c | 31 +++++++++++++-------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index c75d40190b..0fd70036bd 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); ignorethis (1 row) -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); data -------------------------------------------------------------------- + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg1 + COMMIT message: transactional: 0 prefix: test, sz: 4 content:msg2 message: transactional: 0 prefix: test, sz: 4 content:msg4 message: transactional: 0 prefix: test, sz: 4 content:msg6 + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg5 message: transactional: 1 prefix: test, sz: 4 content:msg7 + COMMIT + BEGIN message: transactional: 1 prefix: test, sz: 11 content:czechtastic -(7 rows) + COMMIT +(13 rows) -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index cf3f7738e5..3d8500f99c 100644 --- a/contrib/test_decoding/sql/messages.sql +++ 
b/contrib/test_decoding/sql/messages.sql @@ -19,7 +19,7 @@ COMMIT; SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 93c948856e..86c29de40b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -211,15 +211,21 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) TestDecodingData *data = ctx->output_plugin_private; data->xact_wrote_changes = false; - if (data->skip_empty_xacts) - return; pg_output_begin(ctx, data, txn, true); } static void -pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, + ReorderBufferTXN *txn, bool last_write) { + /* + * If asked to skip empty transactions, we'll emit BEGIN at the point + * where the first operation is received for this transaction. 
+ */ + if (!(last_write ^ data->skip_empty_xacts) || data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); @@ -403,10 +409,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, data = ctx->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); data->xact_wrote_changes = true; class_form = RelationGetForm(relation); @@ -487,10 +490,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, data = ctx->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); data->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ @@ -534,6 +534,15 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message) { + TestDecodingData *data = ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (transactional) + { + pg_output_begin(ctx, data, txn, false); + data->xact_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", transactional, prefix, sz); -- 2.34.1
From 385e5bee5b42fa62d66fea9af7676a48993e0118 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Thu, 29 Jun 2023 08:35:09 +0530 Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the first change in the transaction. Call pg_output_begin in pg_decode_message if it is the first change in the transaction. --- contrib/test_decoding/expected/messages.out | 10 ++++-- contrib/test_decoding/sql/messages.sql | 2 +- contrib/test_decoding/test_decoding.c | 37 +++++++++++++-------- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index c75d40190b..0fd70036bd 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); ignorethis (1 row) -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); data -------------------------------------------------------------------- + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg1 + COMMIT message: transactional: 0 prefix: test, sz: 4 content:msg2 message: transactional: 0 prefix: test, sz: 4 content:msg4 message: transactional: 0 prefix: test, sz: 4 content:msg6 + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg5 message: transactional: 1 prefix: test, sz: 4 content:msg7 + COMMIT + BEGIN message: transactional: 1 prefix: test, sz: 11 content:czechtastic -(7 rows) + COMMIT +(13 rows) -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index cf3f7738e5..3d8500f99c 100644 --- a/contrib/test_decoding/sql/messages.sql +++ 
b/contrib/test_decoding/sql/messages.sql @@ -19,7 +19,7 @@ COMMIT; SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index de1b692658..6c32ca6558 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -299,15 +299,21 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) txndata->xact_wrote_changes = false; txn->output_plugin_private = txndata; - if (data->skip_empty_xacts) - return; - pg_output_begin(ctx, data, txn, true); } static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) { + TestDecodingTxnData *txndata = txn->output_plugin_private; + + /* + * If asked to skip empty transactions, we'll emit BEGIN at the point + * where the first operation is received for this transaction. 
+ */ + if (!(last_write ^ data->skip_empty_xacts) || txndata->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); @@ -355,9 +361,6 @@ pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) txndata->xact_wrote_changes = false; txn->output_plugin_private = txndata; - if (data->skip_empty_xacts) - return; - pg_output_begin(ctx, data, txn, true); } @@ -604,10 +607,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !txndata->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); txndata->xact_wrote_changes = true; class_form = RelationGetForm(relation); @@ -690,10 +690,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !txndata->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); txndata->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ @@ -737,6 +734,18 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message) { + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata; + + txndata = transactional ? txn->output_plugin_private : NULL; + + /* output BEGIN if we haven't yet */ + if (transactional) + { + pg_output_begin(ctx, data, txn, false); + txndata->xact_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", transactional, prefix, sz); -- 2.34.1
From b4543ffb251c99a3f0d4d6b807690c4b89f87a80 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Thu, 29 Jun 2023 09:19:11 +0530 Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the first change in the transaction. Call pg_output_begin in pg_decode_message if it is the first change in the transaction. --- contrib/test_decoding/expected/messages.out | 10 ++++++-- contrib/test_decoding/sql/messages.sql | 2 +- contrib/test_decoding/test_decoding.c | 26 +++++++++++++++------ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index c75d40190b..0fd70036bd 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); ignorethis (1 row) -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); data -------------------------------------------------------------------- + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg1 + COMMIT message: transactional: 0 prefix: test, sz: 4 content:msg2 message: transactional: 0 prefix: test, sz: 4 content:msg4 message: transactional: 0 prefix: test, sz: 4 content:msg6 + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg5 message: transactional: 1 prefix: test, sz: 4 content:msg7 + COMMIT + BEGIN message: transactional: 1 prefix: test, sz: 11 content:czechtastic -(7 rows) + COMMIT +(13 rows) -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index cf3f7738e5..3d8500f99c 100644 --- a/contrib/test_decoding/sql/messages.sql +++ 
b/contrib/test_decoding/sql/messages.sql @@ -19,7 +19,7 @@ COMMIT; SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index a1a7c2ae0c..6bd26a68b7 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -204,15 +204,21 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) TestDecodingData *data = ctx->output_plugin_private; data->xact_wrote_changes = false; - if (data->skip_empty_xacts) - return; pg_output_begin(ctx, data, txn, true); } static void -pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, + ReorderBufferTXN *txn, bool last_write) { + /* + * If asked to skip empty transactions, we'll emit BEGIN at the point + * where the first operation is received for this transaction. 
+ */ + if (!(last_write ^ data->skip_empty_xacts) || data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); @@ -403,10 +409,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, data = ctx->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); data->xact_wrote_changes = true; class_form = RelationGetForm(relation); @@ -481,6 +484,15 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message) { + TestDecodingData *data = ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (transactional) + { + pg_output_begin(ctx, data, txn, false); + data->xact_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", transactional, prefix, sz); -- 2.34.1
From 1458a65217322b95541e11134bc0e342ccdaa3bd Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Wed, 28 Jun 2023 14:01:22 +0530 Subject: [PATCH v2] Call pg_output_begin in pg_decode_message if it is the first change in the transaction. Call pg_output_begin in pg_decode_message if it is the first change in the transaction. --- contrib/test_decoding/expected/messages.out | 10 ++++- contrib/test_decoding/sql/messages.sql | 2 +- contrib/test_decoding/test_decoding.c | 46 +++++++++++---------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index c75d40190b..0fd70036bd 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); ignorethis (1 row) -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); data -------------------------------------------------------------------- + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg1 + COMMIT message: transactional: 0 prefix: test, sz: 4 content:msg2 message: transactional: 0 prefix: test, sz: 4 content:msg4 message: transactional: 0 prefix: test, sz: 4 content:msg6 + BEGIN message: transactional: 1 prefix: test, sz: 4 content:msg5 message: transactional: 1 prefix: test, sz: 4 content:msg7 + COMMIT + BEGIN message: transactional: 1 prefix: test, sz: 11 content:czechtastic -(7 rows) + COMMIT +(13 rows) -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index cf3f7738e5..3d8500f99c 100644 --- a/contrib/test_decoding/sql/messages.sql +++ 
b/contrib/test_decoding/sql/messages.sql @@ -19,7 +19,7 @@ COMMIT; SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic'); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); -- test db filtering \set prevdb :DBNAME diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 12d1d0505d..b1ea2cecbf 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -293,19 +293,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) txndata->xact_wrote_changes = false; txn->output_plugin_private = txndata; + pg_output_begin(ctx, data, txn, true); +} + +static void +pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, + ReorderBufferTXN *txn, bool last_write) +{ + TestDecodingTxnData *txndata = txn->output_plugin_private; + /* * If asked to skip empty transactions, we'll emit BEGIN at the point * where the first operation is received for this transaction. */ - if (data->skip_empty_xacts) + if (!(last_write ^ data->skip_empty_xacts) || txndata->xact_wrote_changes) return; - pg_output_begin(ctx, data, txn, true); -} - -static void -pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) -{ OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); @@ -353,13 +356,6 @@ pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) txndata->xact_wrote_changes = false; txn->output_plugin_private = txndata; - /* - * If asked to skip empty transactions, we'll emit BEGIN at the point - * where the first operation is received for this transaction. 
- */ - if (data->skip_empty_xacts) - return; - pg_output_begin(ctx, data, txn, true); } @@ -610,10 +606,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !txndata->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); txndata->xact_wrote_changes = true; class_form = RelationGetForm(relation); @@ -696,10 +689,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !txndata->xact_wrote_changes) - { - pg_output_begin(ctx, data, txn, false); - } + pg_output_begin(ctx, data, txn, false); txndata->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ @@ -743,6 +733,18 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message) { + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata; + + txndata = transactional ? txn->output_plugin_private : NULL; + + /* output BEGIN if we haven't yet */ + if (transactional) + { + pg_output_begin(ctx, data, txn, false); + txndata->xact_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", transactional, prefix, sz); -- 2.34.1