On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2...@gmail.com> wrote: > > On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2...@gmail.com> wrote: > > > > Please find attached the latest patch set v73`* > > > > Differences from v72* are: > > > > * Rebased to HEAD @ today (required because v72-0001 no longer applied > > cleanly) > > > > * Minor documentation correction for protocol messages for Commit Prepared > > ('K') > > > > * Non-functional code tidy (mostly proto.c) to reduce overloading > > different meanings to same member names for prepare/commit times. > > > Please find attached a re-posting of patch set v73* > > This is the same as yesterday's v73 but with a contrib module compile > error fixed.
Few comments on v73-0002-Add-prepare-API-support-for-streaming-transactio.patch patch: 1) There are slight differences in error message in case of Alter subscription ... drop publication, we can keep the error message similar: postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH (refresh = false, copy_data=true, two_phase=true); ERROR: unrecognized subscription parameter: "copy_data" postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH (refresh = false, two_phase=true, streaming=true); ERROR: cannot alter two_phase option 2) We are sending txn->xid twice, I felt we should send only once in logicalrep_write_stream_prepare: + /* transaction ID */ + Assert(TransactionIdIsValid(txn->xid)); + pq_sendint32(out, txn->xid); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->u_op_time.prepare_time); + pq_sendint32(out, txn->xid); + 3) We could remove xid and return prepare_data->xid +TransactionId +logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); 4) Here comments can be above apply_spooled_messages for better readability + /* + * 1. Replay all the spooled operations - Similar code as for + * apply_handle_stream_commit (i.e. non two-phase stream commit) + */ + + ensure_transaction(); + + nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn); + 5) Similarly this below comment can be above PrepareTransactionBlock + /* + * 2. Mark the transaction as prepared. - Similar code as for + * apply_handle_prepare (i.e. two-phase non-streamed prepare) + */ + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; + + PrepareTransactionBlock(gid); + CommitTransactionCommand(); + + pgstat_report_stat(false); 6) There is a lot of common code between apply_handle_stream_prepare and apply_handle_prepare, if possible try to have a common function to avoid fixing at both places. + /* + * 2. Mark the transaction as prepared. - Similar code as for + * apply_handle_prepare (i.e. two-phase non-streamed prepare) + */ + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; + + PrepareTransactionBlock(gid); + CommitTransactionCommand(); + + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); 7) two-phase commit is slightly misleading, we can just mention streaming prepare. + * PREPARE callback (for streaming two-phase commit). + * + * Notify the downstream to prepare the transaction. + */ +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) 8) should we include Assert of in_streaming similar to other pgoutput_stream*** functions. +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} 9) Here also, we can verify that the transaction is streamed by checking the pg_stat_replication_slots. +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber'); Regards, Vignesh