On Thu, May 7, 2020 at 6:17 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Tue, May 5, 2020 at 7:13 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> I have fixed one more issue in the 0010 patch. The issue was that once
> the transaction is serialized due to an incomplete toast tuple, the
> serialized store was not cleaned up after streaming, so it was streaming
> the same tuple multiple times.
>
I have reviewed a few patches (0003, 0004, and 0005) and below are my
comments.

v20-0003-Extend-the-output-plugin-API-with-stream-methods
----------------------------------------------------------------------------------------
1.
+static void
+pg_decode_stream_change(LogicalDecodingContext *ctx,
+						ReorderBufferTXN *txn,
+						Relation relation,
+						ReorderBufferChange *change)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						  int nrelations, Relation relations[],
+						  ReorderBufferChange *change)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}

In the above and similar APIs, there are parameters like relation which
are not used. I think you should add a comment atop these APIs to
explain why that is so. I guess it is because we want to keep them
similar to the non-stream versions of the APIs, and we can't display the
relation or other information while the transaction is still
in-progress.

2.
+ <para>
+  Similar to spill-to-disk behavior, streaming is triggered when the total
+  amount of changes decoded from the WAL (for all in-progress transactions)
+  exceeds limit defined by <varname>logical_decoding_work_mem</varname> setting.
+  At that point the largest toplevel transaction (measured by amount of memory
+  currently used for decoded changes) is selected and streamed.
+ </para>

I think we need to explain here the cases/exceptions where we need to
spill even when streaming is enabled, and check whether this matches the
latest implementation; if not, update it.

3.
+ * To support streaming, we require change/commit/abort callbacks. The
+ * message callback is optional, similarly to regular output plugins.

/similarly/similar

4.
+static void
+stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_start";
+	/* state.report_location = apply_lsn; */

Why can't we supply the report_location here? I think we need to report
txn->first_lsn if this is the very first stream and txn->final_lsn if it
is any consecutive one.

5.
+static void
+stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_stop";
+	/* state.report_location = apply_lsn; */

Can't we report txn->final_lsn here?

6. I think it would be good if we could provide an example of streaming
changes via test_decoding at
https://www.postgresql.org/docs/devel/test-decoding.html. I think we can
also explain there why the user is not expected to see the actual data
in the stream.

v20-0004-Gracefully-handle-concurrent-aborts-of-uncommitt
----------------------------------------------------------------------------------------
7.
+	/*
+	 * We don't expect direct calls to table_tuple_get_latest_tid with valid
+	 * CheckXidAlive  for catalog or regular tables.

There is an extra space between 'CheckXidAlive' and 'for'. I can see
similar problems in other places as well where this comment is used; fix
those as well.

8.
+/*
+ * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
+ * transaction.
+ * Currently, it is used in logical decoding.  It's possible
+ * that such transactions can get aborted while the decoding is ongoing in
+ * which case we skip decoding that particular transaction. To ensure that we
+ * check whether the CheckXidAlive is aborted after fetching the tuple from
+ * system tables.  We also ensure that during logical decoding we never
+ * directly access the tableam or heap APIs because we are checking for the
+ * concurrent aborts only in systable_* APIs.
+ */

In this comment, there is an inconsistency in the spacing used after
completing a sentence. In the part "transaction. To", a single space is
used, whereas at other places two spaces are used after a full stop.

v20-0005-Implement-streaming-mode-in-ReorderBuffer
-----------------------------------------------------------------------------
9.
Implement streaming mode in ReorderBuffer

Instead of serializing the transaction to disk after reaching the
maximum number of changes in memory (4096 changes), we consume the
changes we have in memory and invoke new stream API methods. This
happens in ReorderBufferStreamTXN() using about the same logic as in
ReorderBufferCommit() logic.

I think the above part of the commit message needs to be updated.

10.
Theoretically, we could get rid of the k-way merge, and append the
changes to the toplevel xact directly (and remember the position in the
list in case the subxact gets aborted later).

I don't think this part of the commit message is correct, as we
sometimes need to spill even during streaming. Please check the entire
commit message and update it according to the latest implementation.

11.
- * HeapTupleSatisfiesHistoricMVCC.
+ * tqual.c's HeapTupleSatisfiesHistoricMVCC.
+ *
+ * We do build the hash table even if there are no CIDs. That's
+ * because when streaming in-progress transactions we may run into
+ * tuples with the CID before actually decoding them. Think e.g.
about + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded + * yet when applying the INSERT. So we build a hash table so that + * ResolveCminCmaxDuringDecoding does not segfault in this case. + * + * XXX We might limit this behavior to streaming mode, and just bail + * out when decoding transaction at commit time (at which point it's + * guaranteed to see all CIDs). */ static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_iter iter; HASHCTL hash_ctl; - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids)) - return; - I don't understand this change. Why would "INSERT followed by TRUNCATE" could lead to a tuple which can come for decode before its CID? The patch has made changes based on this assumption in HeapTupleSatisfiesHistoricMVCC which appears to be very risky as the behavior could be dependent on whether we are streaming the changes for in-progress xact or at the commit of a transaction. We might want to generate a test to once validate this behavior. Also, the comment refers to tqual.c which is wrong as this API is now in heapam_visibility.c. 12. + * setup CheckXidAlive if it's not committed yet. We don't check if the xid + * aborted. That will happen during catalog access. Also reset the + * sysbegin_called flag. */ - if (txn->base_snapshot == NULL) + if (!TransactionIdDidCommit(xid)) { - Assert(txn->ninvalidations == 0); - ReorderBufferCleanupTXN(rb, txn); - return; + CheckXidAlive = xid; + bsysscan = false; } In the comment, the flag name 'sysbegin_called' should be bsysscan. -- With Regards, Amit Kapila. EnterpriseDB: http://www.enterprisedb.com