On Mon, Jan 6, 2020 at 9:21 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Sat, Jan 4, 2020 at 4:07 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > It is better to merge it with the main patch for
> > "Implement-streaming-mode-in-ReorderBuffer", otherwise, it is a bit
> > difficult to review.
> Actually, we can merge 0008, 0009, 0012, 0018 to the main patch
> (0007).  Basically, if we merge all of them, then we don't need to
> deal with the conflicts.  I think Tomas has kept them separate so that
> we can review the solution for the schema sent.  And, I kept 0018 as a
> separate patch to avoid conflicts and rebasing in 0008, 0009 and 0012.
> In the next patch set, I will merge all of them to 0007.
>

Okay, I think we can merge those patches.

> >
> > + /*
> > + * We don't expect direct calls to heap_getnext with valid
> > + * CheckXidAlive for regular tables.  Track that below.
> > + */
> > + if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
> > + !(IsCatalogRelation(scan->rs_base.rs_rd) ||
> > + RelationIsUsedAsCatalogTable(scan->rs_base.rs_rd))))
> > + elog(ERROR, "improper heap_getnext call");
> >
> > Earlier, I thought we don't need to check if it is a regular table in
> > this check, but it is required because output plugins can try to do
> > that.
> I did not understand that, can you give some example?
>

I think it can lead to the same problem of concurrent aborts as for
catalog scans.
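To illustrate, consider a hypothetical (and deliberately unsafe) output
plugin change callback like the one below.  This is only a sketch of the
kind of call the new check guards against, not code from any of the
patches, and the snapshot handling is glossed over:

static void
my_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
             Relation relation, ReorderBufferChange *change)
{
    TableScanDesc scan;
    HeapTuple   tuple;

    /* a direct scan of the regular table whose change we are decoding */
    scan = table_beginscan(relation, GetActiveSnapshot(), 0, NULL);

    /*
     * With CheckXidAlive set, nothing here detects that the transaction
     * being decoded has aborted concurrently, so the tuples read below
     * could be inconsistent.  With the patch, this call would fail with
     * "improper heap_getnext call".
     */
    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
        /* ... inspect the tuple ... */
    }

    table_endscan(scan);
}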
> > > > > 2. The commit message of this patch refers to Prepared transactions.
> > > > > I think that needs to be changed.
> > > > >
> > > > > 0006-Implement-streaming-mode-in-ReorderBuffer
> > > > > -------------------------------------------------------------------------
> >
> > Few comments on v4-0018-Review-comment-fix-and-refactoring:
> > 1.
> > + if (streaming)
> > + {
> > + /*
> > + * Set the last last of the stream as the final lsn before calling
> > + * stream stop.
> > + */
> > + txn->final_lsn = prev_lsn;
> > + rb->stream_stop(rb, txn);
> > + }
> >
> > Shouldn't we try to set final_lsn as is done by Vignesh's patch [2]?
> Isn't it the same?  There we are doing it while serializing, and here
> we are doing it while streaming.  Basically, it is the last LSN we
> streamed.  Am I missing something?
>

No, I think you are right.

Few more comments:
--------------------------------
v4-0007-Implement-streaming-mode-in-ReorderBuffer
1.
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
..
+ /*
+ * TOCHECK: We have to rebuild historic snapshot to be sure it includes all
+ * information about subtransactions, which could arrive after streaming start.
+ */
+ if (!txn->is_schema_sent)
+ snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
+ txn, command_id);
..
}

Why are we using the base snapshot here instead of the snapshot we saved
the first time streaming happened?  And, as mentioned in the comment,
won't we need to consider the snapshots for subtransactions that arrived
after the last time we streamed the changes?

2.
+ /* remember the command ID and snapshot for the streaming run */
+ txn->command_id = command_id;
+ txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+ txn, command_id);

I don't see where txn->snapshot_now is getting freed.  The base_snapshot
is freed in ReorderBufferCleanupTXN, but I don't see this one getting
freed (see the sketch after these comments).

3.
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
..
+ /*
+ * If this is a subxact, we need to stream the top-level transaction
+ * instead.
+ */
+ if (txn->toptxn)
+ {
+ ReorderBufferStreamTXN(rb, txn->toptxn);
+ return;
+ }

Is it ever possible that we reach here for a subtransaction?  If not,
then this should be an Assert rather than an if condition (again, see
the sketch below).

4. In ReorderBufferStreamTXN(), don't we need to set some of the txn
fields like origin_id and origin_lsn, as we do in ReorderBufferCommit(),
especially to cover the case when it gets called due to memory overflow
(aka via ReorderBufferCheckMemoryLimit)?
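To make (2) and (3) concrete, here is roughly what I have in mind.
These are untested sketches against the patch, so the exact placement is
an assumption on my part:

/*
 * For (2): free the copied snapshot during transaction cleanup,
 * mirroring what ReorderBufferCleanupTXN already does for the
 * base_snapshot.
 */
    if (txn->snapshot_now != NULL)
    {
        ReorderBufferFreeSnap(rb, txn->snapshot_now);
        txn->snapshot_now = NULL;
    }

/*
 * For (3): if callers always resolve a subxact to its top-level
 * transaction before calling ReorderBufferStreamTXN, assert that
 * instead of silently recursing.
 */
    Assert(txn->toptxn == NULL);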
v4-0017-Extend-handling-of-concurrent-aborts-for-streamin
1.
@@ -3712,7 +3727,22 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
  if (using_subtxn)
  RollbackAndReleaseCurrentSubTransaction();

- PG_RE_THROW();
+ /* re-throw only if it's not an abort */
+ if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK)
+ {
+ MemoryContextSwitchTo(ecxt);
+ PG_RE_THROW();
+ }
+ else
+ {
+ /* remember the command ID and snapshot for the streaming run */
+ txn->command_id = command_id;
+ txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+ txn, command_id);
+ rb->stream_stop(rb, txn);
+
+ FlushErrorState();
+ }

Can you update the comments, either in the above code block or some
other place, to explain what the concurrent abort problem is and how we
deal with it?  Also, please explain how the above error handling is
sufficient to address all the various scenarios (the sub-transaction got
aborted when we have already sent some changes, or when we have not sent
any changes yet).

v4-0006-Gracefully-handle-concurrent-aborts-of-uncommitte
1.
+ /*
+ * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+ * error out
+ */
+ if (TransactionIdIsValid(CheckXidAlive) &&
+ !TransactionIdIsInProgress(CheckXidAlive) &&
+ !TransactionIdDidCommit(CheckXidAlive))
+ ereport(ERROR,
+ (errcode(ERRCODE_TRANSACTION_ROLLBACK),
+ errmsg("transaction aborted during system catalog scan")));

Why can't we use TransactionIdDidAbort here?  If we can't use it, then
please add a comment stating the reason.

2.
/*
+ * An xid value pointing to a possibly ongoing or a prepared transaction.
+ * Currently used in logical decoding.  It's possible that such transactions
+ * can get aborted while the decoding is ongoing.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;

In the comments, there is a mention of a prepared transaction.  Do we
allow prepared transactions to be decoded as part of this patch?

3.
+ /*
+ * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+ * error out
+ */
+ if (TransactionIdIsValid(CheckXidAlive) &&
+ !TransactionIdIsInProgress(CheckXidAlive) &&
+ !TransactionIdDidCommit(CheckXidAlive))

This comment just restates what the code below is doing; can you explain
the rationale behind this check?  It would be better if it is clear from
the comments why we are doing this check after fetching the tuple.  I
think this can refer to the comment I suggested adding for the changes
in patch v4-0017-Extend-handling-of-concurrent-aborts-for-streamin.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com