Hi, here are some review comments for your patch v4-0001.

======
contrib/test_decoding/sql/stats.sql

1.
Huh? The test fails because the "expected results" file for these new
tests is missing from the patch.

======
.../replication/logical/reorderbuffer.c

2.
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
- bool txn_prepared);
+ bool txn_prepared, bool mark_streamed);

IIUC this new 'mark_streamed' parameter is more like a prerequisite
for the other conditions to decide to mark the tx as streamed -- i.e.
it is more like 'can_mark_streamed', so I felt the name should be
changed to be like that (everywhere it is used).

~~~

3. ReorderBufferTruncateTXN

- * 'txn_prepared' indicates that we have decoded the transaction at prepare
- * time.
+ * If mark_streamed is true, we could mark the transaction as streamed.
+ *
+ * 'streaming_txn' indicates that the given transaction is a
streaming transaction.
  */
 static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
bool txn_prepared)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
bool txn_prepared,
+ bool mark_streamed)

~

What's that new comment about 'streaming_txn' for? It seemed unrelated
to the patch code.

~~~

4.
/*
* Mark the transaction as streamed.
*
* The top-level transaction, is marked as streamed always, even if it
* does not contain any changes (that is, when all the changes are in
* subtransactions).
*
* For subtransactions, we only mark them as streamed when there are
* changes in them.
*
* We do it this way because of aborts - we don't want to send aborts for
* XIDs the downstream is not aware of. And of course, it always knows
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
if (mark_streamed && (!txn_prepared) &&
(rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;

~~

With the patch introduction of the new parameter, I felt this code
might be better if it was refactored as follows:

/* Mark the transaction as streamed, if appropriate. */
if (can_mark_streamed)
{
  /*
   ... large comment
   */
  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
    txn->txn_flags |= RBTXN_IS_STREAMED;
}

~~~

5. ReorderBufferPrepare

- if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+ if (!txn_aborted && rbtxn_did_abort(txn) && !rbtxn_is_streamed(txn))
  rb->prepare(rb, txn, txn->final_lsn);

~

Maybe I misunderstood this logic, but won't a "concurrent abort" cause
your new Assert added in ReorderBufferProcessTXN to fail?

+ /* Update transaction status */
+ Assert((curtxn->txn_flags & (RBTXN_COMMITTED | RBTXN_ABORTED)) == 0);

~~~

6. ReorderBufferCheckTXNAbort

+ /* Check the transaction status using CLOG lookup */
+ if (TransactionIdIsInProgress(txn->xid))
+ return false;
+
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ /*
+ * Remember the transaction is committed so that we can skip CLOG
+ * check next time, avoiding the pressure on CLOG lookup.
+ */
+ txn->txn_flags |= RBTXN_COMMITTED;
+ return false;
+ }

IIUC the purpose of the TransactionIdDidCommit() was to avoid the
overhead of calling the TransactionIdIsInProgress(). So, shouldn't the
order of these checks be swapped? Otherwise, there might be 1 extra
unnecessary call to TransactionIdIsInProgress() next time.

======
src/include/replication/reorderbuffer.h

7.
 #define RBTXN_PREPARE              0x0040
 #define RBTXN_SKIPPED_PREPARE   0x0080
 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
+#define RBTXN_COMMITTED 0x0200
+#define RBTXN_ABORTED 0x0400

For consistency with the existing bitmask names, I guess these should be named:
- RBTXN_COMMITTED --> RBTXN_IS_COMMITTED
- RBTXN_ABORTED --> RBTXN_IS_ABORTED

~~~

8.
Similarly, IMO the macros should have the same names as the bitmasks,
like the other nearby ones generally seem to.

rbtxn_did_commit --> rbtxn_is_committed
rbtxn_did_abort --> rbtxn_is_aborted

======

9.
Also, attached is a top-up patch for other cosmetic nitpicks:
- comment wording
- typos in comments
- excessive or missing blank lines
- etc.

======
Kind Regards,
Peter Smith.
Fujitsu Australia
diff --git a/contrib/test_decoding/sql/stats.sql 
b/contrib/test_decoding/sql/stats.sql
index 7e05f39..a6a441d 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -54,14 +54,15 @@ COMMIT;
 SELECT 'init' FROM 
pg_create_logical_replication_slot('regression_slot_stats4_twophase', 
'test_decoding', false, true) s4;
 
 -- Execute a transaction that is prepared and aborted. We detect that the
--- transaction is aborted before spilling changes, and skip to collect
--- further changes. So the transaction should not be spilled at all.
+-- transaction is aborted before spilling changes, and then skip collecting
+-- further changes. So, the transaction should not be spilled at all.
 BEGIN;
 INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM 
generate_series(1, 5000) g(i);
 TRUNCATE table stats_test;
 PREPARE TRANSACTION 'test1_abort';
 ROLLBACK PREPARED 'test1_abort';
--- should show only ROLLBACK PREAPRED.
+
+-- Should show only ROLLBACK PREPARED.
 SELECT data FROM 
pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Check stats. We should not spill anything as the transaction is already
diff --git a/src/backend/replication/logical/reorderbuffer.c 
b/src/backend/replication/logical/reorderbuffer.c
index 861ee1c..8b3e1b8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2802,7 +2802,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
        txn->gid = pstrdup(gid);
 
        /*
-        * We remember whether the transaction is already aborted before the
+        * Remember whether the transaction is already aborted before the
         * replay in order to detect the concurrent abort below.
         */
        txn_aborted = rbtxn_did_abort(txn);
@@ -3608,18 +3608,20 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 
 /*
  * Check the transaction status of the given transaction. If the transaction
- * already aborted, we discards all changes accumulated so far and ignore
- * future changes, and return true. Otherwise return false.
+ * already aborted, we discard all changes accumulated so far, ignore future
+ * changes, and return true. Otherwise return false.
  *
- * If logical_replication_mode is set to "immediate", we disable this check
- * for regression tests.
+ * If GUC 'debug_logical_replication_streaming' is "immediate", we don't
+ * check the transaction status, so the caller always processes this
+ * transaction. This is to disable this check for regression tests.
  */
 static bool
 ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
        /*
-        * If logical_replication_mode is "immediate", we don't check the
-        * transaction status so the caller always processes this transaction.
+        * If GUC 'debug_logical_replication_streaming' is "immediate", we don't
+        * check the transaction status, so the caller always processes this
+        * transaction.
         */
        if (debug_logical_replication_streaming == 
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)
                return false;
diff --git a/src/include/replication/reorderbuffer.h 
b/src/include/replication/reorderbuffer.h
index b0d381c..8eb0704 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -227,19 +227,18 @@ typedef struct ReorderBufferChange
        ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
 )
 
-/* Did this transaction committed? */
+/* Is this transaction committed? */
 #define rbtxn_did_commit(txn) \
 ( \
        ((txn)->txn_flags & RBTXN_COMMITTED) != 0 \
 )
 
-/* Did this transaction aborted? */
+/* Is this transaction aborted? */
 #define rbtxn_did_abort(txn) \
 ( \
        ((txn)->txn_flags & RBTXN_ABORTED) != 0 \
 )
 
-
 /* prepare for this transaction skipped? */
 #define rbtxn_skip_prepared(txn) \
 ( \

Reply via email to