On Tue, Sep 15, 2020 at 10:43 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> I don't think it is complete yet. > * > * This error can only occur when we are sending the data in > * streaming mode and the streaming is not finished yet. > */ > - Assert(streaming); > - Assert(stream_started); > + Assert(streaming || rbtxn_prepared(txn)); > + Assert(stream_started || rbtxn_prepared(txn)); > > Here, you have updated the code but comments are still not updated. > Updated the comments. > I don't think we need to perform abort here. Later we will anyway > encounter the WAL for Rollback Prepared for which we will call > abort_prepared_cb. As we have set the 'concurrent_abort' flag, it will > allow us to skip all the intermediate records. Here, we need only > enough state in ReorderBufferTxn that it can be later used for > ReorderBufferFinishPrepared(). Basically, you need functionality > similar to ReorderBufferTruncateTXN where except for invalidations you > can free memory for everything else. You can either write a new > function ReorderBufferTruncatePreparedTxn or pass another bool > parameter in ReorderBufferTruncateTXN to indicate it is prepared_xact > and then clean up additional things that are not required for prepared > xact. Added a new parameter to ReorderBufferTruncatePreparedTxn for prepared transactions and did cleanup of tupulecids as well, I have left snapshots and transactions. As a result of this, I also had to create a new function ReorderBufferCleanupPreparedTXN which will clean up the rest as part of FinishPrepared handling as we can't call ReorderBufferCleanupTXN again after this. > > * > Similarly, I don't understand why we need below code: > ReorderBufferProcessTXN() > { > .. > + if (rbtxn_rollback(txn)) > + rb->abort(rb, txn, commit_lsn); > .. > } > > There is nowhere we are setting the RBTXN_ROLLBACK flag, so how will > this check be true? If we decide to remove this code then don't forget > to update the comments. > Removed. > * > If my previous two comments are correct then I don't think we need the > below interface. > + <sect3 id="logicaldecoding-output-plugin-abort"> > + <title>Transaction Abort Callback</title> > + > + <para> > + The required <function>abort_cb</function> callback is called whenever > + a transaction abort has to be initiated. This can happen if we are > + decoding a transaction that has been prepared for two-phase commit and > + a concurrent rollback happens while we are decoding it. > +<programlisting> > +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, > + XLogRecPtr abort_lsn); > > > Removed. > >> > >> > >> I don't know why the patch has used this way to implement an option to > >> enable two-phase. Can't we use how we implement 'stream-changes' > >> option in commit 7259736a6e? Just refer how we set ctx->streaming and > >> you can use a similar way to set this parameter. > > > > > > Done, I've moved the checks for callbacks to inside the corresponding > > wrappers. > > > > This is not what I suggested. Please study the commit 7259736a6e and > see how streaming option is implemented. I want later subscribers can > specify whether they want transactions to be decoded at prepare time > similar to what we have done for streaming. Also, search for > ctx->streaming in the code and see how it is set to get the idea. > Changed it similar to ctx->streaming logic. > Note: Please use version number while sending patches, you can use > something like git format-patch -N -v n to do that. It makes easier > for the reviewer to compare it with the previous version. Done. > Few other comments: > =================== > 1. > ReorderBufferProcessTXN() > { > .. > if (streaming) > { > ReorderBufferTruncateTXN(rb, txn); > > /* Reset the CheckXidAlive */ > CheckXidAlive = InvalidTransactionId; > } > else > ReorderBufferCleanupTXN(rb, txn); > .. > } > > I don't think we can perform ReorderBufferCleanupTXN for the prepared > transactions because if we have removed the ReorderBufferTxn before > commit, the later code might not consider such a transaction in the > system and compute the wrong value of restart_lsn for a slot. > Basically, in SnapBuildProcessRunningXacts() when we call > ReorderBufferGetOldestTXN(), it should show the ReorderBufferTxn of > the prepared transaction which is not yet committed but because we > have removed it after prepare, it won't get that TXN and then that > leads to wrong computation of restart_lsn. Once we start from a wrong > point in WAL, the snapshot built was incorrect which will lead to the > wrong result. This is the same reason why the patch is not doing > ReorderBufferForget in DecodePrepare when we decide to skip the > transaction. Also, here, we need to set CheckXidAlive = > InvalidTransactionId; for prepared xact as well. Updated as suggested above. > 2. Have you thought about the interaction of streaming with prepared > transactions? You can try writing some tests using pg_logical* APIs > and see the behaviour. For ex. there is no handling in > ReorderBufferStreamCommit for the same. I think you need to introduce > stream_prepare API similar to stream_commit and then use the same. This is pending. I will look at it in the next iteration. Also pending is the investigation as to why the pgoutput changes were not added initially. regards, Ajin Cherian Fujitsu Australia
v4-0002-Tap-test-to-test-concurrent-aborts-during-2-phase.patch
Description: Binary data
v4-0001-Support-decoding-of-two-phase-transactions.patch
Description: Binary data
v4-0003-pgoutput-output-plugin-support-for-logical-decodi.patch
Description: Binary data