On Thu, Apr 15, 2021 at 4:39 PM Ajin Cherian <itsa...@gmail.com> wrote: > > > > On Thu, Apr 15, 2021 at 1:29 PM Ajin Cherian <itsa...@gmail.com> wrote: >> >> >> I've rebased the patch and made changes so that the patch supports >> "streaming in-progress transactions" and handling of logical decoding >> messages (transactional and non-transactional). >> I see that this patch not only makes sure that empty transactions are not >> sent but also does call OutputPluginUpdateProgress when an empty >> transaction is not sent, as a result the confirmed_flush_lsn is kept moving. >> I also see no hangs when synchronous_standby is configured. >> Do let me know your thoughts on this patch.
REVIEW COMMENTS I applied this patch to today's HEAD and successfully ran "make check" and also the subscription TAP tests. Here are a some review comments: ------ 1. The patch v3 applied OK but with whitespace warnings [postgres@CentOS7-x64 oss_postgres_2PC]$ git apply ../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch ../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98: indent with spaces. /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ ../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99: indent with spaces. if (!data->xact_wrote_changes && !in_streaming && transactional) warning: 2 lines add whitespace errors. ------ 2. Please create a CF entry in [1] for this patch. ------ 3. Patch comment The comment describes the problem and then suddenly just says "Postpone the BEGIN message until the first change." I suggest changing it to say more like... "(blank line) This patch addresses the above problem by postponing the BEGIN message until the first change." ------ 4. pgoutput.h Maybe for consistency with the context member, the comment for the new member should be to the right instead of above it? @@ -20,6 +20,9 @@ typedef struct PGOutputData MemoryContext context; /* private memory context for transient * allocations */ + /* flag indicating whether messages have previously been sent */ + bool xact_wrote_changes; + ------ 5. pgoutput.h + /* flag indicating whether messages have previously been sent */ "previously been sent" --> "already been sent" ?? ------ 6. pgoutput.h - misleading member name Actually, now that I have read all the rest of the code and how this member is used I feel that this name is very misleading. e.g. For "streaming" case then you still are writing changes but are not setting this member at all - therefore it does not always mean what it says. I feel a better name for this would be something like "sent_begin_txn". Then if you have sent BEGIN it is true. If you haven't sent BEGIN it is false. It eliminates all ambiguity naming it this way instead. (This makes my feedback #5 redundant because the comment will be a bit different if you do this). ------ 7. pgoutput.c - function pgoutput_begin_txn @@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { I guess that you still needed to pass the txn because that is how the API is documented, right? But I am wondering if you ought to flag it as unused so you wont get some BF machine giving warnings about it. e.g. Syntax like this? pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) { (void)txn; ... ------ 8. pgoutput.c - function pgoutput_begin_txn @@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = ctx->output_plugin_private; + + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. + */ + data->xact_wrote_changes = false; + elog(LOG,"Holding of begin"); +} Why is this loglevel LOG? Looks like leftover debugging. ------ 9. pgoutput.c - function pgoutput_commit_txn @@ -384,8 +401,14 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data = ctx->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* skip COMMIT message if nothing was sent */ + if (!data->xact_wrote_changes) + return; + In the case where you decided to do nothing does it make sense that you still called the function OutputPluginUpdateProgress(ctx); ? I thought perhaps that your new check should come first so this call would never happen. ------ 10. pgoutput.c - variable declarations without casts + PGOutputData *data = ctx->output_plugin_private; I noticed the new stack variable you declare have no casts. This differs from the existing code which always looks like: PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; There are a couple of examples of this so please search new code to find them all. ------ 11. pgoutput.c - function pgoutput_change @@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (!data->xact_wrote_changes && !in_streaming) + { + pgoutput_begin(ctx, txn); + data->xact_wrote_changes = true; + } If the variable is renamed as previously suggested then the assignment data->sent_BEGIN_txn = true; can be assigned in just 1 common place INSIDE the pgoutput_begin function. ------ 12. pgoutput.c - pgoutput_truncate function @@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* output BEGIN if we haven't yet */ + if (!data->xact_wrote_changes && !in_streaming) + { + pgoutput_begin(ctx, txn); + data->xact_wrote_changes = true; + } (same comment as above) If the variable is renamed as previously suggested then the assignment data->sent_BEGIN_txn = true; can be assigned in just 1 common place INSIDE the pgoutput_begin function. 13. pgoutput.c - pgoutput_message @@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + if (!data->xact_wrote_changes && !in_streaming && transactional) + { + pgoutput_begin(ctx, txn); + data->xact_wrote_changes = true; + } (same comment as above) If the variable is renamed as previously suggested then the assignment data->sent_BEGIN_txn = true; can be assigned in just 1 common place INSIDE the pgoutput_begin function. ------ 14. Test Code. I noticed that there is no test code specifically for seeing if empty transactions get sent or not. Is it possible to write such a test or is this traffic improvement only observable using the debugger? ------ [1] https://commitfest.postgresql.org/33/ Kind Regards, Peter Smith. Fujitsu Australia