On Tue, May 25, 2021 at 5:46 PM Dilip Kumar <dilipbal...@gmail.com> wrote: > > On Tue, May 25, 2021 at 4:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > Your patch will fix the reported scenario but I don't like the way > > multi_insert flag is used to detect incomplete tuple. One problem > > could be that even when there are no toast inserts, it won't allow to > > stream unless we get the last tuple of multi insert WAL. How about > > changing the code such that when we are clearing the toast flag, we > > additionally check 'clear_toast_afterwards' flag? > > Yes, that can be done, I will fix this in the next version of the patch.
I have fixed this as per the suggestion and, as per the off-list discussion, I have merged the TOAST and SPEC insert flags into a single PARTIAL_CHANGE flag. I have also added a test case for this. -- Regards, Dilip Kumar EnterpriseDB: http://www.enterprisedb.com
From 37bd2166bc5036613ec2666ac06ae8106170550b Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Tue, 25 May 2021 14:51:45 +0530 Subject: [PATCH v2] Fix bug while streaming the multi-insert toast changes While processing the multi-insert we cannot clean the toast until we get the last insert of the multi-insert. So mark the transaction incomplete for streaming if we get any multi-insert change, and keep it set until we get the last tuple of the multi-insert. --- contrib/test_decoding/expected/stream.out | 41 +++++++++++++++++++++++++ contrib/test_decoding/sql/stream.sql | 5 +++ src/backend/replication/logical/reorderbuffer.c | 36 ++++++++++------------ src/include/replication/reorderbuffer.h | 27 ++++------------ 4 files changed, 68 insertions(+), 41 deletions(-) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index e1c3bc8..1e93955 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -82,6 +82,47 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl committing streamed transaction (13 rows) +-- streaming test for toast with multi-insert +SELECT 'psql -At -c "copy stream_test to stdout" ' || current_database() AS copy_command \gset +COPY stream_test FROM program :'copy_command'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + data +------------------------------------------ + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + 
streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + committing streamed transaction +(33 rows) + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql index ce86c81..cad028b 100644 --- a/contrib/test_decoding/sql/stream.sql +++ b/contrib/test_decoding/sql/stream.sql @@ -26,5 +26,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +-- streaming test for toast with multi-insert +SELECT 'psql -At -c "copy stream_test to stdout" ' || current_database() AS copy_command \gset +COPY stream_test FROM program :'copy_command'; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); + DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b0ab91c..4401608 100644 --- 
a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -705,31 +705,27 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, toptxn = txn; /* - * Set the toast insert bit whenever we get toast insert to indicate a - * partial change and clear it when we get the insert or update on main - * table (Both update and insert will do the insert in the toast table). + * If this is a toast change then set a bit to indicate a partial change + * and clear it when we get the insert or update on main table (Both update + * and insert will do the insert in the toast table) and + * clear_toast_afterwards is set. */ if (toast_insert) - toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT; - else if (rbtxn_has_toast_insert(toptxn) && - IsInsertOrUpdate(change->action)) - toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT; + toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; + else if (rbtxn_has_partial_change(toptxn) && + IsInsertOrUpdate(change->action) && + change->data.tp.clear_toast_afterwards) + toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; /* - * Set the spec insert bit whenever we get the speculative insert to + * Set the partial change bit whenever we get the speculative insert to * indicate the partial change and clear the same on speculative confirm. */ if (IsSpecInsert(change->action)) - toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT; - else if (IsSpecConfirm(change->action)) - { - /* - * Speculative confirm change must be preceded by speculative - * insertion. - */ - Assert(rbtxn_has_spec_insert(toptxn)); - toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT; - } + toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; + else if (rbtxn_has_partial_change(toptxn) && + IsSpecConfirm(change->action)) + toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; /* * Stream the transaction if it is serialized before and the changes are @@ -741,7 +737,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, * changes. 
Delaying such transactions would increase apply lag for them. */ if (ReorderBufferCanStartStreaming(rb) && - !(rbtxn_has_incomplete_tuple(toptxn)) && + !(rbtxn_has_partial_change(toptxn)) && rbtxn_is_serialized(txn)) ReorderBufferStreamTXN(rb, toptxn); } @@ -3399,7 +3395,7 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb) Assert(txn->base_snapshot != NULL); if ((largest == NULL || txn->total_size > largest_size) && - (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn))) + (txn->total_size > 0) && !(rbtxn_has_partial_change(txn))) { largest = txn; largest_size = txn->total_size; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 53cdfa5..ced9f3e 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -172,10 +172,9 @@ typedef struct ReorderBufferChange #define RBTXN_IS_SERIALIZED 0x0004 #define RBTXN_IS_SERIALIZED_CLEAR 0x0008 #define RBTXN_IS_STREAMED 0x0010 -#define RBTXN_HAS_TOAST_INSERT 0x0020 -#define RBTXN_HAS_SPEC_INSERT 0x0040 -#define RBTXN_PREPARE 0x0080 -#define RBTXN_SKIPPED_PREPARE 0x0100 +#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 +#define RBTXN_PREPARE 0x0040 +#define RBTXN_SKIPPED_PREPARE 0x0080 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -201,24 +200,10 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \ ) -/* This transaction's changes has toast insert, without main table insert. */ -#define rbtxn_has_toast_insert(txn) \ +/* This transaction's changes has partial changes? */ +#define rbtxn_has_partial_change(txn) \ ( \ - ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ -) -/* - * This transaction's changes has speculative insert, without speculative - * confirm. - */ -#define rbtxn_has_spec_insert(txn) \ -( \ - ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ -) - -/* Check whether this transaction has an incomplete change. 
*/ -#define rbtxn_has_incomplete_tuple(txn) \ -( \ - rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \ + ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \ ) /* -- 1.8.3.1