On Sat, Jul 4, 2020 at 11:35 AM Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Tue, Jun 30, 2020 at 5:20 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > Let me know what you think about the above changes. > > > > I went ahead and made few changes in > 0005-Implement-streaming-mode-in-ReorderBuffer which are explained > below. I have few questions and suggestions for the patch as well > which are also covered in below points. > > 1. > + if (prev_lsn == InvalidXLogRecPtr) > + { > + if (streaming) > + rb->stream_start(rb, txn, change->lsn); > + else > + rb->begin(rb, txn); > + stream_started = true; > + } > > I don't think we want to move begin callback here that will change the > existing semantics, so it is better to move begin at its original > position. I have made the required changes in the attached patch.
Looks good to me. > 2. > ReorderBufferTruncateTXN() > { > .. > + dlist_foreach_modify(iter, &txn->changes) > + { > + ReorderBufferChange *change; > + > + change = dlist_container(ReorderBufferChange, node, iter.cur); > + > + /* remove the change from it's containing list */ > + dlist_delete(&change->node); > + > + ReorderBufferReturnChange(rb, change); > + } > .. > } > > I think here we can add an Assert that we're not mixing changes from > different transactions. See the changes in the patch. Looks fine. > 3. > SetupCheckXidLive() > { > .. > + /* > + * setup CheckXidAlive if it's not committed yet. We don't check if the xid > + * aborted. That will happen during catalog access. Also, reset the > + * bsysscan flag. > + */ > + if (!TransactionIdDidCommit(xid)) > + { > + CheckXidAlive = xid; > + bsysscan = false; > .. > } > > What is the need to reset bsysscan flag here if we are already > resetting on error (like in the previous patch sent by me)? Yeah, now we don't not need this. > 4. > ReorderBufferProcessTXN() > { > .. > .. > + /* Reset the CheckXidAlive */ > + if (streaming) > + CheckXidAlive = InvalidTransactionId; > .. > } > > Similar to the previous point, we don't need this as well because > AbortCurrentTransaction would have taken care of this. Right > 5. > + * XXX Do we need to check if the transaction has some changes to stream > + * (maybe it got streamed right before the commit, which attempts to > + * stream it again before the commit)? > + */ > +static void > +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) > > The above comment doesn't make much sense to me, so I have removed it. > Basically, if there are no changes before commit, we still need to > send commit and anyway if there are no more changes > ReorderBufferProcessTXN will not do anything. ok > 6. > +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) > { > .. > if (txn->snapshot_now == NULL) > + { > + dlist_iter subxact_i; > + > + /* make sure this transaction is streamed for the first time */ > + Assert(!rbtxn_is_streamed(txn)); > + > + /* at the beginning we should have invalid command ID */ > + Assert(txn->command_id == InvalidCommandId); > + > + dlist_foreach(subxact_i, &txn->subtxns) > + { > + ReorderBufferTXN *subtxn; > + > + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); > + ReorderBufferTransferSnapToParent(txn, subtxn); > + } > .. > } > > Here, it is possible that there is no base_snapshot for txn, so we > need a check for that similar to ReorderBufferCommit. > > 7. Apart from the above, I made few changes in comments and ran pgindent. Ok > 8. We can't stream the transaction before we reach the > SNAPBUILD_CONSISTENT state because some other output plugin can apply > those changes unlike what we do with pgoutput plugin (which writes to > file). And, I think applying the transactions without reaching a > consistent state would be anyway wrong. So, we should avoid that and > if do that then we should have an Assert for streamed txns rather than > sending abort for them in ReorderBufferForget. I will work on this point. > 9. > +ReorderBufferHandleConcurrentAbort(ReorderBuffer *rb, ReorderBufferTXN *txn, > { > .. > + ReorderBufferToastReset(rb, txn); > + if (specinsert != NULL) > + ReorderBufferReturnChange(rb, specinsert); > .. > } > > Why do we need to do these here when we wouldn't have been done for > any exception other than ERRCODE_TRANSACTION_ROLLBACK? Because we are handling this exception "ERRCODE_TRANSACTION_ROLLBACK" gracefully and we are continuing with further decoding so we need to return this change back. > 10. I have got the below failure once. I have not investigated this > in detail as the patch is still under progress. See, if you have any > idea? > # Failed test 'check extra columns contain local defaults' > # at t/013_stream_subxact_ddl_abort.pl line 81. > # got: '2|0' > # expected: '1000|500' > # Looks like you failed 1 test of 2. > make[2]: *** [check] Error 1 > make[1]: *** [check-subscription-recurse] Error 2 > make[1]: *** Waiting for unfinished jobs.... > make: *** [check-world-src/test-recurse] Error 2 Even I got the failure once and after that, it did not reproduce. I have executed it multiple time but it did not reproduce again. Are you able to reproduce it consistently? > 11. Can we test by introducing a new GUC such that all the > transactions (at least in existing tests) start to stream? Basically, > it will allow us to disregard logical_decoding_work_mem and ensure > that all regression tests pass through new-code. Note, I am > suggesting this just for testing purposes, not for actual integration > in the code. Yeah, that's a good suggestion. -- Regards, Dilip Kumar EnterpriseDB: http://www.enterprisedb.com