Regarding the patch sets, I think we will need to rearrange the commits.
Right now 0004 has some parts that should have been in 0001. Also, the
logic to assign an XID to a subtransaction would be better as a separate
commit, since that piece is independent of logical decoding of sequences.

On Fri, Jun 23, 2023 at 6:48 PM Ashutosh Bapat
<ashutosh.bapat....@gmail.com> wrote:
>
> On Tue, Jun 13, 2023 at 11:01 PM Tomas Vondra
> <tomas.von...@enterprisedb.com> wrote:
> >
> > On 5/18/23 16:23, Ashutosh Bapat wrote:
> > > Hi,
> > > Sorry for jumping in late in this thread.
> > >
> > > I started experimenting with the functionality. Maybe something that
> > > was already discussed earlier. Given that the thread has been
> > > discussed for so long and has gone through several changes,
> > > revalidating the functionality is useful.
> > >
> > > I considered the following aspects:
> > > Changes to the sequence on subscriber
> > > -----------------------------------------------------
> > > 1. Since this is logical decoding, logical replica is writable. So the
> > > logically replicated sequence can be manipulated on the subscriber as
> > > well. This implementation consolidates the changes on subscriber and
> > > publisher rather than replicating the publisher state as is. That's
> > > good. See example command sequence below
> > > a. publisher calls nextval() - this sets the sequence state on
> > > publisher as (1, 32, t) which is replicated to the subscriber.
> > > b. subscriber calls nextval() once - this sets the sequence state on
> > > subscriber as (34, 32, t)
> > > c. subscriber calls nextval() 32 times - on-disk state of sequence
> > > doesn't change on subscriber
> > > d. subscriber calls nextval() 33 times - this sets the sequence state
> > > on subscriber as (99, 0, t)
> > > e. publisher calls nextval() 32 times - this sets the sequence state
> > > on publisher as (33, 0, t)
> > >
> > > The on-disk state on publisher at the end of e. is replicated to the
> > > subscriber but subscriber doesn't apply it. The state there is still
> > > (99, 0, t). I think this is closer to how logical replication of
> > > sequences should look. This is also good enough as long as we
> > > expect the replication of sequences to be used for failover and
> > > switchover.
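
To make the (last_value, log_cnt, is_called) triples in steps a. to e. easier to replay, here is a minimal standalone sketch. It is not PostgreSQL code: SeqState, model_nextval() and model_nextval_n() are hypothetical names, and the model assumes increment 1, cache 1, and that 32 values are pre-logged each time a WAL record is written. It also assumes the subscriber starts from a state of (33, 0, t) after step a., which is my reading of what ends up applied there.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: number of values pre-logged per WAL record. */
#define SEQ_LOG_VALS 32

/* Simplified model of the sequence tuple (increment 1, cache 1). */
typedef struct SeqState
{
    int64_t last_value;
    int64_t log_cnt;
    bool    is_called;
} SeqState;

/*
 * Hypothetical stand-in for nextval(). Returns the value handed out and
 * sets *wal_logged when this call would have written a WAL record.
 */
static int64_t
model_nextval(SeqState *seq, bool *wal_logged)
{
    assert(seq != NULL && wal_logged != NULL);
    *wal_logged = false;

    if (!seq->is_called)
    {
        /* The first call returns last_value itself and pre-logs values. */
        seq->is_called = true;
        seq->log_cnt = SEQ_LOG_VALS;
        *wal_logged = true;
        return seq->last_value;
    }

    if (seq->log_cnt == 0)
    {
        /* Pre-logged values are used up: write WAL and refill. */
        seq->log_cnt = SEQ_LOG_VALS;
        *wal_logged = true;
    }
    else
        seq->log_cnt--;

    seq->last_value++;
    return seq->last_value;
}

/* Call model_nextval() n times; return how many calls wrote WAL. */
static int
model_nextval_n(SeqState *seq, int n)
{
    int  logged = 0;
    bool wal;

    for (int i = 0; i < n; i++)
    {
        (void) model_nextval(seq, &wal);
        if (wal)
            logged++;
    }
    return logged;
}
```

In this model, step c. produces no WAL record at all, which is one way to read "on-disk state of sequence doesn't change" there.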
> > >
> >
> > I'm really confused - are you describing what the patch is doing, or
> > what you think it should be doing? Because right now there's nothing
> > that'd "consolidate" the changes (in the sense of reconciling write
> > conflicts), and there's absolutely no way to do that.
> >
> > So if the subscriber advances the sequence (which it technically can),
> > the subscriber state will eventually be discarded and overwritten
> > when the next increment gets decoded from WAL on the publisher.
>
> I described what I observed in my experiments. My observation doesn't
> agree with your description. I will revisit this when I review the
> output plugin changes and the WAL receiver changes.
> >
> > Yes, I agree with this. It's probably better to replicate just the next
> > value, without the log_cnt / is_called fields (which are implementation
> > specific).
>
> Ok. I will review the logic once you revise the patches.
>
> >
> > >
> > > 3. Primary key sequences
> > > -----------------------------------
> > > I have not experimented with this. But I think we will need to add the
> > > sequences associated with the primary keys to the publications
> > > publishing the owner tables. Otherwise, we will have problems with the
> > > failover. And it needs to be done automatically since a. the names of
> > > these sequences are generated automatically b. publications with FOR
> > > ALL TABLES will add tables automatically and start replicating the
> > > changes. Users may not be able to intercept the replication activity
> > > to add the associated sequences to the publication.
> > >
> >
> > Right, this idea was mentioned before, and I agree maybe we should
> > consider adding some of those "automatic" sequences automatically.
> >
>
> Are you planning to add this in the same patch set or separately?
>
> I reviewed 0001 and related parts of 0004 and 0008 in detail.
>
> I have only one major change request, about
> typedef struct xl_seq_rec
> {
>     RelFileLocator locator;
> +   bool created; /* creates a new relfilenode (CREATE/ALTER) */
>
> I am not sure what the repercussions are of adding a member to an existing
> WAL record. I didn't see any code which handles the old WAL format which
> doesn't contain the "created" flag. IIUC, the logical decoding may come
> across a WAL record written in the old format after upgrade and restart. Is
> that not possible?
>
> But I don't think it's necessary. We can add a
> decoding routine for RM_SMGR_ID. The decoding routine will add relfilelocator
> in XLOG_SMGR_CREATE record to txn->sequences hash. Rest of the logic will work
> as is. Of course we will add non-sequence relfilelocators as well but that
> should be fine. Creating a new relfilelocator shouldn't be a frequent
> operation. If at all we are worried about that, we can add only the
> relfilenodes associated with sequences to the hash table.
>
> If this idea has been discussed earlier, please point me to the relevant
> discussion.
>
> Some other minor comments and nitpicks.
>
> <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
> <function>stream_commit_cb</function>, and
> <function>stream_change_cb</function>
> - are required, while <function>stream_message_cb</function> and
> + are required, while <function>stream_message_cb</function>,
> + <function>stream_sequence_cb</function> and
>
> Like the non-streaming counterpart, should we also mention what happens if
> those callbacks are not defined? That applies to stream_message_cb and
> stream_truncate_cb too.
>
> + /*
> +  * Make sure the subtransaction has a XID assigned, so that the sequence
> +  * increment WAL record is properly associated with it. This matters for
> +  * increments of sequences created/altered in the transaction, which are
> +  * handled as transactional.
> +  */
> + if (XLogLogicalInfoActive())
> +     GetCurrentTransactionId();
>
> GetCurrentTransactionId() will also assign xids to all the parents so it
> doesn't seem necessary to call both GetTopTransactionId() and
> GetCurrentTransactionId(). Calling only the latter should suffice. Applies to
> all the calls to GetCurrentTransactionId().
>
> +
> + memcpy(((char *) tuple->tuple.t_data),
> +        data + sizeof(xl_seq_rec),
> +        SizeofHeapTupleHeader);
> +
> + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
> +        data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
> +        datalen);
>
> The memory chunks being copied in these memcpy calls are contiguous. Why don't
> we use a single memcpy? For readability?
>
> + * If we don't have snapshot or we are just fast-forwarding, there is no
> + * point in decoding messages.
>
> s/decoding messages/decoding sequence changes/
>
> + tupledata = XLogRecGetData(r);
> + datalen = XLogRecGetDataLen(r);
> + tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
> +
> + /* extract the WAL record, with "created" flag */
> + xlrec = (xl_seq_rec *) XLogRecGetData(r);
>
> I think we should set tupledata = xlrec + sizeof(xl_seq_rec) so that it points
> to actual tuple data. This will also simplify the calculations in
> DecodeSeqTuple().
>
> +/* entry for hash table we use to track sequences created in running xacts */
>
> s/running/transaction being decoded/ ?
>
> +
> + /* search the lookup table (we ignore the return value, found is enough) */
> + ent = hash_search(rb->sequences,
> +                   (void *) &rlocator,
> +                   created ? HASH_ENTER : HASH_FIND,
> +                   &found);
>
> Misleading comment. We seem to be using the return value later.
>
> + /*
> +  * When creating the sequence, remember the XID of the transaction
> +  * that created id.
> +  */
> + if (created)
> +     ent->xid = xid;
>
> Should we set ent->locator as well? The sequence won't get cleaned otherwise.
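
On the memcpy question a few comments up: a small standalone sketch, not taken from the patch, of why two back-to-back copies of adjacent ranges can be folded into one. REC_HDR_LEN and TUPLE_HDR_LEN are hypothetical placeholder sizes standing in for sizeof(xl_seq_rec) and SizeofHeapTupleHeader, and copy_split(), copy_single() and copies_match() are illustrative names only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical sizes; the real values come from the WAL record layout. */
#define REC_HDR_LEN   8
#define TUPLE_HDR_LEN 23

/* Two calls, shaped like the quoted hunk: header first, then payload. */
static void
copy_split(char *dst, const char *data, size_t payload_len)
{
    memcpy(dst, data + REC_HDR_LEN, TUPLE_HDR_LEN);
    memcpy(dst + TUPLE_HDR_LEN,
           data + REC_HDR_LEN + TUPLE_HDR_LEN,
           payload_len);
}

/* One call covering the same contiguous source and destination ranges. */
static void
copy_single(char *dst, const char *data, size_t payload_len)
{
    memcpy(dst, data + REC_HDR_LEN, TUPLE_HDR_LEN + payload_len);
}

/* Returns 1 when both strategies leave identical destination buffers. */
static int
copies_match(void)
{
    char   src[64];
    char   a[64];
    char   b[64];
    size_t payload_len = 20;

    assert(REC_HDR_LEN + TUPLE_HDR_LEN + payload_len <= sizeof(src));

    for (size_t i = 0; i < sizeof(src); i++)
        src[i] = (char) i;
    memset(a, 0, sizeof(a));
    memset(b, 0, sizeof(b));

    copy_split(a, src, payload_len);
    copy_single(b, src, payload_len);

    return memcmp(a, b, sizeof(a)) == 0;
}
```

The two forms are interchangeable only because both the source and the destination ranges are adjacent; keeping them separate would be purely a readability choice.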
>
> +
> + TeardownHistoricSnapshot(false);
> +
> + AbortCurrentTransaction();
>
> Shouldn't this call to AbortCurrentTransaction() in PG_TRY be made only if
> this block started the transaction?
>
> + PG_CATCH();
> + {
> +     TeardownHistoricSnapshot(true);
> +
> +     AbortCurrentTransaction();
>
> Shouldn't we do this only if this block started the transaction? And in that
> case, wouldn't PG_RE_THROW take care of it?
>
> +/*
> + * Helper function for ReorderBufferProcessTXN for applying sequences.
> + */
> +static inline void
> +ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
> +                           Relation relation, ReorderBufferChange *change,
> +                           bool streaming)
>
> Possibly we should find a way to call this function from
> ReorderBufferQueueSequence() when processing non-transactional sequence
> change. It should probably absorb logic common to both the cases.
>
> +
> + if (RelationIsLogicallyLogged(relation))
> +     ReorderBufferApplySequence(rb, txn, relation, change, streaming);
>
> This condition is not used in ReorderBufferQueueSequence() when processing
> non-transactional change there. Why?
>
> +
> + if (len)
> + {
> +     memcpy(data, &tup->tuple, sizeof(HeapTupleData));
> +     data += sizeof(HeapTupleData);
> +
> +     memcpy(data, tup->tuple.t_data, len);
> +     data += len;
> + }
> +
>
> We are just copying the sequence data. Shouldn't we copy the file locator as
> well or that's not needed once the change has been queued? Similarly for
> ReorderBufferChangeSize() and ReorderBufferChangeSize()
>
> + /*
> +  * relfilenode => XID lookup table for sequences created in a transaction
> +  * (also includes altered sequences, which assigns new relfilenode)
> +  */
> + HTAB *sequences;
> +
>
> Better renamed as seq_rel_locator or some such. Shouldn't this be part of
> ReorderBufferTXN, which has similar transaction-specific hashes?
>
> I will continue reviewing the remaining patches.
>
> --
> Best Wishes,
> Ashutosh Bapat

--
Best Wishes,
Ashutosh Bapat