On Wed, Jun 9, 2021 at 8:59 PM Alvaro Herrera <alvhe...@alvh.no-ip.org> wrote: > > May I suggest to use a different name in the blurt_and_lock_123() > function, so that it doesn't conflict with the one in > insert-conflict-specconflict? Thanks
Renamed to blurt_and_lock(); is that fine? I have also addressed the other comments and prepared patches for the back branches. -- Regards, Dilip Kumar EnterpriseDB: http://www.enterprisedb.com
From dcea4c36267ad2dc58dd0a57733a6f6276e2d754 Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Tue, 8 Jun 2021 17:06:39 +0530 Subject: [PATCH v5 1/2] Bug fix for speculative abort If a speculative insert includes a toast table insert and the tuple is never confirmed, the toast hash is not cleaned up. This causes various problems, such as a) a memory leak and b) the next insert using the stale toast data for its own insertion, leading to errors and assertion failures. This patch handles that by queuing the spec abort changes and cleaning up the toast hash on spec abort. Currently, this patch queues all the spec abort changes; as an optimization we could avoid queuing the spec abort for toast tables, but for that we would need to log a flag in WAL. --- src/backend/replication/logical/decode.c | 14 ++++---- src/backend/replication/logical/reorderbuffer.c | 43 +++++++++++++++++++++++-- src/include/replication/reorderbuffer.h | 1 + 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7067016..453efc5 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -1040,19 +1040,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. 
- */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2d9e127..dd95785 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -443,6 +443,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -520,6 +523,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -2211,8 +2215,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -2254,6 +2258,38 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. 
If spec insert change + * is NULL then do nothing, this is possible because we + * have spec abort for each toast entry. So we just have + * to clean the specinsert and toast hash for the first + * spec abort for the main table and remaining changes for + * the tables can be ignored. + */ + if (specinsert != NULL) + { + /* + * Clear the toast hash, we must clean the toast hash + * before we start with a completely new tuple, + * otherwise, while processing the new tuple it would + * create a confusion that whether we need to process + * these toast chunks or not. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* + * If the speculative insertion was aborted, the record + * isn't needed anymore. + */ + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; @@ -3754,6 +3790,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -4017,6 +4054,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -4315,6 +4353,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0c6e9d1..9ff0986 
100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -63,6 +63,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE }; -- 1.8.3.1
From 4440e5dca68da59d9d397efb890893470dd92aaf Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 11:59:58 +0530 Subject: [PATCH v5 1/2] Bug fix for speculative abort v10 If speculative insert has a toast table insert then if that tuple is not confirmed then the toast hash is not cleaned and that is creating various problem like a) memory leak b) next insert is using these uncleaned toast data for its insertion and other error and assersion failure. So this patch handle that by queuing the spec abort changes and cleaning up the toast hash on spec abort. Currently, in this patch we are queuing up all the spec abort changes, but as an optimization we can avoid queuing the spec abort for toast tables but for that we need to log that as a flag in WAL. --- src/backend/replication/logical/decode.c | 14 ++++----- src/backend/replication/logical/reorderbuffer.c | 42 +++++++++++++++++++++++-- src/include/replication/reorderbuffer.h | 3 +- 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index d7a6f5d..3778ea9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -778,19 +778,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. 
- */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 165ba8f..2f80c73 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -353,6 +353,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -413,6 +416,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) break; /* no data in addition to the struct itself */ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1621,8 +1625,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1664,6 +1668,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. 
If spec insert change + * is NULL then do nothing, this is possible because we + * have spec abort for each toast entry. So we just have + * to clean the specinsert and toast hash for the first + * spec abort for the main table and remaining changes for + * the tables can be ignored. + */ + if (specinsert != NULL) + { + /* + * Clear the toast hash, we must clean the toast hash + * before we start with a completely new tuple, + * otherwise, while processing the new tuple it would + * create a confusion that whether we need to process + * these toast chunks or not. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* + * If the speculative insertion was aborted, the record + * isn't needed anymore. + */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_MESSAGE: rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, @@ -2423,6 +2459,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -2716,6 +2753,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* the base struct contains all the data, easy peasy */ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5530c4f..6c2889a 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -59,7 +59,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT }; /* -- 1.8.3.1
From e739d144dd9e2ed1e4903d3992b56b0ac76c1526 Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 11:09:47 +0530 Subject: [PATCH v5 1/2] Bug fix for speculative abort v13 If speculative insert has a toast table insert then if that tuple is not confirmed then the toast hash is not cleaned and that is creating various problem like a) memory leak b) next insert is using these uncleaned toast data for its insertion and other error and assersion failure. So this patch handle that by queuing the spec abort changes and cleaning up the toast hash on spec abort. Currently, in this patch we are queuing up all the spec abort changes, but as an optimization we can avoid queuing the spec abort for toast tables but for that we need to log that as a flag in WAL. --- src/backend/replication/logical/decode.c | 14 ++++----- src/backend/replication/logical/reorderbuffer.c | 42 +++++++++++++++++++++++-- src/include/replication/reorderbuffer.h | 1 + 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3a..4985c2a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -800,19 +800,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. 
- */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 5251932..5e571ff 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -397,6 +397,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -467,6 +470,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1677,8 +1681,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1720,6 +1724,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. If spec insert change + * is NULL then do nothing, this is possible because we + * have spec abort for each toast entry. 
So we just have + * to clean the specinsert and toast hash for the first + * spec abort for the main table and remaining changes for + * the tables can be ignored. + */ + if (specinsert != NULL) + { + /* + * Clear the toast hash, we must clean the toast hash + * before we start with a completely new tuple, + * otherwise, while processing the new tuple it would + * create a confusion that whether we need to process + * these toast chunks or not. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* + * If the speculative insertion was aborted, the record + * isn't needed anymore. + */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; @@ -2640,6 +2676,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -3032,6 +3069,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 019bd38..dbd2e84 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -62,6 +62,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE }; -- 1.8.3.1
From ef1253e047556459cdcd415e4e2558353fa41e76 Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 13:53:00 +0530 Subject: [PATCH v5 1/2] Bug fix for speculative abort v96 If speculative insert has a toast table insert then if that tuple is not confirmed then the toast hash is not cleaned and that is creating various problem like a) memory leak b) next insert is using these uncleaned toast data for its insertion and other error and assersion failure. So this patch handle that by queuing the spec abort changes and cleaning up the toast hash on spec abort. Currently, in this patch we are queuing up all the spec abort changes, but as an optimization we can avoid queuing the spec abort for toast tables but for that we need to log that as a flag in WAL. --- src/backend/replication/logical/decode.c | 14 ++++----- src/backend/replication/logical/reorderbuffer.c | 42 +++++++++++++++++++++++-- src/include/replication/reorderbuffer.h | 3 +- 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1300902..571a901 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -778,19 +778,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. 
- */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f0de337..54c8901 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -364,6 +364,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + /* check whether to put into the slab cache */ if (rb->nr_cached_transactions < max_cached_transactions) { @@ -449,6 +452,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) break; /* no data in addition to the struct itself */ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1674,8 +1678,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1717,6 +1721,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. 
So cleanup the + * specinsert tuple and toast hash. If spec insert change + * is NULL then do nothing, this is possible because we + * have spec abort for each toast entry. So we just have + * to clean the specinsert and toast hash for the first + * spec abort for the main table and remaining changes for + * the tables can be ignored. + */ + if (specinsert != NULL) + { + /* + * Clear the toast hash, we must clean the toast hash + * before we start with a completely new tuple, + * otherwise, while processing the new tuple it would + * create a confusion that whether we need to process + * these toast chunks or not. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* + * If the speculative insertion was aborted, the record + * isn't needed anymore. + */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_MESSAGE: rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, @@ -2476,6 +2512,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -2764,6 +2801,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* the base struct contains all the data, easy peasy */ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e085088..67f2981 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -59,7 +59,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT }; /* -- 1.8.3.1
From 1c08381fd1a32406a80f6f9a0bf6ce5916022b7c Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 11:09:47 +0530 Subject: [PATCH v5 1/2] Bug fix for speculative abort v11 If speculative insert has a toast table insert then if that tuple is not confirmed then the toast hash is not cleaned and that is creating various problem like a) memory leak b) next insert is using these uncleaned toast data for its insertion and other error and assersion failure. So this patch handle that by queuing the spec abort changes and cleaning up the toast hash on spec abort. Currently, in this patch we are queuing up all the spec abort changes, but as an optimization we can avoid queuing the spec abort for toast tables but for that we need to log that as a flag in WAL. --- src/backend/replication/logical/decode.c | 14 ++++----- src/backend/replication/logical/reorderbuffer.c | 42 +++++++++++++++++++++++-- src/include/replication/reorderbuffer.h | 1 + 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index d7da3d6..676f921 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -804,19 +804,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; - /* - * Super deletions are irrelevant for logical decoding, it's driven by the - * confirmation records. 
- */ - if (xlrec->flags & XLH_DELETE_IS_SUPER) - return; - /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_DELETE; + + if (xlrec->flags & XLH_DELETE_IS_SUPER) + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; + else + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1a4b87c..b9c6016 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -351,6 +351,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + pfree(txn); } @@ -418,6 +421,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) } break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; @@ -1620,8 +1624,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, change_done: /* - * Either speculative insertion was confirmed, or it was - * unsuccessful and the record isn't needed anymore. + * If speculative insertion was confirmed, the record isn't + * needed anymore. */ if (specinsert != NULL) { @@ -1663,6 +1667,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. If spec insert change + * is NULL then do nothing, this is possible because we + * have spec abort for each toast entry. 
So we just have + * to clean the specinsert and toast hash for the first + * spec abort for the main table and remaining changes for + * the tables can be ignored. + */ + if (specinsert != NULL) + { + /* + * Clear the toast hash, we must clean the toast hash + * before we start with a completely new tuple, + * otherwise, while processing the new tuple it would + * create a confusion that whether we need to process + * these toast chunks or not. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* + * If the speculative insertion was aborted, the record + * isn't needed anymore. + */ + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + break; + case REORDER_BUFFER_CHANGE_TRUNCATE: { int i; @@ -2475,6 +2511,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ @@ -2778,6 +2815,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index c41f362..f51336f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -60,6 +60,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE }; -- 1.8.3.1
From cb8f217a3c9764ff60294298965809a3b55dbcae Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Tue, 8 Jun 2021 17:11:24 +0530 Subject: [PATCH v5 2/2] Test logical decoding of speculative aborts Test logical decoding of speculative aborts for toast insertion followed by insertion into a different table which doesn't have a toast --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/speculative_abort.out | 85 ++++++++++++++++ contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/speculative_abort.out create mode 100644 contrib/test_decoding/specs/speculative_abort.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a31e0b..1cab935 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot + twophase_snapshot speculative_abort REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out new file mode 100644 index 0000000..7492506 --- /dev/null +++ b/contrib/test_decoding/expected/speculative_abort.out @@ -0,0 +1,85 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 controller_show_count 
controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes +data + +step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step s1_begin: BEGIN; +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 3 +step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 3 +step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 2 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +step s2_insert_toast: <... completed> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +1 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step s1_insert_toast: <... 
completed> +step s1_insert_other: INSERT INTO tbl2 VALUES(1); +step s1_commit: COMMIT; +step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +COMMIT +BEGIN +table public.tbl2: INSERT: a[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec new file mode 100644 index 0000000..7e80a25 --- /dev/null +++ b/contrib/test_decoding/specs/speculative_abort.spec @@ -0,0 +1,111 @@ +# INSERT ... ON CONFLICT test verifying that speculative abort for toast +# insertion are handled during logical decoding +# +# Does this by using advisory locks controlling progress of +# insertions. By waiting when building the index keys, it's possible +# to schedule concurrent INSERT ON CONFLICTs so that there will always +# be a speculative conflict. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (a INT, b TEXT); + ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL; + CREATE TABLE tbl2 (a INT); + + CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'acquiring advisory lock on 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'acquiring advisory lock on 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a)); + + -- consume DDL + SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "controller" +setup +{ + SET default_transaction_isolation = 'read committed'; + SET application_name = 'isolation/insert-specconflict-controller'; +} +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show_count" { SELECT COUNT(*) FROM tbl1; } +step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 1; + SET application_name = 'isolation/insert-specconflict-s1'; +} + +step "s1_begin" { BEGIN; } +step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } +step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 2; + SET application_name = 'isolation/insert-specconflict-s2'; +} +step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } + + +# Test logical decoding of speculative aborts for toast insertion followed by +# insertion into a different table which doesn't have a toast 
+permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue a) after the optimistic conflict probe and b) after the + # insertion of the speculative tuple. + "controller_locks" + "controller_show_count" + "s1_begin" + "s1_insert_toast" "s2_insert_toast" + "controller_show_count" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show_count" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show_count" + # Allow the first session to proceed and abort its speculative insertion + "controller_unlock_1_2" + # Insert into other table from s1 and commit + "s1_insert_other" "s1_commit" + # Get the changes + "controller_get_changes" -- 1.8.3.1
From a6bc6c989b3a0fcab69bb1e63af239b30dfd4f9d Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 11:11:44 +0530 Subject: [PATCH v5 2/2] Test logical decoding of speculative aborts v10 Test logical decoding of speculative aborts for a toast insertion, followed by an insertion into a different table which doesn't have a toast table --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/speculative_abort.out | 85 ++++++++++++++++ contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/speculative_abort.out create mode 100644 contrib/test_decoding/specs/speculative_abort.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 2db2b27..c7def09 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -51,7 +51,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(REGRESSCHECKS) ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top speculative_abort isolationcheck: | submake-isolation submake-test_decoding temp-install $(pg_isolation_regress_check) \ diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out new file mode 100644 index 0000000..7492506 --- /dev/null +++ b/contrib/test_decoding/expected/speculative_abort.out @@ -0,0 +1,85 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 controller_show_count controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes +data + 
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step s1_begin: BEGIN; +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 3 +step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 3 +step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 2 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +step s2_insert_toast: <... completed> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +1 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step s1_insert_toast: <... 
completed> +step s1_insert_other: INSERT INTO tbl2 VALUES(1); +step s1_commit: COMMIT; +step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +COMMIT +BEGIN +table public.tbl2: INSERT: a[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec new file mode 100644 index 0000000..7e80a25 --- /dev/null +++ b/contrib/test_decoding/specs/speculative_abort.spec @@ -0,0 +1,111 @@ +# INSERT ... ON CONFLICT test verifying that speculative abort for toast +# insertion are handled during logical decoding +# +# Does this by using advisory locks controlling progress of +# insertions. By waiting when building the index keys, it's possible +# to schedule concurrent INSERT ON CONFLICTs so that there will always +# be a speculative conflict. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (a INT, b TEXT); + ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL; + CREATE TABLE tbl2 (a INT); + + CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'acquiring advisory lock on 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'acquiring advisory lock on 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a)); + + -- consume DDL + SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "controller" +setup +{ + SET default_transaction_isolation = 'read committed'; + SET application_name = 'isolation/insert-specconflict-controller'; +} +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show_count" { SELECT COUNT(*) FROM tbl1; } +step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 1; + SET application_name = 'isolation/insert-specconflict-s1'; +} + +step "s1_begin" { BEGIN; } +step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } +step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 2; + SET application_name = 'isolation/insert-specconflict-s2'; +} +step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } + + +# Test logical decoding of speculative aborts for toast insertion followed by +# insertion into a different table which doesn't have a toast 
+permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue a) after the optimistic conflict probe and b) after the + # insertion of the speculative tuple. + "controller_locks" + "controller_show_count" + "s1_begin" + "s1_insert_toast" "s2_insert_toast" + "controller_show_count" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show_count" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show_count" + # Allow the first session to proceed and abort its speculative insertion + "controller_unlock_1_2" + # Insert into other table from s1 and commit + "s1_insert_other" "s1_commit" + # Get the changes + "controller_get_changes" -- 1.8.3.1
From d5f3cb36e68fde7dcd2a207a8e5443ebf7d23c83 Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 11:11:44 +0530 Subject: [PATCH v5 2/2] Test logical decoding of speculative aborts v11 Test logical decoding of speculative aborts for a toast insertion, followed by an insertion into a different table which doesn't have a toast table --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/speculative_abort.out | 85 ++++++++++++++++ contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/speculative_abort.out create mode 100644 contrib/test_decoding/specs/speculative_abort.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 65a91a8..29e5a3e 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -51,7 +51,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(REGRESSCHECKS) ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top speculative_abort isolationcheck: | submake-isolation submake-test_decoding temp-install $(pg_isolation_regress_check) \ diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out new file mode 100644 index 0000000..7492506 --- /dev/null +++ b/contrib/test_decoding/expected/speculative_abort.out @@ -0,0 +1,85 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 controller_show_count controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes +data + 
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step s1_begin: BEGIN; +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 3 +step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 3 +step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 2 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +step s2_insert_toast: <... completed> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +1 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step s1_insert_toast: <... 
completed> +step s1_insert_other: INSERT INTO tbl2 VALUES(1); +step s1_commit: COMMIT; +step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +COMMIT +BEGIN +table public.tbl2: INSERT: a[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec new file mode 100644 index 0000000..7e80a25 --- /dev/null +++ b/contrib/test_decoding/specs/speculative_abort.spec @@ -0,0 +1,111 @@ +# INSERT ... ON CONFLICT test verifying that speculative abort for toast +# insertion are handled during logical decoding +# +# Does this by using advisory locks controlling progress of +# insertions. By waiting when building the index keys, it's possible +# to schedule concurrent INSERT ON CONFLICTs so that there will always +# be a speculative conflict. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (a INT, b TEXT); + ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL; + CREATE TABLE tbl2 (a INT); + + CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'acquiring advisory lock on 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'acquiring advisory lock on 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a)); + + -- consume DDL + SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "controller" +setup +{ + SET default_transaction_isolation = 'read committed'; + SET application_name = 'isolation/insert-specconflict-controller'; +} +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show_count" { SELECT COUNT(*) FROM tbl1; } +step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 1; + SET application_name = 'isolation/insert-specconflict-s1'; +} + +step "s1_begin" { BEGIN; } +step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } +step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 2; + SET application_name = 'isolation/insert-specconflict-s2'; +} +step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } + + +# Test logical decoding of speculative aborts for toast insertion followed by +# insertion into a different table which doesn't have a toast 
+permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe and b) the + # insertion of the speculative tuple. + "controller_locks" + "controller_show_count" + "s1_begin" + "s1_insert_toast" "s2_insert_toast" + "controller_show_count" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show_count" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show_count" + # Allow the first session to perform its speculative abort + "controller_unlock_1_2" + # Insert into the other table from s1 and commit + "s1_insert_other" "s1_commit" + # Get the changes + "controller_get_changes" -- 1.8.3.1
From 34f99cd579b7994707b207ef8fd748a40c970e4a Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 11:31:31 +0530 Subject: [PATCH v5 2/2] Test logical decoding of speculative aborts v13 Test logical decoding of speculative aborts for toast insertion followed by insertion into a different table which doesn't have a toast --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/speculative_abort.out | 85 ++++++++++++++++ contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/speculative_abort.out create mode 100644 contrib/test_decoding/specs/speculative_abort.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f439c58..14e60bd 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ spill slot truncate ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top speculative_abort REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out new file mode 100644 index 0000000..7492506 --- /dev/null +++ b/contrib/test_decoding/expected/speculative_abort.out @@ -0,0 +1,85 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 
controller_show_count controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes +data + +step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step s1_begin: BEGIN; +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 3 +step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 3 +step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 2 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +step s2_insert_toast: <... completed> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +1 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step s1_insert_toast: <... 
completed> +step s1_insert_other: INSERT INTO tbl2 VALUES(1); +step s1_commit: COMMIT; +step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +COMMIT +BEGIN +table public.tbl2: INSERT: a[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec new file mode 100644 index 0000000..7e80a25 --- /dev/null +++ b/contrib/test_decoding/specs/speculative_abort.spec @@ -0,0 +1,111 @@ +# INSERT ... ON CONFLICT test verifying that speculative abort for toast +# insertion are handled during logical decoding +# +# Does this by using advisory locks controlling progress of +# insertions. By waiting when building the index keys, it's possible +# to schedule concurrent INSERT ON CONFLICTs so that there will always +# be a speculative conflict. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (a INT, b TEXT); + ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL; + CREATE TABLE tbl2 (a INT); + + CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'acquiring advisory lock on 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'acquiring advisory lock on 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a)); + + -- consume DDL + SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "controller" +setup +{ + SET default_transaction_isolation = 'read committed'; + SET application_name = 'isolation/insert-specconflict-controller'; +} +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show_count" { SELECT COUNT(*) FROM tbl1; } +step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 1; + SET application_name = 'isolation/insert-specconflict-s1'; +} + +step "s1_begin" { BEGIN; } +step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } +step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 2; + SET application_name = 'isolation/insert-specconflict-s2'; +} +step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } + + +# Test logical decoding of speculative aborts for toast insertion followed by +# insertion into a different table which doesn't have a toast 
+permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe and b) the + # insertion of the speculative tuple. + "controller_locks" + "controller_show_count" + "s1_begin" + "s1_insert_toast" "s2_insert_toast" + "controller_show_count" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show_count" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show_count" + # Allow the first session to perform its speculative abort + "controller_unlock_1_2" + # Insert into the other table from s1 and commit + "s1_insert_other" "s1_commit" + # Get the changes + "controller_get_changes" -- 1.8.3.1
From 4d8e5a68b63ca14abfef755d78f383746b41000f Mon Sep 17 00:00:00 2001 From: Dilip Kumar <dilipkumar@localhost.localdomain> Date: Thu, 10 Jun 2021 13:57:17 +0530 Subject: [PATCH v5 2/2] Test logical decoding of speculative aborts v96 Test logical decoding of speculative aborts for toast insertion followed by insertion into a different table which doesn't have a toast --- contrib/test_decoding/Makefile | 2 +- .../test_decoding/expected/speculative_abort.out | 85 ++++++++++++++++ contrib/test_decoding/specs/speculative_abort.spec | 111 +++++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/speculative_abort.out create mode 100644 contrib/test_decoding/specs/speculative_abort.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index b6fc8da..18dcd2d 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -54,7 +54,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(REGRESSCHECKS) ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top + oldest_xmin snapshot_transfer subxact_without_top speculative_abort isolationcheck: | submake-isolation submake-test_decoding temp-install $(MKDIR_P) isolation_output diff --git a/contrib/test_decoding/expected/speculative_abort.out b/contrib/test_decoding/expected/speculative_abort.out new file mode 100644 index 0000000..7492506 --- /dev/null +++ b/contrib/test_decoding/expected/speculative_abort.out @@ -0,0 +1,85 @@ +Parsed test spec with 3 sessions + +starting permutation: controller_locks controller_show_count s1_begin s1_insert_toast s2_insert_toast controller_show_count controller_unlock_1_1 controller_unlock_2_1 controller_unlock_1_3 controller_unlock_2_3 controller_show_count controller_unlock_2_2 controller_show_count controller_unlock_1_2 s1_insert_other s1_commit controller_get_changes +data + 
+step controller_locks: SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock); +pg_advisory_locksess lock + + 1 1 + 1 2 + 1 3 + 2 1 + 2 2 + 2 3 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step s1_begin: BEGIN; +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 3 +step s1_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 3 +step s2_insert_toast: INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; <waiting ...> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_1_1: SELECT pg_advisory_unlock(1, 1); +pg_advisory_unlock + +t +step controller_unlock_2_1: SELECT pg_advisory_unlock(2, 1); +pg_advisory_unlock + +t +step controller_unlock_1_3: SELECT pg_advisory_unlock(1, 3); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step controller_unlock_2_3: SELECT pg_advisory_unlock(2, 3); +pg_advisory_unlock + +t +s2: NOTICE: blurt_and_lock() called for 1 in session 2 +s2: NOTICE: acquiring advisory lock on 2 +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +0 +step controller_unlock_2_2: SELECT pg_advisory_unlock(2, 2); +pg_advisory_unlock + +t +step s2_insert_toast: <... completed> +step controller_show_count: SELECT COUNT(*) FROM tbl1; +count + +1 +step controller_unlock_1_2: SELECT pg_advisory_unlock(1, 2); +pg_advisory_unlock + +t +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +s1: NOTICE: blurt_and_lock() called for 1 in session 1 +s1: NOTICE: acquiring advisory lock on 2 +step s1_insert_toast: <... 
completed> +step s1_insert_other: INSERT INTO tbl2 VALUES(1); +step s1_commit: COMMIT; +step controller_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.tbl1: INSERT: a[integer]:1 b[text]:'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +COMMIT +BEGIN +table public.tbl2: INSERT: a[integer]:1 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/speculative_abort.spec b/contrib/test_decoding/specs/speculative_abort.spec new file mode 100644 index 0000000..7e80a25 --- /dev/null +++ b/contrib/test_decoding/specs/speculative_abort.spec @@ -0,0 +1,111 @@ +# INSERT ... ON CONFLICT test verifying that speculative abort for toast +# insertion are handled during logical decoding +# +# Does this by using advisory locks controlling progress of +# insertions. By waiting when building the index keys, it's possible +# to schedule concurrent INSERT ON CONFLICTs so that there will always +# be a speculative conflict. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (a INT, b TEXT); + ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL; + CREATE TABLE tbl2 (a INT); + + CREATE OR REPLACE FUNCTION blurt_and_lock(int) RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$ + BEGIN + RAISE NOTICE 'blurt_and_lock() called for % in session %', $1, current_setting('spec.session')::int; + + -- depending on lock state, wait for lock 2 or 3 + IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN + RAISE NOTICE 'acquiring advisory lock on 2'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2); + ELSE + RAISE NOTICE 'acquiring advisory lock on 3'; + PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3); + END IF; + RETURN $1; + END;$$; + + CREATE UNIQUE INDEX idx on tbl1(blurt_and_lock(a)); + + -- consume DDL + SELECT data FROM 
pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "controller" +setup +{ + SET default_transaction_isolation = 'read committed'; + SET application_name = 'isolation/insert-specconflict-controller'; +} +step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);} +step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); } +step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); } +step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); } +step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); } +step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); } +step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); } +step "controller_show_count" { SELECT COUNT(*) FROM tbl1; } +step "controller_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 1; + SET application_name = 'isolation/insert-specconflict-s1'; +} + +step "s1_begin" { BEGIN; } +step "s1_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } +step "s1_insert_other" { INSERT INTO tbl2 VALUES(1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup +{ + SET synchronous_commit=on; + SET default_transaction_isolation = 'read committed'; + SET spec.session = 2; + SET application_name = 'isolation/insert-specconflict-s2'; +} +step "s2_insert_toast" { INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ON CONFLICT DO NOTHING; } + + +# Test logical decoding of speculative aborts for toast insertion followed by +# insertion into a different table which doesn't have a toast 
+permutation + # acquire a number of locks, to control execution flow - the + # blurt_and_lock function acquires advisory locks that allow us to + # continue after a) the optimistic conflict probe and b) the + # insertion of the speculative tuple. + "controller_locks" + "controller_show_count" + "s1_begin" + "s1_insert_toast" "s2_insert_toast" + "controller_show_count" + # Switch both sessions to wait on the other lock next time (the speculative insertion) + "controller_unlock_1_1" "controller_unlock_2_1" + # Allow both sessions to continue + "controller_unlock_1_3" "controller_unlock_2_3" + "controller_show_count" + # Allow the second session to finish insertion + "controller_unlock_2_2" + # This should now show a successful insertion + "controller_show_count" + # Allow the first session to perform its speculative abort + "controller_unlock_1_2" + # Insert into the other table from s1 and commit + "s1_insert_other" "s1_commit" + # Get the changes + "controller_get_changes" -- 1.8.3.1