On Mon, Jul 19, 2021 at 3:24 PM Peter Smith <smithpb2...@gmail.com> wrote:
> 1a. Commit Comment - wording
>

updated.

>
> 1b. Commit Comment - wording
>

updated.

> 2. doc/src/sgml/logicaldecoding.sgml - wording
>
> @@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
> The required <function>commit_prepared_cb</function> callback is called
> whenever a transaction <command>COMMIT PREPARED</command> has
> been decoded.
> The <parameter>gid</parameter> field, which is part of the
> - <parameter>txn</parameter> parameter, can be used in this callback.
> + <parameter>txn</parameter> parameter, can be used in this callback. The
> + parameters <parameter>prepare_end_lsn</parameter> and
> + <parameter>prepare_time</parameter> can be used to check if the plugin
> + has received this <command>PREPARE TRANSACTION</command> in which case
> + it can commit the transaction, otherwise, it can skip the commit. The
> + <parameter>gid</parameter> alone is not sufficient because the downstream
> + node can have a prepared transaction with the same identifier.
>
> =>
>
> (some minor rewording of the last part)

updated.

>
> 3. src/backend/replication/logical/proto.c - whitespace
>
> @@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *
> elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
>
> /* read fields */
> + prepare_data->prepare_end_lsn = pq_getmsgint64(in);
> + if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
> + elog(ERROR,"prepare_end_lsn is not set in commit prepared message");
>
> =>
>
> There is missing space before the 2nd elog param.
>

fixed.

>
> 4a. =>
>
> "and was essentially an empty prepare" --> "so was essentially an empty prepare"
>
> 4b. =>
>
> "In which case" --> "In this case"
>
> ------

fixed.
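As an aside on item 2: the documentation wording says the gid alone is not enough to decide whether a COMMIT PREPARED should be applied, because the downstream node may hold an unrelated prepared transaction with the same identifier. A minimal standalone sketch of that check is below. It is not code from the patch: the Demo* types and demo_should_commit_prepared() are hypothetical names invented for illustration, and plain integers stand in for XLogRecPtr and TimestampTz.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins for PostgreSQL's XLogRecPtr and TimestampTz. */
typedef uint64_t DemoLsn;
typedef int64_t DemoTimestamp;

/* Hypothetical record of one prepared transaction known downstream. */
typedef struct DemoPreparedTxn
{
    const char *gid;
    DemoLsn prepare_end_lsn;
    DemoTimestamp prepare_time;
} DemoPreparedTxn;

/*
 * Return true only if some known prepared transaction matches on gid AND
 * on both prepare_end_lsn and prepare_time. A gid-only match is rejected,
 * since it could be a different transaction that reused the identifier.
 */
bool
demo_should_commit_prepared(const DemoPreparedTxn *known, size_t nknown,
                            const char *gid, DemoLsn prepare_end_lsn,
                            DemoTimestamp prepare_time)
{
    for (size_t i = 0; i < nknown; i++)
    {
        if (strcmp(known[i].gid, gid) == 0 &&
            known[i].prepare_end_lsn == prepare_end_lsn &&
            known[i].prepare_time == prepare_time)
            return true;
    }

    /* PREPARE was never received (or gid belongs to another txn): skip. */
    return false;
}
```

With one known entry {"gid-1", 100, 5000}, a COMMIT PREPARED carrying ("gid-1", 100, 5000) is applied, while ("gid-1", 200, 6000) is skipped even though the gid matches.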
> I felt that since this message postponement is now the new behaviour
> of this function then probably this should all be a function level
> comment instead of the comment being in the body of the function
>
> ------
>
> 6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin
>
> +
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>
> =>
>
> Even though it is kind of obvious, it is probably better to provide a
> function comment here too
>
> ------

Changed accordingly.

>
> I felt that the comment "skip COMMIT message if nothing was sent"
> should be done at the point where you *decide* to skip or not. So you
> could either move that comment to where the skip variable is assigned.
> Or (my preference) leave the comment where it is but change the
> variable name to be sent_begin = !data->sent_begin_txn;
>

Moved the comment to where the skip variable is assigned.

> ------
>
> Regardless I think the comment should be elaborated a bit to describe
> the reason more.
>
> 7b. =>
>
> BEFORE
> /* skip COMMIT message if nothing was sent */
>
> AFTER
> /* If a BEGIN message was not yet sent, then it means there were no
> relevant changes encountered, so we can skip the COMMIT message too.
> */
>

Updated accordingly.

> ------
> Like previously, I felt that this big comment should be at the
> function level of pgoutput_begin_prepare_txn instead of in the body of
> the function.
>
> ------
>
> 8b. =>
>
> And then the body comment would be something simple like:
>
> /* Delegate to assign the begin sent flag as false same as for the
> BEGIN message. */
> pgoutput_begin_txn(ctx, txn);
>

Updated accordingly.

> ------
>
> 9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare
>
> +
> +static void
> +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>
> =>
>
> Probably this needs a function comment.
>

Updated.

> ------
>
> 10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn
>
> @@ -459,8 +520,18 @@ static void
> pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> XLogRecPtr prepare_lsn)
> {
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + Assert(data);
> OutputPluginUpdateProgress(ctx);
>
> + /* skip PREPARE message if nothing was sent */
> + if (!data->sent_begin_txn)
>
> =>
>
> Maybe elaborate on that "skip PREPARE message if nothing was sent"
> comment in a way similar to my review comment 7b. For example,
>
> AFTER
> /* If the BEGIN was not yet sent, then it means there were no relevant
> changes encountered, so we can skip the PREPARE message too. */
>

Updated.

> ------
>
> 11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn
>
> @@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> */
> static void
> pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> - XLogRecPtr commit_lsn)
> + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> + TimestampTz prepare_time)
> {
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
> OutputPluginUpdateProgress(ctx);
>
> + /*
> + * skip sending COMMIT PREPARED message if prepared transaction
> + * has not been sent.
> + */
> + if (data)
>
> =>
>
> Similar to previous review comment 10, I think the reason for the skip
> should be elaborated a little bit. For example,
>
> AFTER
> /* If the BEGIN PREPARE was not yet sent, then it means there were no
> relevant changes encountered, so we can skip the COMMIT PREPARED
> message too. */
>
> ------

Updated accordingly.

>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn
>
> => Similar as for pgoutput_commit_prepared_txn (see review comment 11)
>
> ------

Updated.

>
> 13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change
>
> @@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> Relation relation, ReorderBufferChange *change)
> {
> PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> MemoryContext old;
> RelationSyncEntry *relentry;
> TransactionId xid = InvalidTransactionId;
> Relation ancestor = NULL;
>
> + /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
> + if (!in_streaming)
> + Assert(txndata);
> +
> if (!is_publishable_relation(relation))
> return;
>
> 13a. =>
>
> I felt the streaming logic with the txndata is a bit confusing. I
> think it would be easier to have another local variable (sent_begin)
> and use it like this...
>
> bool sent_begin;
> if (in_streaming)
> {
>     sent_begin = true;
> }
> else
> {
>     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>     Assert(txndata);
>     sent_begin = txndata->sent_begin_txn;
> }
>

I did not make this change because, in the streaming case, sent_begin
is not true, so coding it that way seemed incorrect. Instead, I have
modified the comment to mention that streaming transactions do not send
BEGIN / BEGIN PREPARE.

> ...
>
> ------
>
> + /* output BEGIN if we haven't yet */
>
> 13b. =>
>
> I thought the comment is not quite right
>
> AFTER
> /* Output BEGIN / BEGIN PREPARE if we haven't yet */
>
> ------

Updated.

>
> + if (!in_streaming && !txndata->sent_begin_txn)
> + {
> + if (rbtxn_prepared(txn))
> + pgoutput_begin_prepare(ctx, txn);
> + else
> + pgoutput_begin(ctx, txn);
> + }
> +
>
> 13.c =>
>
> If you introduce the variable (as suggested in 13a) this code becomes
> much simpler:
>

Skipped this. (reason mentioned above)

> ------
>
> 14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
>
> =>
>
> All the similar review comments made for pgoutput_change (13a, 13b, 13c)
> apply to pgoutput_truncate here also.
>
> ------

Updated.

>
> 15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message
>
> @@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> const char *message)
> {
> PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata;
> TransactionId xid = InvalidTransactionId;
>
>
> =>
>
> This variable should be declared in the block where it is used,
> similar to the suggestion 13a.
>
> Also is it just an accidental omission that you did Assert(txndata)
> for all the other places but not here?
>

Moved the variable declaration and added an assert.

regards,
Ajin Cherian
Fujitsu Australia
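
For anyone reading this thread without the patch open, the behaviour that comments 6 to 15 discuss can be sketched roughly as follows: BEGIN is not sent when the transaction starts, it is sent lazily before the first publishable change, and COMMIT is skipped if BEGIN was never sent. This is only a standalone illustration under simplifying assumptions, not the patch code. The Demo* types and demo_* functions are hypothetical, counters stand in for the real protocol messages, and streaming and two-phase handling are left out. Only the flag name sent_begin_txn is taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Per-transaction state, loosely modelled on the patch's PGOutputTxnData. */
typedef struct DemoTxnData
{
    bool sent_begin_txn;    /* has BEGIN been sent for this transaction? */
} DemoTxnData;

/* Hypothetical output stream: counts messages instead of writing them. */
typedef struct DemoStream
{
    int begins;
    int changes;
    int commits;
} DemoStream;

/* At BEGIN time nothing is sent; we only record that BEGIN is pending. */
void
demo_begin_txn(DemoTxnData *txndata)
{
    txndata->sent_begin_txn = false;
}

/* Output BEGIN if we haven't yet, then the change itself. */
void
demo_change(DemoStream *out, DemoTxnData *txndata, bool publishable)
{
    if (!publishable)
        return;             /* filtered changes never trigger BEGIN */

    if (!txndata->sent_begin_txn)
    {
        out->begins++;
        txndata->sent_begin_txn = true;
    }
    out->changes++;
}

/*
 * If BEGIN was not yet sent, there were no relevant changes, so the
 * COMMIT message can be skipped too.
 */
void
demo_commit_txn(DemoStream *out, const DemoTxnData *txndata)
{
    if (!txndata->sent_begin_txn)
        return;
    out->commits++;
}
```

In this sketch a transaction whose changes are all filtered out produces no messages at all, while a transaction with two publishable changes produces one BEGIN, two changes and one COMMIT.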
v8-0001-Skip-empty-transactions-for-logical-replication.patch