On Fri, Feb 25, 2022 at 9:17 PM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Hi. Here are my review comments for the v19 patch.
>
> ======
>
> 1. Commit message
>
> The current logical replication behavior is to send every transaction to
> subscriber even though the transaction is empty (because it does not
> contain changes from the selected publications).
>
> SUGGESTION
> "to subscriber even though" --> "to the subscriber even if"

Fixed.

>
> ~~~
>
> 2. Commit message
>
> This patch addresses the above problem by postponing the BEGIN message
> until the first change. While processing a COMMIT message,
> if there is no other change for that transaction,
> do not send COMMIT message. It means that pgoutput will
> skip BEGIN/COMMIT messages for transactions that are empty.
>
> SUGGESTION
> "if there is" --> "if there was"
> "do not send COMMIT message" --> "do not send the COMMIT message"
> "It means that pgoutput" --> "This means that pgoutput"
>
> ~~~

Fixed.

>
> 3. Commit message
>
> Shouldn't there be some similar description about using a lazy send
> mechanism for STREAM START?
>
> ~~~

Added.

>
> 4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData
>
> +/*
> + * Maintain a per-transaction level variable to track whether the
> + * transaction has sent BEGIN. BEGIN is only sent when the first
> + * change in a transaction is processed. This makes it possible
> + * to skip transactions that are empty.
> + */
> +typedef struct PGOutputTxnData
> +{
> +    bool sent_begin_txn;    /* flag indicating whether BEGIN has been sent */
> +    bool sent_stream_start; /* flag indicating if stream start has been sent */
> +    bool sent_any_stream;   /* flag indicating if any stream has been sent */
> +} PGOutputTxnData;
> +
>
> The struct comment looks stale because it doesn't mention anything
> about the similar lazy send mechanism for STREAM_START.
>
> ~~~

Added.

>
> 5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
>
> static void
> pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> {
> +    PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
> +                                                      sizeof(PGOutputTxnData));
> +
> +    txndata->sent_begin_txn = false;
> +    txn->output_plugin_private = txndata;
> +}
>
> You don’t need to assign the other members 'sent_stream_start',
> 'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
> but for the same reason you did not really need to assign the
> 'sent_begin_txn' flag either.
>
> I guess for consistency maybe it is better to (a) set all of them or
> (b) set none of them. I prefer (b).
>
> ~~~

Did (b)

>
> 6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin
>
> I feel the 'pgoutput_begin' function is not well named. It makes some
> of the code where they are called look quite confusing.
>
> For streaming there is:
> 1. pgoutput_stream_start (does not send)
> 2. pgoutput_send_stream_start (does send)
> so it is very clear.
>
> OTOH there are
> 3. pgoutput_begin_txn (does not send)
> 4. pgoutput_begin (does send)
>
> For consistency I think the 'pgoutput_begin' name should be changed to
> include "send" verb
> 1. pgoutput_begin_txn (does not send)
> 2. pgoutput_send_begin_txn (does send)
>
> ~~~

Changed as mentioned.

>
> 7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema
>
> @@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
>      if (schema_sent)
>          return;
>
> +    /* set up txndata */
> +    txndata = toptxn->output_plugin_private;
> +
> +    /*
> +     * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> +     * is sent. If not, send now.
> +     */
> +    if (in_streaming && !txndata->sent_stream_start)
> +        pgoutput_send_stream_start(ctx, toptxn);
> +    else if (txndata && !txndata->sent_begin_txn)
> +    {
> +        pgoutput_begin(ctx, toptxn);
> +    }
> +
>
> How come the in_streaming case is not checking for a NULL txndata
> before referencing it? Even if it is OK to do that, some more comments
> or assertions might help for this piece of code.
> (Stop-Press: see later comments #9, #10)
>
> ~~~

Updated.

>
> 8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema
>
> @@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
>      if (schema_sent)
>          return;
>
> +    /* set up txndata */
> +    txndata = toptxn->output_plugin_private;
> +
> +    /*
> +     * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> +     * is sent. If not, send now.
> +     */
>
> What part of this code is doing anything about "BEGIN PREPARE" ?
>
> ~~~

Removed that reference.

>
> 9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change
>
> @@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>          Assert(false);
>      }
>
> +    /* If streaming, send STREAM START if we haven't yet */
> +    if (in_streaming && (txndata && !txndata->sent_stream_start))
> +        pgoutput_send_stream_start(ctx, txn);
> +    /*
> +     * Output BEGIN if we haven't yet, unless streaming.
> +     */
> +    else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
> +        pgoutput_begin(ctx, txn);
> +
>
> The above code fragment looks more like what I was expecting should
> be in 'maybe_send_schema',
>
> If you expand it out (and tweak the comments) it can become much less
> complex looking IMO
>
> e.g.
>
> if (in_streaming)
> {
>     /* If streaming, send STREAM START if we haven't yet */
>     if (txndata && !txndata->sent_stream_start)
>         pgoutput_send_stream_start(ctx, txn);
> }
> else
> {
>     /* If not streaming, send BEGIN if we haven't yet */
>     if (txndata && !txndata->sent_begin_txn)
>         pgoutput_begin(ctx, txn);
> }
>
> Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
> then the code can be made even simpler.
>

Chose your example.

> ~~~
>
> 10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
>
> @@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>
>      if (nrelids > 0)
>      {
> +        txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> +        /* If streaming, send STREAM START if we haven't yet */
> +        if (in_streaming && (txndata && !txndata->sent_stream_start))
> +            pgoutput_send_stream_start(ctx, txn);
> +        /*
> +         * output BEGIN if we haven't yet, unless streaming.
> +         */
> +        else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
> +            pgoutput_begin(ctx, txn);
>
> So now I have seen almost identical code repeated in 3 places so I am
> beginning to think these should just be encapsulated in some common
> function to call to do the deferred "send". Thoughts?
>
> ~~~

Not sure if we want to add a function call overhead.

>
> 11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message
>
> @@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>      if (in_streaming)
>          xid = txn->xid;
>
> +    /*
> +     * Output BEGIN if we haven't yet.
> +     * Avoid for streaming and non-transactional messages.
> +     */
> +    if (in_streaming || transactional)
> +    {
> +        PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> +        /* If streaming, send STREAM START if we haven't yet */
> +        if (in_streaming && (txndata && !txndata->sent_stream_start))
> +            pgoutput_send_stream_start(ctx, txn);
> +        else if (transactional)
> +        {
> +            if (txndata && !txndata->sent_begin_txn)
> +                pgoutput_begin(ctx, txn);
> +        }
> +    }
>
> Does that comment at the top of that code fragment accurately match
> this code? It seemed a bit muddled/stale to me.
>
> ~~~

Fixed.

>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start
>
>      /*
> +     * Don't actually send stream start here, instead set a flag that indicates
> +     * that stream start hasn't been sent and wait for the first actual change
> +     * for this stream to be sent and then send stream start. This is done
> +     * to avoid sending empty streams without any changes.
> +     */
> +    if (txndata == NULL)
> +    {
> +        txndata =
> +            MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
> +        txndata->sent_begin_txn = false;
> +        txndata->sent_any_stream = false;
> +        txn->output_plugin_private = txndata;
> +    }
>
> IMO there is no need to set the members – just let the
> MemoryContextAllocZero take care of all that. Then the code is simpler
> and it also saves wondering if anything was accidentally missed.
>

Fixed.

> ~~~
>
> 13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start
>
> +pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
> +                           ReorderBufferTXN *txn)
> +{
> +    bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> +    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> +
> +    /*
>       * If we already sent the first stream for this transaction then don't
>       * send the origin id in the subsequent streams.
>       */
> -    if (rbtxn_is_streamed(txn))
> +    if (txndata->sent_any_stream)
>          send_replication_origin = false;
>
> Given this usage, I wonder if there is a better name for the txndata
> member - e.g. 'sent_first_stream' ?
>
> ~~~

Changed.

>
> 14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start
>
> -    /* we're streaming a chunk of transaction now */
> -    in_streaming = true;
> +    /*
> +     * Set the flags that indicate that changes were sent as part of
> +     * the transaction and the stream.
> +     */
> +    txndata->sent_begin_txn = txndata->sent_stream_start = true;
> +    txndata->sent_any_stream = true;
>
> Why is this setting member 'sent_begin_txn' true also? It seems odd to
> say so because the BEGIN was not actually sent at all, right?
>
> ~~~

You can have transactions that are partially streamed and partially
not. So if there is a transaction that started as streaming, but when
it is committed, it is replicated as part of the commit, then when the
changes are decoded, we shouldn't be sending a "begin" again.

>
> 15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort
>
> @@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
>
>      /* determine the toplevel transaction */
>      toptxn = (txn->toptxn) ? txn->toptxn : txn;
> +    txndata = toptxn->output_plugin_private;
> +    sent_begin_txn = txndata->sent_begin_txn;
> +
> +    if (txn->toptxn == NULL)
> +    {
> +        pfree(txndata);
> +        txn->output_plugin_private = NULL;
> +    }
> +
> +    if (!sent_begin_txn)
> +    {
> +        elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
> +        return;
> +    }
>
> I didn't really understand why this code is checking the
> 'sent_begin_txn' member instead of the 'sent_stream_start' member?
>

Yes, changed this to check "sent_first_stream"

> ~~~
>
> 16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit
>
> @@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
>      Assert(!in_streaming);
>      Assert(rbtxn_is_streamed(txn));
>
> -    OutputPluginUpdateProgress(ctx);
> +    pfree(txndata);
> +    txn->output_plugin_private = NULL;
> +
> +    /* If no changes were part of this transaction then drop the commit */
> +    if (!sent_begin_txn)
> +    {
> +        elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
> +        return;
> +    }
>
> (Same as previous comment #15). I didn't really understand why this
> code is checking the 'sent_begin_txn' member instead of the
> 'sent_stream_start' member?
>
> ~~~

Changed.

>
> 17. src/backend/replication/syncrep.c - SyncRepEnabled
>
> @@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
>  }
>
>  /*
> + * Check if synchronous replication is enabled
> + */
> +bool
> +SyncRepEnabled(void)
> +{
> +    return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
> +}
>
> That code was once inline in 'SyncRepWaitForLSN' before it was turned
> into a function, and there is a long comment in SyncRepWaitForLSN
> describing the risks of this logic. e.g.
>
> <quote>
> ... If it's true, we need to check it again
> * later while holding the lock, to check the flag and operate the sync
> * rep queue atomically. This is necessary to avoid the race condition
> * described in SyncRepUpdateSyncStandbysDefined().
> </quote>
>
> This same function is now called from walsender.c. I think maybe it is
> OK but please confirm it.
>
> Anyway, the point is maybe this SyncRepEnabled function should be
> better commented to make some reference about the race concerns of the
> original comment. Otherwise some future caller of this function may be
> unaware of it and come to grief.
>

Leaving this for now, not sure what wording is appropriate to use here.
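
Regarding comment #10 above, here is a minimal, self-contained sketch of what a shared deferred-send helper could look like. Everything in it is an assumption for illustration only: the helper name maybe_send_deferred_start, the stub senders and the message counters are hypothetical and are not part of the patch. The real code would call pgoutput_send_stream_start() and pgoutput_send_begin_txn() with ctx and txn. Declaring the helper static inline is one possible answer to the call-overhead concern, although the compiler is not obliged to inline it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the patch's PGOutputTxnData, using the names agreed in this review. */
typedef struct PGOutputTxnData
{
    bool sent_begin_txn;     /* has BEGIN been sent (or made unnecessary)? */
    bool sent_stream_start;  /* has STREAM START been sent for this stream? */
    bool sent_first_stream;  /* renamed from sent_any_stream per comment #13 */
} PGOutputTxnData;

/* Hypothetical counters standing in for the real protocol writes. */
static int begin_msgs = 0;
static int stream_start_msgs = 0;

/* Stub for pgoutput_send_begin_txn(): count the message and set the flag. */
static void
send_begin_stub(PGOutputTxnData *txndata)
{
    begin_msgs++;
    txndata->sent_begin_txn = true;
}

/*
 * Stub for pgoutput_send_stream_start(). As in the quoted patch, it also
 * marks sent_begin_txn so that a later non-streamed part of the same
 * transaction does not emit a second BEGIN (see the reply to comment #14).
 */
static void
send_stream_start_stub(PGOutputTxnData *txndata)
{
    stream_start_msgs++;
    txndata->sent_begin_txn = txndata->sent_stream_start = true;
    txndata->sent_first_stream = true;
}

/*
 * Hypothetical common helper: send the deferred STREAM START or BEGIN
 * exactly once, right before the first real change is written.
 */
static inline void
maybe_send_deferred_start(PGOutputTxnData *txndata, bool in_streaming)
{
    assert(txndata != NULL);

    if (in_streaming)
    {
        /* If streaming, send STREAM START if we haven't yet */
        if (!txndata->sent_stream_start)
            send_stream_start_stub(txndata);
    }
    else
    {
        /* If not streaming, send BEGIN if we haven't yet */
        if (!txndata->sent_begin_txn)
            send_begin_stub(txndata);
    }
}
```

With a helper of this shape, pgoutput_change, pgoutput_truncate and maybe_send_schema could each reduce to a single call, while pgoutput_message would still need its own check for non-transactional messages.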
On Wed, Feb 23, 2022 at 5:24 PM wangw.f...@fujitsu.com <wangw.f...@fujitsu.com> wrote:
>
> On Wed, Feb 23, 2022 at 10:58 PM Ajin Cherian <itsa...@gmail.com> wrote:
> >
> Few comments to V19-0001:
>
> 1. I think we should adjust the alignment format.
> git am ../v19-0001-Skip-empty-transactions-for-logical-replication.patch
> .git/rebase-apply/patch:197: indent with spaces.
> * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> .git/rebase-apply/patch:198: indent with spaces.
> * is sent. If not, send now.
> .git/rebase-apply/patch:199: indent with spaces.
> */
> .git/rebase-apply/patch:201: indent with spaces.
> pgoutput_send_stream_start(ctx, toptxn);
> .git/rebase-apply/patch:204: indent with spaces.
> pgoutput_begin(ctx, toptxn);
> warning: 5 lines add whitespace errors.

Fixed.

>
> 2. Structure member initialization.
> static void
> pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> {
> +    PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
> +                                                      sizeof(PGOutputTxnData));
> +
> +    txndata->sent_begin_txn = false;
> +    txn->output_plugin_private = txndata;
> +}
> Do we need to set sent_stream_start and sent_any_stream to false here?

Fixed.

>
> 3. Maybe we should add Assert(txndata) like function pgoutput_commit_txn in
> other functions.
>
> 4. In addition, I think we should keep a unified style.
> a). log style (maybe first one is better.)
> First style : "Skipping replication of an empty transaction in XXX"
> Second style : "skipping replication of an empty transaction"
> b) flag name (maybe second one is better.)
> First style : variable "sent_begin_txn" in function pgoutput_stream_*.
> Second style : variable "skip" in function pgoutput_commit_txn.
>

Fixed.

Regards,
Ajin Cherian
Fujitsu Australia
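
On the member-initialization point (comment 2 above, and #5/#12 in Peter's review), a small stand-alone illustration of why the explicit assignments to false are redundant after a zeroing allocation. This is only a sketch under assumptions: calloc is used as a stand-in for MemoryContextAllocZero, the struct is a local copy using the v19 member names, and the function name alloc_txndata is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Local copy of the v19 struct, for illustration only. */
typedef struct PGOutputTxnData
{
    bool sent_begin_txn;     /* flag indicating whether BEGIN has been sent */
    bool sent_stream_start;  /* flag indicating if stream start has been sent */
    bool sent_any_stream;    /* flag indicating if any stream has been sent */
} PGOutputTxnData;

/*
 * Hypothetical allocator: calloc zero-fills the memory, much like
 * MemoryContextAllocZero does, so every bool member starts out false
 * without any per-member assignment.
 */
static PGOutputTxnData *
alloc_txndata(void)
{
    PGOutputTxnData *txndata = calloc(1, sizeof(PGOutputTxnData));

    assert(txndata != NULL);
    return txndata;
}
```

Option (b) from the review, setting none of the members, relies on exactly this property, and it also avoids the question of whether a newly added member was accidentally left out.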
v21-0001-Skip-empty-transactions-for-logical-replication.patch