On Wed, Nov 15, 2023 at 5:21 PM shveta malik <shveta.ma...@gmail.com> wrote:
>
> PFA v34.
>
Few comments on v34-0001*
=======================
1.
+    char        buf[100];
+
+    buf[0] = '\0';
+
+    if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
+        strcat(buf, "twophase");
+    if (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING)
+    {
+        if (buf[0] != '\0')
+            strcat(buf, " and ");
+        strcat(buf, "failover");
+    }
+
     ereport(LOG,
-            (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
-                    MySubscription->name)));
+            (errmsg("logical replication apply worker for subscription \"%s\" will restart so that %s can be enabled",
+                    MySubscription->name, buf)));

I feel it is better to use separate elogs rather than construct the
string. It would be easier for translation.

2.
-    /* Initialize walsender process before entering the main command loop */

Spurious line removal.

3.
@@ -440,17 +448,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)

     if (startlsn < moveto)
     {
-        SpinLockAcquire(&MyReplicationSlot->mutex);
-        MyReplicationSlot->data.restart_lsn = moveto;
-        SpinLockRelease(&MyReplicationSlot->mutex);
+        PhysicalConfirmReceivedLocation(moveto);
         retlsn = moveto;
-
-        /*
-         * Dirty the slot so as it is written out at the next checkpoint. Note
-         * that the LSN position advanced may still be lost in the event of a
-         * crash, but this makes the data consistent after a clean shutdown.
-         */
-        ReplicationSlotMarkDirty();
     }

I think this change has been made so that we can wake up logical
walsenders from a central location. In general, this is a good idea,
but it seems calling PhysicalConfirmReceivedLocation() would make an
additional call to ReplicationSlotsComputeRequiredLSN(), which is
already called in the caller of pg_physical_replication_slot_advance(),
so I am not sure such unification is a good idea here.

4.
+ * Here logical walsender associated with failover logical slot waits
+ * for physical standbys corresponding to physical slots specified in
+ * standby_slot_names GUC.
+ */
+void
+WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)

In the above comments, we don't seem to follow the 80-col limit.
Please check all other comments in the patch for similar problems.

5.
+static void
+WalSndRereadConfigAndSlots(List **standby_slots)
+{
+    char       *pre_standby_slot_names = pstrdup(standby_slot_names);
+
+    ProcessConfigFile(PGC_SIGHUP);
+
+    if (strcmp(pre_standby_slot_names, standby_slot_names) != 0)
+    {
+        list_free(*standby_slots);
+        *standby_slots = GetStandbySlotList(true);
+    }
+
+    pfree(pre_standby_slot_names);
+}

The function name is misleading w.r.t. the functionality. Can we name
it along the lines of WalSndRereadConfigAndReInitSlotList()? I know it
is a bit longer, but I couldn't come up with anything better.

6.
+    /*
+     * Fast path to entering the loop in case we already know we have
+     * enough WAL available and all the standby servers has confirmed
+     * receipt of WAL upto RecentFlushPtr.

I think this comment is a bit misleading because it is a fast path to
avoid entering the loop. I think we can keep the existing comment
here: "Fast path to avoid acquiring the spinlock in case we already
know ..."

7.
@@ -3381,7 +3673,9 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
      * And, we use separate shared memory CVs for physical and logical
      * walsenders for selective wake ups, see WalSndWakeup() for more details.
      */
-    if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+    if (wait_for_standby)
+        ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+    else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)

The comment above this change needs to be updated for the usage of
this new CV.

8.
+WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION "Waiting for physical standby confirmation in WAL sender process."

I feel the above description is not clear. How about being more
specific with something along the lines of: "Waiting for the WAL to be
received by physical standby in WAL sender process."

9.
+    {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+        gettext_noop("List of streaming replication standby server slot "
+                     "names that logical walsenders waits for."),

I think we can slightly simplify it by saying: "Lists streaming
replication standby server slot names that logical WAL sender
processes wait for.". It would be more consistent with a few other
similar variables.

10.
+        gettext_noop("List of streaming replication standby server slot "
+                     "names that logical walsenders waits for."),
+        gettext_noop("Decoded changes are sent out to plugins by logical "
+                     "walsenders only after specified replication slots "
+                     "confirm receiving WAL."),

Instead of walsenders, let's use WAL sender processes.

11.
@@ -6622,10 +6623,12 @@ describeSubscriptions(const char *pattern, bool verbose)
         appendPQExpBuffer(&buf,
                           ", suborigin AS \"%s\"\n"
                           ", subpasswordrequired AS \"%s\"\n"
-                          ", subrunasowner AS \"%s\"\n",
+                          ", subrunasowner AS \"%s\"\n"
+                          ", subfailoverstate AS \"%s\"\n",
                           gettext_noop("Origin"),
                           gettext_noop("Password required"),
-                          gettext_noop("Run as owner?"));
+                          gettext_noop("Run as owner?"),
+                          gettext_noop("Enable failover?"));

Let's name the new column "Failover"; also, it should be displayed
only when pset.sversion is >=17.

12.
@@ -93,6 +97,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
     bool        subrunasowner;  /* True if replication should execute as the
                                  * subscription owner */

+    char        subfailoverstate;   /* Enable Failover State */

This should be listed in system_views.sql in the below GRANT statement:

GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
              subbinary, substream, subtwophasestate, subdisableonerr,
              subpasswordrequired, subrunasowner,
              subslotname, subsynccommit, subpublications, suborigin)

13.
+    ConditionVariable wal_confirm_rcv_cv;
+
     WalSnd      walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;

It is better to add a comment for this new variable explaining its use.

--
With Regards,
Amit Kapila.