Here are some review comments for v20-0002.

======
1. GENERAL - errmsg/elog messages

There are a lot of minor problems and/or quirks across all the message
texts. Here is a summary of some I found:

ERROR

errmsg("could not receive list of slots from the primary server: %s",
errmsg("invalid response from primary server"),
errmsg("invalid connection string syntax: %s",
errmsg("replication slot-sync worker slot %d is empty, cannot attach",
errmsg("replication slot-sync worker slot %d is already used by another worker, cannot attach",
errmsg("replication slot-sync worker slot %d is already used by another worker, cannot attach",
errmsg("could not connect to the primary server: %s",

errmsg("operation not permitted on replication slots on standby which are synchronized from primary")));
/primary/the primary/

errmsg("could not fetch invalidation cuase for slot \"%s\" from primary: %s",
/cuase/cause/
/primary/the primary/

errmsg("slot \"%s\" disapeared from the primary",
/disapeared/disappeared/

errmsg("could not fetch slot info from the primary: %s",
errmsg("could not connect to the primary server: %s", err)));
errmsg("could not map dynamic shared memory segment for slot-sync worker")));

errmsg("physical replication slot %s found in synchronize_slot_names",
slot name not quoted?

---

WARNING

errmsg("out of background worker slots"),

errmsg("Replication slot-sync worker failed to attach to worker-pool slot %d",
case?

errmsg("Removed database %d from replication slot-sync worker %d; dbcount now: %d",
case?

errmsg("Skipping slots synchronization as primary_slot_name is not set."));
case?

errmsg("Skipping slots synchronization as hot_standby_feedback is off."));
case?

errmsg("Skipping slots synchronization as dbname is not specified in primary_conninfo."));
case?

errmsg("slot-sync wait for slot %s interrupted by promotion, slot creation aborted",

errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
/primary/the primary/

errmsg("slot \"%s\" disappeared from the primary, aborting slot creation",
errmsg("slot \"%s\" invalidated on primary, aborting slot creation",

errmsg("slot-sync for slot %s interrupted by promotion, sync not possible",
slot name not quoted?

errmsg("skipping sync of slot \"%s\" as the received slot-sync lsn %X/%X is ahead of the standby position %X/%X",

errmsg("not synchronizing slot %s; synchronization would move it backward",
slot name not quoted?
/backward/backwards/

---

LOG

errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
errmsg("Stopping replication slot-sync worker %d",
errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",

errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)and catalog xmin (%u) has now passed local slot LSN (%X/%X) and catalog xmin (%u)",
missing spaces?

elog(LOG, "Dropped replication slot \"%s\" ",
extra space?
why this one is elog but others are not?

elog(LOG, "Replication slot-sync worker %d is shutting down on receiving SIGINT", MySlotSyncWorker->slot);
case?
why this one is elog but others are not?

elog(LOG, "Replication slot-sync worker %d started", worker_slot);
case?
why this one is elog but others are not?

----

DEBUG1

errmsg("allocated dsa for slot-sync worker for dbcount: %d"
worker number not given?
should be elog?

errmsg_internal("logical replication launcher started")
should be elog?

----

DEBUG2

elog(DEBUG2, "slot-sync worker%d's query:%s \n",
missing space after 'worker'
extra space before \n

======
.../libpqwalreceiver/libpqwalreceiver.c

2. libpqrcv_get_dbname_from_conninfo

+/*
+ * Get database name from primary conninfo.
+ *
+ * If dbanme is not found in connInfo, return NULL value.
+ * The caller should take care of handling NULL value.
+ */
+static char *
+libpqrcv_get_dbname_from_conninfo(const char *connInfo)

2a.
/dbanme/dbname/

~

2b.
"The caller should take care of handling NULL value."

IMO this is not very useful; it's like saying "caller must handle
function return values".

~~~

3.
+    for (opt = opts; opt->keyword != NULL; ++opt)
+    {
+        /* Ignore connection options that are not present. */
+        if (opt->val == NULL)
+            continue;
+
+        if (strcmp(opt->keyword, "dbname") == 0 && opt->val[0] != '\0')
+        {
+            dbname = pstrdup(opt->val);
+        }
+    }

3a.
If there are multiple "dbname" in the conninfo then it will be the LAST
one that is returned.

Judging by my quick syntax experiment (below) this seemed like the
correct thing to do, but I think there should be some comment to
explain about it.

test_sub=# create subscription sub1 connection 'dbname=foo dbname=bar dbname=test_pub' publication pub1;
2023-09-28 19:15:15.012 AEST [23997] WARNING: subscriptions created by regression test cases should have names starting with "regress_"
WARNING: subscriptions created by regression test cases should have names starting with "regress_"
NOTICE: created replication slot "sub1" on publisher
CREATE SUBSCRIPTION

~

3b.
The block brackets {} are not needed for the single statement.

~

3c.
Since there is only one keyword of interest here it seemed overkill to
have a separate 'continue' check. Why not do everything in one line:

for (opt = opts; opt->keyword != NULL; ++opt)
{
    if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0')
        dbname = pstrdup(opt->val);
}

======
src/backend/replication/logical/launcher.c

4.
+/*
+ * The local variables to store the current values of slot-sync related GUCs
+ * before each ConfigReload.
+ */
+static char *PrimaryConnInfoPreReload = NULL;
+static char *PrimarySlotNamePreReload = NULL;
+static char *SyncSlotNamesPreReload = NULL;

/The local variables/Local variables/

~~~

5. fwd declare

 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void slotsync_worker_cleanup(SlotSyncWorker *worker);
 static int logicalrep_pa_worker_count(Oid subid);

5a.
Hmmn, I think there were a lot more added static functions than just
this one.

e.g. what about all these?
static SlotSyncWorker *slotsync_worker_find
static dsa_handle slotsync_dsa_setup
static bool slotsync_worker_launch_or_reuse
static void slotsync_worker_stop_internal
static void slotsync_workers_stop
static void slotsync_remove_obsolete_dbs
static WalReceiverConn *primary_connect
static void SaveCurrentSlotSyncConfigs
static bool SlotSyncConfigsChanged
static void ApplyLauncherStartSlotSync
static void ApplyLauncherStartSubs

~

5b.
There are inconsistent naming styles used for the new static functions
-- e.g. snake_case versus CamelCase.

~~~

6. WaitForReplicationWorkerAttach

     int rc;
+    bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false;

This seemed a hacky way to distinguish the sync-slot workers from other
kinds of workers. Wouldn't it be better to pass another parameter to
this function?

~~~

7. slotsync_worker_attach

It looks like almost a clone of the logicalrep_worker_attach. Seems a
shame if it cannot make use of common code.

~~~

8. slotsync_worker_find

+ * Walks the slot-sync workers pool and searches for one that matches given
+ * dbid. Since one worker can manage multiple dbs, so it walks the db array in
+ * each worker to find the match.

8a.
SUGGESTION
Searches the slot-sync worker pool for the worker who manages the
specified dbid. Because a worker can manage multiple dbs, also walk the
db array of each worker to find the match.

~

8b.
Should the comment also say something like "Returns NULL if no matching
worker is found."

~~~

9.
+    /* Search for attached worker for a given dbid */

SUGGESTION
Search for an attached worker managing the given dbid.

~~~

10.
+{
+    int i;
+    SlotSyncWorker *res = NULL;
+    Oid *dbids;
+
+    Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+
+    /* Search for attached worker for a given dbid */
+    for (i = 0; i < max_slotsync_workers; i++)
+    {
+        SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+        int cnt;
+
+        if (!w->hdr.in_use)
+            continue;
+
+        dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
+        for (cnt = 0; cnt < w->dbcount; cnt++)
+        {
+            Oid wdbid = dbids[cnt];
+
+            if (wdbid == dbid)
+            {
+                res = w;
+                break;
+            }
+        }
+
+        /* If worker is found, break the outer loop */
+        if (res)
+            break;
+    }
+
+    return res;
+}

IMO this logic can be simplified a lot:
- by not using the 'res' variable; directly return instead.
- also moved the 'dbids' declaration.
- and 'cnt' variable seems not meaningful; replace with 'dbidx' for the
db array index IMO.

For example (25 lines instead of 35 lines)

{
    int i;

    Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));

    /* Search for an attached worker managing the given dbid. */
    for (i = 0; i < max_slotsync_workers; i++)
    {
        SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
        int dbidx;
        Oid *dbids;

        if (!w->hdr.in_use)
            continue;

        dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
        for (dbidx = 0; dbidx < w->dbcount; dbidx++)
        {
            if (dbids[dbidx] == dbid)
                return w;
        }
    }

    return NULL;
}

~~~

11. slotsync_dsa_setup

+/*
+ * Setup DSA for slot-sync worker.
+ *
+ * DSA is needed for dbids array. Since max number of dbs a worker can manage
+ * is not known, so initially fixed size to hold DB_PER_WORKER_ALLOC_INIT
+ * dbs is allocated. If this size is exhausted, it can be extended using
+ * dsa free and allocate routines.
+ */
+static dsa_handle
+slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count)

11a.
SUGGESTION
DSA is used for the dbids array.
Because the maximum number of dbs a worker can manage is not known,
initially enough memory for DB_PER_WORKER_ALLOC_INIT dbs is allocated.
If this size is exhausted, it can be extended using dsa free and
allocate routines.

~

11b.
It doesn't make sense for the comment to say DB_PER_WORKER_ALLOC_INIT
is the initial allocation, but then the function has a parameter
'alloc_db_count' (which is always passed as DB_PER_WORKER_ALLOC_INIT).
IMO remove the 2nd parameter from this function and hardwire the
initial allocation same as what the function comment says.

~~~

12.
+    /* Be sure any memory allocated by DSA routines is persistent. */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);

/Be sure any memory/Ensure the memory/

~~~

13. slotsync_worker_launch_or_reuse

+/*
+ * Slot-sync worker launch or reuse
+ *
+ * Start new slot-sync background worker from the pool of available workers
+ * going by max_slotsync_workers count. If the worker pool is exhausted,
+ * reuse the existing worker with minimum number of dbs. The idea is to
+ * always distribute the dbs equally among launched workers.
+ * If initially allocated dbids array is exhausted for the selected worker,
+ * reallocate the dbids array with increased size and copy the existing
+ * dbids to it and assign the new one as well.
+ *
+ * Returns true on success, false on failure.
+ */

/going by/limited by/ (??)

~~~

14.
+    BackgroundWorker bgw;
+    BackgroundWorkerHandle *bgw_handle;
+    uint16 generation;
+    SlotSyncWorker *worker = NULL;
+    uint32 mindbcnt = 0;
+    uint32 alloc_count = 0;
+    uint32 copied_dbcnt = 0;
+    Oid *copied_dbids = NULL;
+    int worker_slot = -1;
+    dsa_handle handle;
+    Oid *dbids;
+    int i;
+    bool attach;

IIUC many of these variables can be declared at a different scope in
this function, so they will be closer to where they are used.

~~~

15.
+    /*
+     * We need to do the modification of the shared memory under lock so that
+     * we have consistent view.
+     */
+    LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);

The current comment seems too much.

SUGGESTION
The shared memory must only be modified under lock.

~~~

16.
+    /* Find unused worker slot. */
+    for (i = 0; i < max_slotsync_workers; i++)
+    {
+        SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+
+        if (!w->hdr.in_use)
+        {
+            worker = w;
+            worker_slot = i;
+            break;
+        }
+    }
+
+    /*
+     * If all the workers are currently in use. Find the one with minimum
+     * number of dbs and use that.
+     */
+    if (!worker)
+    {
+        for (i = 0; i < max_slotsync_workers; i++)
+        {
+            SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+
+            if (i == 0)
+            {
+                mindbcnt = w->dbcount;
+                worker = w;
+                worker_slot = i;
+            }
+            else if (w->dbcount < mindbcnt)
+            {
+                mindbcnt = w->dbcount;
+                worker = w;
+                worker_slot = i;
+            }
+        }
+    }

Why not combine these 2 loops, to avoid iterating over the same slots
twice? Then, exit the loop immediately if an unused worker is found;
otherwise, if you reach the end of the loop without finding anything
unused, you will already know the one having the fewest dbs.

~~~

17.
+    /* Remember the old dbids before we reallocate dsa. */
+    copied_dbcnt = worker->dbcount;
+    copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid));
+    memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid));

17a.
Who frees this copied_dbids memory when you are finished needing it?
It seems allocated in the TopMemoryContext so IIUC this is a leak.

~

17b.
These are the 'old' values. Not the 'copied' values. The copied_xxx
variable names seem misleading.

~~~

18.
+    /* Prepare the new worker. */
+    worker->hdr.launch_time = GetCurrentTimestamp();
+    worker->hdr.in_use = true;

If a new worker is required then the launch_time is set like above.

+    {
+        slot_db_data->last_launch_time = now;
+
+        slotsync_worker_launch_or_reuse(slot_db_data->database);
+    }

Meanwhile, at the caller of slotsync_worker_launch_or_reuse(), the dbid
launch_time was already set as well.
And those two timestamps are almost (but not quite) the same value.
Isn't that a bit strange?

~~~

19.
+    /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
+    handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT);
+    dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);
+
+    dbids[worker->dbcount++] = dbid;

Where was this worker->dbcount assigned to 0?

Maybe it's better to do this explicitly under the "/* Prepare the new
worker. */" comment.

~~~

20.
+    if (!attach)
+        ereport(WARNING,
+                (errmsg("Replication slot-sync worker failed to attach to "
+                        "worker-pool slot %d", worker_slot)));
+
+    /* Attach is done, now safe to log that the worker is managing dbid */
+    if (attach)
+        ereport(LOG,
+                (errmsg("Added database %d to replication slot-sync "
+                        "worker %d; dbcount now: %d",
+                        dbid, worker_slot, worker->dbcount)));

20a.
IMO this should be coded as "if (attach) ...; else ..."

~

20b.
In other code if it failed to register then slotsync_worker_cleanup
code is called. How come similar code is not done when it fails to
attach?

~~~

21. slotsync_worker_stop_internal

+/*
+ * Internal function to stop the slot-sync worker and wait until it detaches
+ * from the slot-sync worker-pool slot.
+ */
+static void
+slotsync_worker_stop_internal(SlotSyncWorker *worker)

IIUC this function does a bit more than what the function comment
says. IIUC (again) I think the "detached" worker slot will still be
flagged as 'inUse' but this function then does the extra step of
calling slotsync_worker_cleanup() function to make the worker slot
available for next process that needs it, am I correct?

In this regard, this function seems a lot more like
logicalrep_worker_detach() function comment, so there seems some kind
of muddling of the different function names here... (??).

~~~

22.
slotsync_remove_obsolete_dbs

This function says:
+/*
+ * Slot-sync workers remove obsolete DBs from db-list
+ *
+ * If the DBIds fetched from the primary are lesser than the ones being managed
+ * by slot-sync workers, remove extra dbs from worker's db-list. This may happen
+ * if some slots are removed on primary but 'synchronize_slot_names' has not
+ * been changed yet.
+ */
+static void
+slotsync_remove_obsolete_dbs(List *remote_dbs)

But, there was another similar logic function too:

+/*
+ * Drop obsolete slots
+ *
+ * Drop the slots which no longer need to be synced i.e. these either
+ * do not exist on primary or are no longer part of synchronize_slot_names.
+ *
+ * Also drop the slots which are valid on primary and got invalidated
+ * on standby due to conflict (say required rows removed on primary).
+ * The assumption is, these will get recreated in next sync-cycle and
+ * it is okay to drop and recreate such slots as long as these are not
+ * consumable on standby (which is the case currently).
+ */
+static void
+drop_obsolete_slots(Oid *dbids, List *remote_slot_list)

Those function header comments suggest these have a lot of overlapping
functionality.

Can't those 2 functions be combined? Or maybe one delegate to the other?

~~~

23.
+    ListCell *lc;
+    Oid *dbids;
+    int widx;
+    int dbidx;
+    int i;

Scope of some of these variable declarations can be different so they
are declared closer to where they are used.

~~~

24.
+    /* If not found, then delete this db from worker's db-list */
+    if (!found)
+    {
+        for (i = dbidx; i < worker->dbcount; i++)
+        {
+            /* Shift the DBs and get rid of wdbid */
+            if (i < (worker->dbcount - 1))
+                dbids[i] = dbids[i + 1];
+        }

IIUC, that shift/loop could just have been a memmove() call to remove
one Oid element.

~~~

25.
+    /* If dbcount for any worker has become 0, shut it down */
+    for (widx = 0; widx < max_slotsync_workers; widx++)
+    {
+        SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+
+        if (worker->hdr.in_use && !worker->dbcount)
+            slotsync_worker_stop_internal(worker);
+    }

Is it safe to stop this unguarded by SlotSyncWorkerLock locking? Is
there a window where another dbid decides to reuse this worker at the
same time this process is about to stop it?

~~~

26. primary_connect

+/*
+ * Connect to primary server for slotsync purpose and return the connection
+ * info. Disconnect previous connection if provided in wrconn_prev.
+ */

/primary server/the primary server/

~~~

27.
+    if (!RecoveryInProgress())
+        return NULL;
+
+    if (max_slotsync_workers == 0)
+        return NULL;
+
+    if (strcmp(synchronize_slot_names, "") == 0)
+        return NULL;
+
+    /* The primary_slot_name is not set */
+    if (!WalRcv || WalRcv->slotname[0] == '\0')
+    {
+        ereport(WARNING,
+                errmsg("Skipping slots synchronization as primary_slot_name "
+                       "is not set."));
+        return NULL;
+    }
+
+    /* The hot_standby_feedback must be ON for slot-sync to work */
+    if (!hot_standby_feedback)
+    {
+        ereport(WARNING,
+                errmsg("Skipping slots synchronization as hot_standby_feedback "
+                       "is off."));
+        return NULL;
+    }

How come some of these checks give a WARNING that slot synchronization
will be skipped, but others just silently return NULL?

~~~

28. SaveCurrentSlotSyncConfigs

+static void
+SaveCurrentSlotSyncConfigs()
+{
+    PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo);
+    PrimarySlotNamePreReload = pstrdup(WalRcv->slotname);
+    SyncSlotNamesPreReload = pstrdup(synchronize_slot_names);
+}

Shouldn't this code also do pfree first? Otherwise these will slowly
leak every time this function is called, right?

~~~

29.
SlotSyncConfigsChanged

+static bool
+SlotSyncConfigsChanged()
+{
+    if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
+        return true;
+
+    if (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0)
+        return true;
+
+    if (strcmp(SyncSlotNamesPreReload, synchronize_slot_names) != 0)
+        return true;

I felt those can all be combined to have 1 return instead of 3.

~~~

30.
+    /*
+     * If we have reached this stage, it means original value of
+     * hot_standby_feedback was 'true', so consider it changed if 'false' now.
+     */
+    if (!hot_standby_feedback)
+        return true;

"If we have reached this stage" seems a bit vague. Can this have some
more explanation? And, maybe also an Assert(hot_standby_feedback); is
helpful in the calling code (before the config is reloaded)?

~~~

31. ApplyLauncherStartSlotSync

+ * It connects to primary, get the list of DBIDs for slots configured in
+ * synchronize_slot_names. It then launces the slot-sync workers as per
+ * max_slotsync_workers and then assign the DBs equally to the workers
+ * launched.
+ */

SUGGESTION (fix typos etc)
Connect to the primary, to get the list of DBIDs for slots configured
in synchronize_slot_names. Then launch slot-sync workers (limited by
max_slotsync_workers) where the DBs are distributed equally among
those workers.

~~~

32.
+static void
+ApplyLauncherStartSlotSync(long *wait_time, WalReceiverConn *wrconn)

Why does this function even have 'Apply' in the name when it has
nothing to do with an apply worker; looks like some cut/paste
hangover. How about calling it something like 'LaunchSlotSyncWorkers'

~~~

33.
+    /* If connection is NULL due to lack of correct configurations, return */
+    if (!wrconn)
+        return;

IMO it would be better to Assert wrconn in this function. If it is
NULL then it should be checked at the caller, otherwise it just raises
more questions -- like "who logged the warning about bad
configuration" etc (which I already questioned regarding the NULL
returns of primary_connect).

~~~

34.
+    if (!OidIsValid(slot_db_data->database))
+        continue;

This represents some kind of integrity error doesn't it? Is it really
OK just to silently skip such a thing?

~~~

35.
+    /*
+     * If the worker is eligible to start now, launch it. Otherwise,
+     * adjust wait_time so that we'll wake up as soon as it can be
+     * started.
+     *
+     * Each apply worker can only be restarted once per
+     * wal_retrieve_retry_interval, so that errors do not cause us to
+     * repeatedly restart the worker as fast as possible.
+     */

35a.
I found the "we" part of "so that we'll wake up..." to be a bit
misleading. There is no waiting in this function; that wait value is
handed back to the caller to deal with. TBH, I did not really
understand why it is even necessary to separate the waiting
calculation *per-worker* like this. It seems to overcomplicate things
and it might even give results like 1st worker is not started but last
worker is started (if enough time elapsed in the loop). Why can't all
this wait logic be done one time up front, and either (a) start all
necessary workers, or (b) start none of them and wait a bit longer.

~

35b.
"Each apply worker". Why is this talking about "apply" workers? Maybe
cut/paste error?

~~~

36.
+    last_launch_tried = slot_db_data->last_launch_time;
+    now = GetCurrentTimestamp();
+    if (last_launch_tried == 0 ||
+        (elapsed = TimestampDifferenceMilliseconds(last_launch_tried, now)) >=
+        wal_retrieve_retry_interval)
+    {
+        slot_db_data->last_launch_time = now;
+
+        slotsync_worker_launch_or_reuse(slot_db_data->database);
+    }
+    else
+    {
+        *wait_time = Min(*wait_time,
+                         wal_retrieve_retry_interval - elapsed);
+    }

36a.
IMO this might be simpler if you add another variable like bool
'launch_now':

last_launch_tried = ...
now = ...
elapsed = ...
launch_now = elapsed >= wal_retrieve_retry_interval;

~

36b.
Do you really care about checking "last_launch_tried == 0"? If it
really is zero, then I thought the elapsed check should be enough.

~

36c.
Does this 'last_launch_time' really need to be in some shared memory?
Won't a static variable suffice?

~~~

37. ApplyLauncherStartSubs

Wouldn't a better name for the function be something like
'LaunchSubscriptionApplyWorker'? (it is a better match for the
suggested LaunchSlotSyncWorkers)

~~~

38. ApplyLauncherMain

Now that this is not only for Apply worker but also for SlotSync
workers, maybe this function should be renamed as just LauncherMain,
or something equally generic?

~~~

39.
+    load_file("libpqwalreceiver", false);
+
+    wrconn = primary_connect(NULL);
+

This connection did not exist in the HEAD code so I think it is added
only for the slot-sync logic. IIUC it is still doing nothing for the
non-slot-sync cases because primary_connect will silently return in
that case:

+    if (!RecoveryInProgress())
+        return NULL;

IMO this is too sneaky, and it is misleading to see the normal apply
worker launch apparently connecting to something when it is not really
doing so AFAIK. I think these conditions should be done explicitly
here at the caller to remove any such ambiguity.

~~~

40.
+    if (!RecoveryInProgress())
+        ApplyLauncherStartSubs(&wait_time);
+    else
+        ApplyLauncherStartSlotSync(&wait_time, wrconn);

40a.
IMO this is deserving of a comment to explain why RecoveryInProgress
means to perform the slot-synchronization.

~

40b.
Also, better to have positive check RecoveryInProgress() instead of
!RecoveryInProgress()

~~~

41.
     if (ConfigReloadPending)
     {
+        bool ssConfigChanged = false;
+
+        SaveCurrentSlotSyncConfigs();
+
         ConfigReloadPending = false;
         ProcessConfigFile(PGC_SIGHUP);
+
+        /*
+         * Stop the slot-sync workers if any of the related GUCs changed.
+         * These will be relaunched as per the new values during next
+         * sync-cycle.
+         */
+        ssConfigChanged = SlotSyncConfigsChanged();
+        if (ssConfigChanged)
+            slotsync_workers_stop();
+
+        /* Reconnect in case primary_conninfo has changed */
+        wrconn = primary_connect(wrconn);
     }
 }

~

41a.
The 'ssConfigChanged' assignment at declaration is not needed.
Indeed, the whole variable is not really necessary because it is used
only once.

~

41b.
/as per the new values/using the new values/

~

41c.
+        /* Reconnect in case primary_conninfo has changed */
+        wrconn = primary_connect(wrconn);

To avoid unnecessary reconnections, shouldn't this be done only if
(ssConfigChanged)?

In fact, assuming the comment is correct, reconnect only if
(strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)

======
src/backend/replication/logical/slotsync.c

42. wait_for_primary_slot_catchup

+    ereport(LOG,
+            errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin"
+                   " (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
+                   remote_slot->name,
+                   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+                   remote_slot->catalog_xmin,
+                   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+                   MyReplicationSlot->data.catalog_xmin));

AFAIK it is usual for the LSN format string to be %X/%X (not %u/%X
like here).

~~~

43.
+    appendStringInfo(&cmd,
+                     "SELECT restart_lsn, confirmed_flush_lsn, catalog_xmin"
+                     "  FROM pg_catalog.pg_replication_slots"
+                     " WHERE slot_name = %s",
+                     quote_literal_cstr(remote_slot->name));

double space before FROM?

~~~

44. synchronize_one_slot

+    /*
+     * We might not have the WALs retained locally corresponding to
+     * remote's restart_lsn if our local restart_lsn and/or local
+     * catalog_xmin is ahead of remote's one. And thus we can not create
+     * the local slot in sync with primary as that would mean moving local
+     * slot backward. Thus wait for primary's restart_lsn and catalog_xmin
+     * to catch up with the local ones and then do the sync.
+     */
+    if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
+        TransactionIdPrecedes(remote_slot->catalog_xmin,
+                              MyReplicationSlot->data.catalog_xmin))
+    {
+        if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
+        {
+            /*
+             * The remote slot didn't catch up to locally reserved
+             * position
+             */
+            ReplicationSlotRelease();
+            CommitTransactionCommand();
+            return;
+        }

SUGGESTION (comment is slightly simplified)
If the local restart_lsn and/or local catalog_xmin is ahead of those
on the remote then we cannot create the local slot in sync with
primary because that would mean moving local slot backwards. In this
case we will wait for primary's restart_lsn and catalog_xmin to catch
up with the local one before attempting the sync.

======
Kind Regards,
Peter Smith.
Fujitsu Australia