On Tue, May 19, 2020 at 6:01 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Fri, May 15, 2020 at 2:48 PM Dilip Kumar <dilipbal...@gmail.com> wrote: > > > > On Wed, May 13, 2020 at 4:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > > > > > 3. > > > And, during catalog scan we can check the status of the xid and > > > + * if it is aborted we will report a specific error that we can ignore. > > > We > > > + * might have already streamed some of the changes for the aborted > > > + * (sub)transaction, but that is fine because when we decode the abort > > > we will > > > + * stream abort message to truncate the changes in the subscriber. > > > + */ > > > +static inline void > > > +SetupCheckXidLive(TransactionId xid) > > > > > > In the above comment, I don't think it is right to say that we ignore > > > the error raised due to the aborted transaction. We need to say that > > > we discard the already streamed changes on such an error. > > > > Done. > > > > In the same comment, there is typo (/messageto/message to).
Done > > > 4. > > > +static inline void > > > +SetupCheckXidLive(TransactionId xid) > > > +{ > > > /* > > > - * If this transaction has no snapshot, it didn't make any changes to the > > > - * database, so there's nothing to decode. Note that > > > - * ReorderBufferCommitChild will have transferred any snapshots from > > > - * subtransactions if there were any. > > > + * setup CheckXidAlive if it's not committed yet. We don't check if the > > > xid > > > + * aborted. That will happen during catalog access. Also reset the > > > + * sysbegin_called flag. > > > */ > > > - if (txn->base_snapshot == NULL) > > > + if (!TransactionIdDidCommit(xid)) > > > { > > > - Assert(txn->ninvalidations == 0); > > > - ReorderBufferCleanupTXN(rb, txn); > > > - return; > > > + CheckXidAlive = xid; > > > + bsysscan = false; > > > } > > > > > > I think this function is inline as it needs to be called for each > > > change. If that is the case and otherwise also, isn't it better that > > > we check if passed xid is the same as CheckXidAlive before checking > > > TransactionIdDidCommit as TransactionIdDidCommit can be costly and > > > calling it for each change might not be a good idea? > > > > Done, Also I think it is good the check the TransactionIdIsInProgress > > instead of !TransactionIdDidCommit. I have changed that as well. > > > > What if it is aborted just before this check? I think the decode API > won't be able to detect that and sys* API won't care to check because > CheckXidAlive won't be set for that case. Yeah, that's the problem, I think it should be TransactionIdDidCommit only. > > > 5. > > > setup CheckXidAlive if it's not committed yet. We don't check if the xid > > > + * aborted. That will happen during catalog access. Also reset the > > > + * sysbegin_called flag. > > > > > > /if the xid aborted/if the xid is aborted. missing comma after Also. > > > > Done > > > > You forgot to change as per the second part of the comment (missing > comma after Also). Done > > > 8. > > > @@ -1588,8 +1766,6 @@ ReorderBufferCommit(ReorderBuffer *rb, > > > TransactionId xid, > > > * use as a normal record. It'll be cleaned up at the end > > > * of INSERT processing. > > > */ > > > - if (specinsert == NULL) > > > - elog(ERROR, "invalid ordering of speculative insertion changes"); > > > > > > You have removed this check but all other handling of specinsert is > > > same as far as this patch is concerned. Why so? > > > > Seems like a merge issue, or the leftover from the old design of the > > toast handling where we were streaming with the partial tuple. > > fixed now. > > > > > 9. > > > @@ -1676,8 +1860,6 @@ ReorderBufferCommit(ReorderBuffer *rb, > > > TransactionId xid, > > > * freed/reused while restoring spooled data from > > > * disk. > > > */ > > > - Assert(change->data.tp.newtuple != NULL); > > > - > > > dlist_delete(&change->node); > > > > > > Why is this Assert removed? > > > > Same cause as above so fixed. > > > > > 10. > > > @@ -1753,7 +1935,15 @@ ReorderBufferCommit(ReorderBuffer *rb, > > > TransactionId xid, > > > relations[nrelations++] = relation; > > > } > > > > > > - rb->apply_truncate(rb, txn, nrelations, relations, change); > > > + if (streaming) > > > + { > > > + rb->stream_truncate(rb, txn, nrelations, relations, change); > > > + > > > + /* Remember that we have sent some data. */ > > > + change->txn->any_data_sent = true; > > > + } > > > + else > > > + rb->apply_truncate(rb, txn, nrelations, relations, change); > > > > > > Can we encapsulate this in a separate function like > > > ReorderBufferApplyTruncate or something like that? Basically, rather > > > than having streaming check in this function, lets do it in some other > > > internal function. And we can likewise do it for all the streaming > > > checks in this function or at least whereever it is feasible. That > > > will make this function look clean. > > > > Done for truncate and change. I think we can create a few more such > > functions for > > start/stop and cleanup handling on error. I will work on that. > > > > Yeah, I think that would be better. I have done some refactoring, please look into the latest version. > One minor comment change suggestion: > /* > + * start stream or begin the transaction. If this is the first > + * change in the current stream. > + */ > > We can write the above comment as "Start the stream or begin the > transaction for the first change in the current stream." Done -- Regards, Dilip Kumar EnterpriseDB: http://www.enterprisedb.com