Here are some review comments for v24-0001

1. GENERAL - failover slots terminology

There is inconsistent terminology, such as below. Try to use the same
wording everywhere.
- failover logical slots
- failover slots
- logical failover slots
- logical replication failover slots
- etc.

These are in many places - comments, function names, constants etc.



s/primary.../the primary.../
s/standby.../the standby.../

Missing "the" problems remain in multiple places in the patch.


3. GENERAL - messages

I searched all the ereports and elogs (the full list is below only for
reference). There are many little quirks:

3a. Sometimes messages say "primary"; sometimes "primary server" etc.
Be consistent.

3b. /primary/the primary/

3c. Sometimes messages include errcode and sometimes they do not; Are
they deliberate or are there missing errcodes?

3d. At least one message has unwanted trailing space

3e. Sometimes using errcode and/or errmsg enclosed in parentheses;
sometimes not. AFAIK it is not necessary anymore.

3f. Inconsistent terminology "slot" V "failover slots" V "failover
logical slots" etc mentioned in the previous review comment #1

3g. Sometimes messages "slot creation aborted"; Sometimes "aborting
slot creation". Be consistent.

3h. s/lsn/LSN/

3i. s/move it backward/move it backwards/

3j. Sometimes LOG message starts uppercase; Sometimes lowercase. Be consistent.

3k. typo: s/and and/and/

3l. "worker %d" V "worker%d"



ereport(ERROR, (errmsg("could not receive failover slots dbinfo from
the primary server: %s", pchomp(PQerrorMessage(conn->streamConn)))));
ereport(ERROR, (errmsg("invalid response from primary server"),
errdetail("Could not get failover slots dbinfo: got %d fields, "
"expected 1", nfields)));
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid
connection string syntax: %s", errcopy)));
errmsg("replication slot-sync worker slot %d is " "empty, cannot
attach", slot)));
errmsg("replication slot-sync worker slot %d is " "already used by
another worker, cannot attach", slot)));
ereport(ERROR, (errmsg("could not connect to the primary server: %s", err)));
errmsg("cannot use replication slot \"%s\" for logical decoding",
NameStr(slot->, errdetail("This slot is being synced  from
the primary."), errhint("Specify another replication slot.")));
ereport(ERROR, (errmsg("could not fetch slot info for slot \"%s\"
from" " the primary: %s", remote_slot->name, res->err)));
ereport(ERROR, (errmsg("could not fetch slot info for slot \"%s\"
from" " the primary: %s", remote_slot->name, res->err)));
ereport(ERROR, (errmsg("could not fetch invalidation cause for slot
\"%s\" from" " primary: %s", slot_name, res->err)));
ereport(ERROR, (errmsg("slot \"%s\" disappeared from the primary", slot_name)));
ereport(ERROR, (errmsg("could not fetch failover logical slots info
from the primary: %s", res->err)));
ereport(ERROR, (errmsg("could not connect to the primary server: %s", err)));
errmsg("could not map dynamic shared memory " "segment for slot-sync
errmsg("cannot drop replication slot \"%s\"", name), errdetail("This
slot is being synced from the primary.")));
ereport(ERROR, (errmsg("could not receive failover slots dbinfo from
the primary server: %s", pchomp(PQerrorMessage(conn->streamConn)))));
ereport(ERROR, (errmsg("invalid response from primary server"),
errdetail("Could not get failover slots dbinfo: got %d fields, "
"expected 1", nfields)));
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid
connection string syntax: %s", errcopy)));

errmsg("out of background worker slots"), errhint("You might need to
increase %s.", "max_worker_processes")));
ereport(WARNING, (errmsg("replication slot-sync worker failed to
attach to " "worker-pool slot %d", worker_slot)));
ereport(WARNING, errmsg("skipping slots synchronization as
primary_slot_name " "is not set."));
ereport(WARNING, errmsg("skipping slots synchronization as
hot_standby_feedback " "is off."));
ereport(WARNING, errmsg("skipping slots synchronization as dbname is
not " "specified in primary_conninfo."));
errmsg("slot-sync wait for slot %s interrupted by promotion, " "slot
creation aborted", remote_slot->name)));
errmsg("slot-sync wait for slot %s interrupted by promotion, " "slot
creation aborted", remote_slot->name)));
ereport(WARNING, (errmsg("slot \"%s\" disappeared from the primary,
aborting" " slot creation", remote_slot->name)));
ereport(WARNING, (errmsg("slot \"%s\" invalidated on primary,
aborting" " slot creation", remote_slot->name)));
errmsg("slot-sync for slot \"%s\" interrupted by promotion, " "sync
not possible", remote_slot->name)));
ereport(WARNING, errmsg("skipping sync of slot \"%s\" as the received
slot-sync " "lsn %X/%X is ahead of the standby position %X/%X",
remote_slot->name,    LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
ereport(WARNING, errmsg("not synchronizing slot %s; synchronization
would move" " it backward", remote_slot->name));

ereport(LOG, (errmsg("Dropped replication slot \"%s\" ",
ereport(LOG, (errmsg("Added database %d to replication slot-sync "
"worker %d; dbcount now: %d", dbid, worker_slot, worker->dbcount)));
ereport(LOG, (errmsg("Added database %d to replication slot-sync "
"worker %d; dbcount now: %d", dbid, worker_slot, worker->dbcount)));
ereport(LOG, (errmsg("Stopping replication slot-sync worker %d", slot)));
ereport(LOG, (errmsg("removed database %d from replication slot-sync "
"worker %d; dbcount now: %d", wdbid, worker->slot, worker->dbcount)));
ereport(LOG, errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and
catalog xmin" " (%u) to pass local slot LSN (%X/%X) and and catalog
xmin (%u)", remote_slot->name,
LSN_FORMAT_ARGS(remote_slot->restart_lsn), remote_slot->catalog_xmin,
ereport(LOG, errmsg("wait over for remote slot \"%s\" as its LSN
(%X/%X)" " and catalog xmin (%u) has now passed local slot LSN" "
(%X/%X) and catalog xmin (%u)",    remote_slot->name,
LSN_FORMAT_ARGS(new_restart_lsn), new_catalog_xmin,
ereport(LOG, errmsg("Replication slot-sync worker %d is shutting" "
down on receiving SIGINT", MySlotSyncWorker->slot));
ereport(LOG, errmsg("Replication slot-sync worker %d started", worker_slot));

elog(DEBUG1, "allocated dsa for slot-sync worker for dbcount: %d",
elog(DEBUG1, "logical replication launcher started"); elog(DEBUG2,
"slot-sync worker%d's query:%s \n", MySlotSyncWorker->slot,;


4. GENERAL - SlotSyncWorker loops

When iterating slot-sync workers the code sometimes looks like

+ for (int i = 0; i < max_slotsync_workers; i++)
+ {
+ SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];

and other times it looks like

+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {
+ SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];


It would be better if such loops would use the same loop variable and
SlotSyncWorker variable names; consistency will make the code easier
to read.

Commit message

GUC 'enable_syncslot' enables a physical_satndby to synchronize logical
replication failover slots from the primary server.

s/physical_satndby/physical standby/

## I think this one is already fixed in the latest v25.


The logical slots created by slot-sync workers on physical standbys are
not allowed to be consumed and dropped. Any attempt to perform logical decoding
on such slots will result in an error.


The logical slots created by slot-sync workers on physical standbys are
not allowed to be dropped or consumed. Any attempt to perform logical decoding
on such slots will result in an error.


+         <para>
+          Specify dbname in <varname>primary_conninfo</varname> string
+          to allow synchronization of slots from the primary to standby.
+          This will only be used for slot synchronization. It is ignored
+          for streaming.

Maybe better to use <literal> for dbname.


+     </varlistentry>

Extra blank link not needed.


9. libpqrcv_get_dbname_from_conninfo

+ for (opt = opts; opt->keyword != NULL; ++opt)
+ {
+ /* If multiple dbnames are used, then the last one will be returned */

s/are used/are specified/


10. slotsync_worker_launch_or_reuse

+ MemoryContext oldcontext;
+ uint32 alloc_count = 0;
+ uint32 old_dbcnt = 0;
+ Oid    *old_dbids = NULL;

No need to assign these in the declaration, because they get
unconditionally assigned before they are inspected anyhow.


+ /* Prepare the new worker. */
+ worker->hdr.launch_time = GetCurrentTimestamp();
+ worker->hdr.in_use = true;
+ /*
+ * 'proc' and 'slot' will be assigned in ReplSlotSyncWorkerMain when we
+ * attach this worker to a particular worker-pool slot
+ */
+ worker->hdr.proc = NULL;
+ worker->slot = -1;
+ /* TODO: do we really need 'generation', analyse more here */
+ worker->hdr.generation++;
+ /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
+ handle = slotsync_dsa_setup(worker);

It is confusing for some of the worker members to be initialized here
and other worker members (like `dbcount`) to be initialized within the
function slotsync_dsa_setup(). It might be better if all the field
initialization can be kept together -- e.g. combined in a new function


+ /* Check if current DB is still present in remote-db-list */
+ foreach(lc, remote_dbs)
+ {
+ WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+ if (failover_slot_data->dboid == wdbid)
+ {
+ found = true;
+ break;
+ }
+ }
+ /* If not found, then delete this db from worker's db-list */
+ if (!found)
+ {
+ if (dbidx < (worker->dbcount - 1))
+ {
+ /* Shift the DBs and get rid of wdbid */
+ memmove(&dbids[dbidx], &dbids[dbidx + 1],
+ (worker->dbcount - dbidx - 1) * sizeof(Oid));
+ }
+ worker->dbcount--;
+ ereport(LOG,
+ (errmsg("removed database %d from replication slot-sync "
+ "worker %d; dbcount now: %d",
+ wdbid, worker->slot, worker->dbcount)));
+ }
+ /* Else move to next db-position */
+ else
+ {
+ dbidx++;
+ }

This code might be simpler if you just remove the whole "Else move..."
part and instead just increment the `dbidx` at the same time you set
found = true;s/

For example,

if (failover_slot_data->dboid == wdbid)
/* advance worker to next db-position */
found = true;


13. slotsync_remote_connect

+ * Connect to the primary server for slotsync purpose and return the connection
+ * info.
+ */
+static WalReceiverConn *
+ WalReceiverConn *wrconn = NULL;
+ char    *err;
+ char    *dbname;

No need to assign NULL there. It will be overwritten before it is used.


Ajins's previous explanation ([1] #27) of why some of the checks have
warnings and some do not was helpful; IMO this should be written as a
comment in this function.

+ /* The primary_slot_name is not set */
+ if (!WalRcv || WalRcv->slotname[0] == '\0')
+ {
+ ereport(WARNING,
+ errmsg("skipping slots synchronization as primary_slot_name "
+    "is not set."));
+ return NULL;
+ }
+ /* The hot_standby_feedback must be ON for slot-sync to work */
+ if (!hot_standby_feedback)
+ {
+ ereport(WARNING,
+ errmsg("skipping slots synchronization as hot_standby_feedback "
+    "is off."));
+ return NULL;
+ }
+ /* The dbname must be specified in primary_conninfo for slot-sync to work */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ {
+ ereport(WARNING,
+ errmsg("skipping slots synchronization as dbname is not "
+    "specified in primary_conninfo."));
+ return NULL;
+ }

Add a new comment above all those:

 * Check that other GUC settings (primary_slot_name,
hot_standby_feedback, primary_conninfo)
 * are compatible with slot synchronization.


15. slotsync_configs_changed

+static bool
+ if ((EnableSyncSlotPreReload != enable_syncslot) ||
+ (HotStandbyFeedbackPreReload != hot_standby_feedback) ||
+ (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0) ||
+ (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0))
+ {
+ return true;
+ }
+ return false;

Might as well write this as a single return. Also, IMO it is more
natural to write as "if the <now_value> is different to <prev_value>"
instead of the other way around

For example:

  (enable_syncslot != EnableSyncSlotPreReload) ||
  (hot_standby_feedback != HotStandbyFeedbackPreReload) ||
  (strcmp(PrimaryConnInfo, PrimaryConnInfoPreReload) != 0) ||
  (strcmp(WalRcv->slotname,PrimarySlotNamePreReload) != 0);


16. slotsync_configs_changed

+ foreach(lc, slots_dbs)
+ {
+ WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+ SlotSyncWorker *w;
+ Assert(OidIsValid(failover_slot_data->dboid));
+ LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+ w = slotsync_worker_find(failover_slot_data->dboid);
+ LWLockRelease(SlotSyncWorkerLock);
+ if (w != NULL)
+ continue; /* worker is running already */
+ /*
+ * If we failed to launch this slotsync worker, return and try
+ * launching the failed and remaining workers in next sync-cycle. But
+ * change launcher's wait time to minimum of
+ * wal_retrieve_retry_interval and default wait time to try next
+ * sync-cycle sooner.
+ */
+ if (!slotsync_worker_launch_or_reuse(failover_slot_data->dboid))
+ {
+ *wait_time = Min(*wait_time, wal_retrieve_retry_interval);
+ break;
+ }
+ }

Nit: IMO when the variable scope is small (when you can easily see the
declaration and every usage in a few lines) having such long
descriptive makes the code *less* instead of more readable.






+ * This file contains the code for slot-sync workers on physical standby
+ * to fetch logical failover slots information from the primary server,
+ * create the slots on the standby and synchronize them periodically.

s/on physical standby/on the physical standby/


18. slot_exists_in_list

+ if (strcmp(remote_slot->name, NameStr(local_slot-> == 0)
+ {
+ /*
+ * if remote slot is marked as non-conflicting (i.e. not
+ * invalidated) but local slot is marked as invalidated, then set
+ * the bool.
+ */
+ if (!remote_slot->conflicting &&
+ local_slot->data.invalidated != RS_INVAL_NONE)
+ *locally_invalidated = true;
+ return true;
+ }

Isn't it better to *always* set that 'locally_invalidated' flag for a
found slot? Otherwise, you are assuming that the flag value was
initially false, but maybe it was not.

 * Is the remote slot is marked as non-conflicting (i.e. not
 * invalidated) when the local slot is marked as invalidated?
*locally_invalidated =
  !remote_slot->conflicting &&
  (local_slot->data.invalidated != RS_INVAL_NONE);


19. get_remote_invalidation_cause

+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch invalidation cause for slot \"%s\" from"
+ " primary: %s", slot_name, res->err)));

(already mentioned in general review comment)

s/from primary/from the primary/


+ * Drop obsolete slots
+ *
+ * Drop the slots that no longer need to be synced i.e. these either
+ * do not exist on primary or are no longer enabled as failover slots.


s/enabled as failover slots/designated as failover slots/


s/enabled as failover slots/enabled for failover


21. construct_slot_query

+static void
+construct_slot_query(StringInfo s, Oid *dbids)
+ Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+ appendStringInfo(s,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase, conflicting, "
+ " database FROM pg_catalog.pg_replication_slots"
+ " WHERE enable_failover=true and database IN ");

/WHERE enable_failover=true and database IN/WHERE enable_failover AND
database IN/

### I noticed the code is a tiny bit different in v25, but the review
comment is still relevant.


22. synchronize_slots

+ * Synchronize slots.
+ *
+ * It gets the failover logical slots info from the primary server
for the dbids
+ * managed by this worker and then updates the slots locally as per the info
+ * received. It creates the slots if not present on the standby.
+ *
+ * It returns nap time for the next sync-cycle.
+ */

Comment can be re-worded to not say "it" everywhere.


+ /*
+ * Check if the database OID is already in the list, and if so, skip
+ * this slot.
+ */
+ if (list_member_oid(database_oids_list, dboid))
+ continue;

Simplify the comment

Skip this slot if the database OID is already in the list.


+REPL_SLOTSYNC_MAIN "Waiting in main loop of slot-sync worker."
+REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for primary to catch-up, in
slot-sync worker."

(this was already mentioned in the general review comment)

s/primary/the primary/



This constant seems to be not used anywhere except in guc_tables.c
where the GUC is defined. IMO you should make use of this in some
Assert or a message; Otherwise, might as well just remove it and
hardwire the 50 in the guc_tables.c directly.


26. WalRcvFailoverSlotsData

+ * Failover logical slots dbids received from remote.
+ */
+typedef struct WalRcvFailoverSlotsData
+ Oid dboid;
+} WalRcvFailoverSlotsData;

For now, the only data is `dbids` but maybe one day there will be more
stuff, so make the struct comment more generic.

Failover logical slots data received from remote.


27. LogicalRepWorkerType
+typedef struct LogicalRepWorker
+ LogicalWorkerHeader hdr;
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;

Maybe add some struct-level comments for this.


Kind Regards,
Peter Smith.
Fujitsu Australia

Reply via email to