Hi, here are some review comments for your patch v4-0001. ====== contrib/test_decoding/sql/stats.sql
1. Huh? The test fails because the "expected results" file for these new tests is missing from the patch. ====== .../replication/logical/reorderbuffer.c 2. static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, - bool txn_prepared); + bool txn_prepared, bool mark_streamed); IIUC this new 'mark_streamed' parameter is more like a prerequisite for the other conditions to decide to mark the tx as streamed -- i.e. it is more like 'can_mark_streamed', so I felt the name should be changed to be like that (everywhere it is used). ~~~ 3. ReorderBufferTruncateTXN - * 'txn_prepared' indicates that we have decoded the transaction at prepare - * time. + * If mark_streamed is true, we could mark the transaction as streamed. + * + * 'streaming_txn' indicates that the given transaction is a streaming transaction. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared, + bool mark_streamed) ~ What's that new comment about 'streaming_txn' for? It seemed unrelated to the patch code. ~~~ 4. /* * Mark the transaction as streamed. * * The top-level transaction, is marked as streamed always, even if it * does not contain any changes (that is, when all the changes are in * subtransactions). * * For subtransactions, we only mark them as streamed when there are * changes in them. * * We do it this way because of aborts - we don't want to send aborts for * XIDs the downstream is not aware of. And of course, it always knows * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ if (mark_streamed && (!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; ~~ With the patch introduction of the new parameter, I felt this code might be better if it was refactored as follows: /* Mark the transaction as streamed, if appropriate. */ if (can_mark_streamed) { /* ... large comment */ if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; } ~~~ 5. ReorderBufferPrepare - if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) + if (!txn_aborted && rbtxn_did_abort(txn) && !rbtxn_is_streamed(txn)) rb->prepare(rb, txn, txn->final_lsn); ~ Maybe I misunderstood this logic, but won't a "concurrent abort" cause your new Assert added in ReorderBufferProcessTXN to fail? + /* Update transaction status */ + Assert((curtxn->txn_flags & (RBTXN_COMMITTED | RBTXN_ABORTED)) == 0); ~~~ 6. ReorderBufferCheckTXNAbort + /* Check the transaction status using CLOG lookup */ + if (TransactionIdIsInProgress(txn->xid)) + return false; + + if (TransactionIdDidCommit(txn->xid)) + { + /* + * Remember the transaction is committed so that we can skip CLOG + * check next time, avoiding the pressure on CLOG lookup. + */ + txn->txn_flags |= RBTXN_COMMITTED; + return false; + } IIUC the purpose of the TransactionIdDidCommit() was to avoid the overhead of calling the TransactionIdIsInProgress(). So, shouldn't the order of these checks be swapped? Otherwise, there might be 1 extra unnecessary call to TransactionIdIsInProgress() next time. ====== src/include/replication/reorderbuffer.h 7. #define RBTXN_PREPARE 0x0040 #define RBTXN_SKIPPED_PREPARE 0x0080 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 +#define RBTXN_COMMITTED 0x0200 +#define RBTXN_ABORTED 0x0400 For consistency with the existing bitmask names, I guess these should be named: - RBTXN_COMMITTED --> RBTXN_IS_COMMITTED - RBTXN_ABORTED --> RBTXN_IS_ABORTED ~~~ 8. Similarly, IMO the macros should have the same names as the bitmasks, like the other nearby ones generally seem to. rbtxn_did_commit --> rbtxn_is_committed rbtxn_did_abort --> rbtxn_is_aborted ====== 9. Also, attached is a top-up patch for other cosmetic nitpicks: - comment wording - typos in comments - excessive or missing blank lines - etc. ====== Kind Regards, Peter Smith. Fujitsu Australia
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 7e05f39..a6a441d 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -54,14 +54,15 @@ COMMIT; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4; -- Execute a transaction that is prepared and aborted. We detect that the --- transaction is aborted before spilling changes, and skip to collect --- further changes. So the transaction should not be spilled at all. +-- transaction is aborted before spilling changes, and then skip collecting +-- further changes. So, the transaction should not be spilled at all. BEGIN; INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i); TRUNCATE table stats_test; PREPARE TRANSACTION 'test1_abort'; ROLLBACK PREPARED 'test1_abort'; --- should show only ROLLBACK PREAPRED. + +-- Should show only ROLLBACK PREPARED. SELECT data FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Check stats. We should not spill anything as the transaction is already diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 861ee1c..8b3e1b8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2802,7 +2802,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, txn->gid = pstrdup(gid); /* - * We remember whether the transaction is already aborted before the + * Remember whether the transaction is already aborted before the * replay in order to detect the concurrent abort below. */ txn_aborted = rbtxn_did_abort(txn); @@ -3608,18 +3608,20 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) /* * Check the transaction status of the given transaction. If the transaction - * already aborted, we discards all changes accumulated so far and ignore - * future changes, and return true. Otherwise return false. + * already aborted, we discard all changes accumulated so far, ignore future + * changes, and return true. Otherwise return false. * - * If logical_replication_mode is set to "immediate", we disable this check - * for regression tests. + * If GUC 'debug_logical_replication_streaming' is "immediate", we don't + * check the transaction status, so the caller always processes this + * transaction. This is to disable this check for regression tests. */ static bool ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* - * If logical_replication_mode is "immediate", we don't check the - * transaction status so the caller always processes this transaction. + * If GUC 'debug_logical_replication_streaming' is "immediate", we don't + * check the transaction status, so the caller always processes this + * transaction. */ if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE) return false; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b0d381c..8eb0704 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -227,19 +227,18 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ ) -/* Did this transaction committed? */ +/* Is this transaction committed? */ #define rbtxn_did_commit(txn) \ ( \ ((txn)->txn_flags & RBTXN_COMMITTED) != 0 \ ) -/* Did this transaction aborted? */ +/* Is this transaction aborted? */ #define rbtxn_did_abort(txn) \ ( \ ((txn)->txn_flags & RBTXN_ABORTED) != 0 \ ) - /* prepare for this transaction skipped? */ #define rbtxn_skip_prepared(txn) \ ( \