Here are some review comments for v24-0001

======

1. GENERAL - failover slots terminology
There is inconsistent terminology, such as below. Try to use the same wording everywhere.

- failover logical slots
- failover slots
- logical failover slots
- logical replication failover slots
- etc.

These are in many places: comments, function names, constants, etc.

~~~

2. GENERAL - THE

s/primary.../the primary.../
s/standby.../the standby.../

Missing "the" problems remain in multiple places in the patch.

~~~

3. GENERAL - messages

I searched all the ereports and elogs (the full list is below only for reference). There are many little quirks:

3a. Sometimes messages say "primary"; sometimes "primary server" etc. Be consistent.

3b. s/primary/the primary/

3c. Sometimes messages include errcode and sometimes they do not. Is that deliberate, or are there missing errcodes?

3d. At least one message has an unwanted trailing space.

3e. Sometimes the errcode and/or errmsg are enclosed in extra parentheses; sometimes not. AFAIK those parentheses are not necessary anymore.

3f. Inconsistent terminology "slot" V "failover slots" V "failover logical slots" etc., as mentioned in the previous review comment #1.

3g. Sometimes messages say "slot creation aborted"; sometimes "aborting slot creation". Be consistent.

3h. s/lsn/LSN/

3i. s/move it backward/move it backwards/

3j. Sometimes a LOG message starts uppercase; sometimes lowercase. Be consistent.

3k. typo: s/and and/and/

3l. "worker %d" V "worker%d"

~

Messages:

ereport(ERROR,
    (errmsg("could not receive failover slots dbinfo from the primary server: %s",
        pchomp(PQerrorMessage(conn->streamConn)))));

ereport(ERROR,
    (errmsg("invalid response from primary server"),
     errdetail("Could not get failover slots dbinfo: got %d fields, "
        "expected 1", nfields)));

ereport(ERROR,
    (errcode(ERRCODE_SYNTAX_ERROR),
     errmsg("invalid connection string syntax: %s", errcopy)));

ereport(ERROR,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("replication slot-sync worker slot %d is "
        "empty, cannot attach", slot)));

ereport(ERROR,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("replication slot-sync worker slot %d is "
        "already used by another worker, cannot attach", slot)));

ereport(ERROR,
    (errmsg("could not connect to the primary server: %s", err)));

ereport(ERROR,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("cannot use replication slot \"%s\" for logical decoding",
        NameStr(slot->data.name)),
     errdetail("This slot is being synced from the primary."),
     errhint("Specify another replication slot.")));

ereport(ERROR,
    (errmsg("could not fetch slot info for slot \"%s\" from"
        " the primary: %s", remote_slot->name, res->err)));

ereport(ERROR,
    (errmsg("could not fetch slot info for slot \"%s\" from"
        " the primary: %s", remote_slot->name, res->err)));

ereport(ERROR,
    (errmsg("could not fetch invalidation cause for slot \"%s\" from"
        " primary: %s", slot_name, res->err)));

ereport(ERROR,
    (errmsg("slot \"%s\" disappeared from the primary", slot_name)));

ereport(ERROR,
    (errmsg("could not fetch failover logical slots info from the primary: %s",
        res->err)));

ereport(ERROR,
    (errmsg("could not connect to the primary server: %s", err)));

ereport(ERROR,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("could not map dynamic shared memory "
        "segment for slot-sync worker")));

ereport(ERROR,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("cannot drop replication slot \"%s\"", name),
     errdetail("This slot is being synced from the primary.")));

ereport(ERROR,
    (errmsg("could not receive failover slots dbinfo from the primary server: %s",
        pchomp(PQerrorMessage(conn->streamConn)))));

ereport(ERROR,
    (errmsg("invalid response from primary server"),
     errdetail("Could not get failover slots dbinfo: got %d fields, "
        "expected 1", nfields)));

ereport(ERROR,
    (errcode(ERRCODE_SYNTAX_ERROR),
     errmsg("invalid connection string syntax: %s", errcopy)));

ereport(WARNING,
    (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     errmsg("out of background worker slots"),
     errhint("You might need to increase %s.", "max_worker_processes")));

ereport(WARNING,
    (errmsg("replication slot-sync worker failed to attach to "
        "worker-pool slot %d", worker_slot)));

ereport(WARNING,
    errmsg("skipping slots synchronization as primary_slot_name "
        "is not set."));

ereport(WARNING,
    errmsg("skipping slots synchronization as hot_standby_feedback "
        "is off."));

ereport(WARNING,
    errmsg("skipping slots synchronization as dbname is not "
        "specified in primary_conninfo."));

ereport(WARNING,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("slot-sync wait for slot %s interrupted by promotion, "
        "slot creation aborted", remote_slot->name)));

ereport(WARNING,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("slot-sync wait for slot %s interrupted by promotion, "
        "slot creation aborted", remote_slot->name)));

ereport(WARNING,
    (errmsg("slot \"%s\" disappeared from the primary, aborting"
        " slot creation", remote_slot->name)));

ereport(WARNING,
    (errmsg("slot \"%s\" invalidated on primary, aborting"
        " slot creation", remote_slot->name)));

ereport(WARNING,
    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     errmsg("slot-sync for slot \"%s\" interrupted by promotion, "
        "sync not possible", remote_slot->name)));

ereport(WARNING,
    errmsg("skipping sync of slot \"%s\" as the received slot-sync "
        "lsn %X/%X is ahead of the standby position %X/%X",
        remote_slot->name,
        LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
        LSN_FORMAT_ARGS(WalRcv->latestWalEnd)));

ereport(WARNING,
    errmsg("not synchronizing slot %s; synchronization would move"
        " it backward", remote_slot->name));

ereport(LOG,
    (errmsg("Dropped replication slot \"%s\" ",
        NameStr(local_slot->data.name))));

ereport(LOG,
    (errmsg("Added database %d to replication slot-sync "
        "worker %d; dbcount now: %d",
        dbid, worker_slot, worker->dbcount)));

ereport(LOG,
    (errmsg("Added database %d to replication slot-sync "
        "worker %d; dbcount now: %d",
        dbid, worker_slot, worker->dbcount)));

ereport(LOG,
    (errmsg("Stopping replication slot-sync worker %d", slot)));

ereport(LOG,
    (errmsg("removed database %d from replication slot-sync "
        "worker %d; dbcount now: %d",
        wdbid, worker->slot, worker->dbcount)));

ereport(LOG,
    errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
        " (%u) to pass local slot LSN (%X/%X) and and catalog xmin (%u)",
        remote_slot->name,
        LSN_FORMAT_ARGS(remote_slot->restart_lsn),
        remote_slot->catalog_xmin,
        LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
        MyReplicationSlot->data.catalog_xmin));

ereport(LOG,
    errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)"
        " and catalog xmin (%u) has now passed local slot LSN"
        " (%X/%X) and catalog xmin (%u)",
        remote_slot->name,
        LSN_FORMAT_ARGS(new_restart_lsn),
        new_catalog_xmin,
        LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
        MyReplicationSlot->data.catalog_xmin));

ereport(LOG,
    errmsg("Replication slot-sync worker %d is shutting"
        " down on receiving SIGINT", MySlotSyncWorker->slot));

ereport(LOG,
    errmsg("Replication slot-sync worker %d started", worker_slot));

elog(DEBUG1, "allocated dsa for slot-sync worker for dbcount: %d",
    DB_PER_WORKER_ALLOC_INIT);

elog(DEBUG1, "logical replication launcher started");

elog(DEBUG2, "slot-sync worker%d's query:%s \n",
    MySlotSyncWorker->slot, s.data);

~~~

4.
GENERAL - SlotSyncWorker loops

When iterating slot-sync workers the code sometimes looks like

+ for (int i = 0; i < max_slotsync_workers; i++)
+ {
+     SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];

and other times it looks like

+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {
+     SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];

etc.

It would be better if such loops used the same loop variable and SlotSyncWorker variable names; consistency will make the code easier to read.

======
Commit message

5.
GUC 'enable_syncslot' enables a physical_satndby to synchronize logical replication failover slots from the primary server.

s/physical_satndby/physical standby/

## I think this one is already fixed in the latest v25.

~~~

6.
The logical slots created by slot-sync workers on physical standbys are not allowed to be consumed and dropped. Any attempt to perform logical decoding on such slots will result in an error.

~

SUGGESTION
The logical slots created by slot-sync workers on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error.

======
doc/src/sgml/config.sgml

7.
+ <para>
+ Specify dbname in <varname>primary_conninfo</varname> string
+ to allow synchronization of slots from the primary to standby.
+ This will only be used for slot synchronization. It is ignored
+ for streaming.
+ </para>

Maybe better to use <literal> for dbname.

~~~

8.
+ </varlistentry>
+
+ </variablelist>

Extra blank line not needed.

======
.../libpqwalreceiver/libpqwalreceiver.c

9. libpqrcv_get_dbname_from_conninfo

+ for (opt = opts; opt->keyword != NULL; ++opt)
+ {
+     /* If multiple dbnames are used, then the last one will be returned */

s/are used/are specified/

======
src/backend/replication/logical/launcher.c

10.
slotsync_worker_launch_or_reuse

+ MemoryContext oldcontext;
+ uint32 alloc_count = 0;
+ uint32 old_dbcnt = 0;
+ Oid *old_dbids = NULL;

No need to assign these in the declaration, because they get unconditionally assigned before they are inspected anyhow.

~~~

11.
+ /* Prepare the new worker. */
+ worker->hdr.launch_time = GetCurrentTimestamp();
+ worker->hdr.in_use = true;
+
+ /*
+  * 'proc' and 'slot' will be assigned in ReplSlotSyncWorkerMain when we
+  * attach this worker to a particular worker-pool slot
+  */
+ worker->hdr.proc = NULL;
+ worker->slot = -1;
+
+ /* TODO: do we really need 'generation', analyse more here */
+ worker->hdr.generation++;
+
+ /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
+ handle = slotsync_dsa_setup(worker);

It is confusing for some of the worker members to be initialized here and other worker members (like `dbcount`) to be initialized within the function slotsync_dsa_setup(). It might be better if all the field initialization were kept together, e.g. combined in a new function 'slotsync_worker_setup()'.

~~~

12.
+ /* Check if current DB is still present in remote-db-list */
+ foreach(lc, remote_dbs)
+ {
+     WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+
+     if (failover_slot_data->dboid == wdbid)
+     {
+         found = true;
+         break;
+     }
+ }
+
+ /* If not found, then delete this db from worker's db-list */
+ if (!found)
+ {
+     if (dbidx < (worker->dbcount - 1))
+     {
+         /* Shift the DBs and get rid of wdbid */
+         memmove(&dbids[dbidx], &dbids[dbidx + 1],
+                 (worker->dbcount - dbidx - 1) * sizeof(Oid));
+     }
+
+     worker->dbcount--;
+
+     ereport(LOG,
+             (errmsg("removed database %d from replication slot-sync "
+                     "worker %d; dbcount now: %d",
+                     wdbid, worker->slot, worker->dbcount)));
+ }
+
+ /* Else move to next db-position */
+ else
+ {
+     dbidx++;
+ }

This code might be simpler if you just remove the whole "Else move..." part and instead just increment the `dbidx` at the same time you set found = true. For example,

if (failover_slot_data->dboid == wdbid)
{
    /* advance worker to next db-position */
    found = true;
    dbidx++;
    break;
}

~~~

13. slotsync_remote_connect

+/*
+ * Connect to the primary server for slotsync purpose and return the connection
+ * info.
+ */
+static WalReceiverConn *
+slotsync_remote_connect()
+{
+ WalReceiverConn *wrconn = NULL;
+ char *err;
+ char *dbname;

No need to assign NULL there. It will be overwritten before it is used.

~~~

14.
Ajin's previous explanation ([1] #27) of why some of the checks have warnings and some do not was helpful; IMO this should be written as a comment in this function.

+ /* The primary_slot_name is not set */
+ if (!WalRcv || WalRcv->slotname[0] == '\0')
+ {
+     ereport(WARNING,
+             errmsg("skipping slots synchronization as primary_slot_name "
+                    "is not set."));
+     return NULL;
+ }
+
+ /* The hot_standby_feedback must be ON for slot-sync to work */
+ if (!hot_standby_feedback)
+ {
+     ereport(WARNING,
+             errmsg("skipping slots synchronization as hot_standby_feedback "
+                    "is off."));
+     return NULL;
+ }
+
+ /* The dbname must be specified in primary_conninfo for slot-sync to work */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ {
+     ereport(WARNING,
+             errmsg("skipping slots synchronization as dbname is not "
+                    "specified in primary_conninfo."));
+     return NULL;
+ }

Add a new comment above all those:

SUGGESTION
/*
 * Check that other GUC settings (primary_slot_name, hot_standby_feedback,
 * primary_conninfo) are compatible with slot synchronization.
 */

~~~

15.
slotsync_configs_changed

+static bool
+slotsync_configs_changed()
+{
+ if ((EnableSyncSlotPreReload != enable_syncslot) ||
+     (HotStandbyFeedbackPreReload != hot_standby_feedback) ||
+     (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0) ||
+     (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0))
+ {
+     return true;
+ }
+
+ return false;
+}

Might as well write this as a single return. Also, IMO it is more natural to write it as "if the <now_value> is different to <prev_value>" instead of the other way around.

For example:

return (enable_syncslot != EnableSyncSlotPreReload) ||
    (hot_standby_feedback != HotStandbyFeedbackPreReload) ||
    (strcmp(PrimaryConnInfo, PrimaryConnInfoPreReload) != 0) ||
    (strcmp(WalRcv->slotname, PrimarySlotNamePreReload) != 0);

~~~

16. slotsync_configs_changed

+ foreach(lc, slots_dbs)
+ {
+     WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+     SlotSyncWorker *w;
+
+     Assert(OidIsValid(failover_slot_data->dboid));
+
+     LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+     w = slotsync_worker_find(failover_slot_data->dboid);
+     LWLockRelease(SlotSyncWorkerLock);
+
+     if (w != NULL)
+         continue; /* worker is running already */
+
+     /*
+      * If we failed to launch this slotsync worker, return and try
+      * launching the failed and remaining workers in next sync-cycle. But
+      * change launcher's wait time to minimum of
+      * wal_retrieve_retry_interval and default wait time to try next
+      * sync-cycle sooner.
+      */
+     if (!slotsync_worker_launch_or_reuse(failover_slot_data->dboid))
+     {
+         *wait_time = Min(*wait_time, wal_retrieve_retry_interval);
+         break;
+     }
+ }

Nit: IMO when the variable scope is small (when you can easily see the declaration and every usage in a few lines), having such long descriptive names makes the code *less* readable instead of more.

SUGGESTION
s/failover_slot_data/slot_data/
OR
s/failover_slot_data/sdata/

======
src/backend/replication/logical/slotsync.c

17.
+ * This file contains the code for slot-sync workers on physical standby
+ * to fetch logical failover slots information from the primary server,
+ * create the slots on the standby and synchronize them periodically.

s/on physical standby/on the physical standby/

~~~

18. slot_exists_in_list

+ if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
+ {
+     /*
+      * if remote slot is marked as non-conflicting (i.e. not
+      * invalidated) but local slot is marked as invalidated, then set
+      * the bool.
+      */
+     if (!remote_slot->conflicting &&
+         local_slot->data.invalidated != RS_INVAL_NONE)
+         *locally_invalidated = true;
+
+     return true;
+ }

Isn't it better to *always* set that 'locally_invalidated' flag for a found slot? Otherwise, you are assuming that the flag value was initially false, but maybe it was not.

SUGGESTION
/*
 * Is the remote slot marked as non-conflicting (i.e. not
 * invalidated) when the local slot is marked as invalidated?
 */
*locally_invalidated = !remote_slot->conflicting &&
    (local_slot->data.invalidated != RS_INVAL_NONE);

~~~

19. get_remote_invalidation_cause

+ if (res->status != WALRCV_OK_TUPLES)
+     ereport(ERROR,
+             (errmsg("could not fetch invalidation cause for slot \"%s\" from"
+                     " primary: %s", slot_name, res->err)));

(already mentioned in the general review comment)

s/from primary/from the primary/

~~~

20.
+/*
+ * Drop obsolete slots
+ *
+ * Drop the slots that no longer need to be synced i.e. these either
+ * do not exist on primary or are no longer enabled as failover slots.

(??)

s/enabled as failover slots/designated as failover slots/
OR
s/enabled as failover slots/enabled for failover/

~~~

21.
construct_slot_query

+static void
+construct_slot_query(StringInfo s, Oid *dbids)
+{
+ Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+
+ appendStringInfo(s,
+                  "SELECT slot_name, plugin, confirmed_flush_lsn,"
+                  " restart_lsn, catalog_xmin, two_phase, conflicting, "
+                  " database FROM pg_catalog.pg_replication_slots"
+                  " WHERE enable_failover=true and database IN ");

s/WHERE enable_failover=true and database IN/WHERE enable_failover AND database IN/

### I noticed the code is a tiny bit different in v25, but the review comment is still relevant.

~~~

22. synchronize_slots

+/*
+ * Synchronize slots.
+ *
+ * It gets the failover logical slots info from the primary server for the dbids
+ * managed by this worker and then updates the slots locally as per the info
+ * received. It creates the slots if not present on the standby.
+ *
+ * It returns nap time for the next sync-cycle.
+ */

The comment can be reworded to avoid starting every sentence with "It".

======
src/backend/replication/walsender.c

23.
+ /*
+  * Check if the database OID is already in the list, and if so, skip
+  * this slot.
+  */
+ if (list_member_oid(database_oids_list, dboid))
+     continue;

Simplify the comment.

SUGGESTION
Skip this slot if the database OID is already in the list.

======
src/backend/utils/activity/wait_event_names.txt

24.
+REPL_SLOTSYNC_MAIN "Waiting in main loop of slot-sync worker."
+REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for primary to catch-up, in slot-sync worker."

(this was already mentioned in the general review comment)

s/primary/the primary/

======
src/include/postmaster/bgworker_internals.h

25.
 #define MAX_PARALLEL_WORKER_LIMIT 1024
+#define MAX_SLOTSYNC_WORKER_LIMIT 50

This constant seems not to be used anywhere except in guc_tables.c, where the GUC is defined. IMO you should make use of it in some Assert or message; otherwise, you might as well just remove it and hardwire the 50 in guc_tables.c directly.

======
src/include/replication/walreceiver.h

26.
WalRcvFailoverSlotsData

+/*
+ * Failover logical slots dbids received from remote.
+ */
+typedef struct WalRcvFailoverSlotsData
+{
+ Oid dboid;
+} WalRcvFailoverSlotsData;
+

For now, the only data is `dbids`, but maybe one day there will be more stuff, so make the struct comment more generic.

SUGGESTION
Failover logical slots data received from remote.

======
src/include/replication/worker_internal.h

27. LogicalRepWorkerType

+
+typedef struct LogicalRepWorker
+{
+ LogicalWorkerHeader hdr;
+
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;
+

Maybe add some struct-level comments for this.

======
[1] https://www.postgresql.org/message-id/CAFPTHDaqn%2Bm47_vkAToQD6Pe8diut0F0g0bSr8PdcuW6cbSSkQ%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
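P.S. To make suggestions #12, #15 and #18 above a bit more concrete, here is a minimal standalone C sketch of each. It is illustrative only and is not taken from the patch: SketchWorker, SketchRemoteSlot, SketchLocalSlot, prune_worker_dbs() and the global variables are hypothetical, simplified stand-ins (e.g. a plain array instead of the remote_dbs List, a bool instead of `data.invalidated != RS_INVAL_NONE`, and a PrimarySlotName global instead of WalRcv->slotname).

```c
#include <stdbool.h>
#include <string.h>

typedef unsigned int Oid;

/* #12: hypothetical stand-in for a slot-sync worker's db-list. */
typedef struct SketchWorker
{
    Oid        *dbids;
    int         dbcount;
} SketchWorker;

/*
 * Remove every db from the worker's list that is not in remote_dbs.
 * dbidx is advanced only when the db is found, so no "else" branch is needed.
 */
static void
prune_worker_dbs(SketchWorker *worker, const Oid *remote_dbs, int nremote)
{
    int         dbidx = 0;

    while (dbidx < worker->dbcount)
    {
        Oid         wdbid = worker->dbids[dbidx];
        bool        found = false;

        for (int i = 0; i < nremote; i++)
        {
            if (remote_dbs[i] == wdbid)
            {
                /* advance worker to next db-position */
                found = true;
                dbidx++;
                break;
            }
        }

        if (!found)
        {
            /* Shift the later DBs down to get rid of wdbid */
            if (dbidx < worker->dbcount - 1)
                memmove(&worker->dbids[dbidx], &worker->dbids[dbidx + 1],
                        (worker->dbcount - dbidx - 1) * sizeof(Oid));
            worker->dbcount--;
        }
    }
}

/* #15: hypothetical stand-ins for the current and pre-reload GUC values. */
static bool enable_syncslot, EnableSyncSlotPreReload;
static bool hot_standby_feedback, HotStandbyFeedbackPreReload;
static const char *PrimaryConnInfo = "", *PrimaryConnInfoPreReload = "";
static const char *PrimarySlotName = "", *PrimarySlotNamePreReload = "";

static bool
slotsync_configs_changed(void)
{
    return (enable_syncslot != EnableSyncSlotPreReload) ||
        (hot_standby_feedback != HotStandbyFeedbackPreReload) ||
        (strcmp(PrimaryConnInfo, PrimaryConnInfoPreReload) != 0) ||
        (strcmp(PrimarySlotName, PrimarySlotNamePreReload) != 0);
}

/* #18: hypothetical stand-ins; 'invalidated' models data.invalidated != RS_INVAL_NONE. */
typedef struct { const char *name; bool conflicting; } SketchRemoteSlot;
typedef struct { const char *name; bool invalidated; } SketchLocalSlot;

static bool
slot_exists_in_list(const SketchLocalSlot *local_slot,
                    const SketchRemoteSlot *remote_slots, int nremote,
                    bool *locally_invalidated)
{
    for (int i = 0; i < nremote; i++)
    {
        if (strcmp(remote_slots[i].name, local_slot->name) == 0)
        {
            /* Always assign the flag instead of relying on its initial value */
            *locally_invalidated = !remote_slots[i].conflicting &&
                local_slot->invalidated;
            return true;
        }
    }
    return false;
}
```

Note that in the #12 sketch a removed db leaves dbidx pointing at the element that was shifted into its place, while a found db advances dbidx, so every element is still visited exactly once.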