On Tue, May 19, 2020 at 5:34 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Fri, May 15, 2020 at 2:47 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
> > On Tue, May 12, 2020 at 4:39 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > >
> > > 4.
> > > +static void
> > > +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> > > +{
> > > + LogicalDecodingContext *ctx = cache->private_data;
> > > + LogicalErrorCallbackState state;
> > > + ErrorContextCallback errcallback;
> > > +
> > > + Assert(!ctx->fast_forward);
> > > +
> > > + /* We're only supposed to call this when streaming is supported. */
> > > + Assert(ctx->streaming);
> > > +
> > > + /* Push callback + info on the error context stack */
> > > + state.ctx = ctx;
> > > + state.callback_name = "stream_start";
> > > + /* state.report_location = apply_lsn; */
> > >
> > > Why can't we supply the report_location here? I think here we need to
> > > report txn->first_lsn if this is the very first stream and
> > > txn->final_lsn if it is any consecutive one.
> >
> > Done
> >
>
> Now, after your change in stream_start_cb_wrapper, we assign
> report_location as the first_lsn passed as input to the function, but
> write_location is still txn->first_lsn. Shouldn't we assign the passed-in
> first_lsn to write_location? It seems assigning txn->first_lsn won't
> be correct for streams other than the first one.
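A minimal sketch of the point above, assuming heavily simplified, hypothetical stand-ins (SketchTXN, SketchCtx, SketchErrState, sketch_stream_start) for ReorderBufferTXN, LogicalDecodingContext, LogicalErrorCallbackState and the wrapper itself; it is not the patch's code. It only illustrates why both report_location and write_location should take the first_lsn argument: for any stream after the first, that argument differs from txn->first_lsn.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins; the real PostgreSQL structs differ. */
typedef uint64_t XLogRecPtr;
#define SKETCH_INVALID_LSN ((XLogRecPtr) 0)

typedef struct SketchTXN
{
    XLogRecPtr first_lsn;       /* LSN of the transaction's first change */
} SketchTXN;

typedef struct SketchCtx
{
    XLogRecPtr write_location;  /* plays the role of ctx->write_location */
} SketchCtx;

typedef struct SketchErrState
{
    const char *callback_name;
    XLogRecPtr report_location; /* LSN reported in the error context */
} SketchErrState;

/*
 * Sketch of the suggested behavior: both locations come from the LSN that
 * starts *this* stream, passed in by the caller as first_lsn.
 * txn->first_lsn would only be correct for the very first stream.
 */
static void
sketch_stream_start(SketchCtx *ctx, SketchErrState *state,
                    const SketchTXN *txn, XLogRecPtr first_lsn)
{
    assert(first_lsn != SKETCH_INVALID_LSN);
    (void) txn;                 /* intentionally not used for the locations */

    state->callback_name = "stream_start";
    state->report_location = first_lsn;
    ctx->write_location = first_lsn;
}
```

For example, with txn.first_lsn = 0x100, a second stream started at 0x250 leaves both state.report_location and ctx.write_location at 0x250, whereas assigning txn->first_lsn would still report 0x100.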
Done

>
> > > 5.
> > > +static void
> > > +stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> > > +{
> > > + LogicalDecodingContext *ctx = cache->private_data;
> > > + LogicalErrorCallbackState state;
> > > + ErrorContextCallback errcallback;
> > > +
> > > + Assert(!ctx->fast_forward);
> > > +
> > > + /* We're only supposed to call this when streaming is supported. */
> > > + Assert(ctx->streaming);
> > > +
> > > + /* Push callback + info on the error context stack */
> > > + state.ctx = ctx;
> > > + state.callback_name = "stream_stop";
> > > + /* state.report_location = apply_lsn; */
> > >
> > > Can't we report txn->final_lsn here?
> >
> > We are already setting this to txn->final_lsn in the 0006 patch, but I
> > have moved it into this patch now.
> >
>
> Similar to the previous point, here also I think we need to assign the report
> and write locations as the last_lsn passed to this API.

Done

>
> > > v20-0005-Implement-streaming-mode-in-ReorderBuffer
> > > -----------------------------------------------------------------------------
> > > 10.
> > > Theoretically, we could get rid of the k-way merge, and append the
> > > changes to the toplevel xact directly (and remember the position
> > > in the list in case the subxact gets aborted later).
> > >
> > > I don't think this part of the commit message is correct, as we
> > > sometimes need to spill even during streaming. Please check the
> > > entire commit message and update it according to the latest
> > > implementation.
> >
> > Done
> >
>
> You seem to have forgotten to remove the other part of the message ("This
> adds a second iterator for the streaming case...."), which is not
> relevant now.

Done

> > > 11.
> > > - * HeapTupleSatisfiesHistoricMVCC.
> > > + * tqual.c's HeapTupleSatisfiesHistoricMVCC.
> > > + *
> > > + * We do build the hash table even if there are no CIDs. That's
> > > + * because when streaming in-progress transactions we may run into
> > > + * tuples with the CID before actually decoding them. Think e.g. about
> > > + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
> > > + * yet when applying the INSERT. So we build a hash table so that
> > > + * ResolveCminCmaxDuringDecoding does not segfault in this case.
> > > + *
> > > + * XXX We might limit this behavior to streaming mode, and just bail
> > > + * out when decoding transaction at commit time (at which point it's
> > > + * guaranteed to see all CIDs).
> > > */
> > > static void
> > > ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > > @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > > dlist_iter iter;
> > > HASHCTL hash_ctl;
> > >
> > > - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
> > > - return;
> > > -
> > >
> > > I don't understand this change. Why could "INSERT followed by
> > > TRUNCATE" lead to a tuple that comes up for decoding before its
> > > CID? The patch has made changes based on this assumption in
> > > HeapTupleSatisfiesHistoricMVCC, which appears to be very risky, as the
> > > behavior could depend on whether we are streaming the changes
> > > for an in-progress xact or decoding them at the commit of a transaction.
> > > We might want to write a test to validate this behavior once.
> > >
> > > Also, the comment refers to tqual.c, which is wrong as this API is now
> > > in heapam_visibility.c.
> >
> > Done.
> >
>
> + * INSERT. So in such cases we assume the CIDs is from the future command
> + * and return as unresolve.
> + */
> + if (tuplecid_data == NULL)
> + return false;
> +
>
> Here, let's reword the last line of the comment as ". So in such cases we
> assume the CID is from the future command."

Done

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com