From da405938e1f9040f241478fa03a9f9db4ebc615a Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 10 Aug 2023 11:39:12 +1000
Subject: [PATCH v5] Add LogicalRepWorkerType enum.

Current HEAD code deduces a LogicalRepWorker's type from the values of several
different fields ('relid' and 'leader_pid') whenever logic needs to know it.

In fact, the logical replication worker type is already known at the time of
launching the LogicalRepWorker and it never changes for the lifetime of that
process. Instead of deducing the type, it is simpler to just store it one time,
and access it directly thereafter.

This patch adds a new enum LogicalRepWorkerType. A 'type' field is added to
LogicalRepWorker. This field is assigned when the worker is launched.
---
 .../replication/logical/applyparallelworker.c      |  3 ++-
 src/backend/replication/logical/launcher.c         | 25 ++++++++++++++++------
 src/backend/replication/logical/tablesync.c        |  3 ++-
 src/include/replication/worker_internal.h          | 23 +++++++++++++++-----
 src/tools/pgindent/typedefs.list                   |  1 +
 5 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c..4e8ee29 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
 		return NULL;
 	}
 
-	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+										MyLogicalRepWorker->dbid,
 										MySubscription->oid,
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..2a1602c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+						 Oid dbid, Oid subid, const char *subname, Oid userid,
 						 Oid relid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
 
-	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	/*
+	 * Sanity checks:
+	 * - must be valid worker type
+	 * - tablesync workers are only ones to have relid
+	 * - parallel apply worker is the only kind of subworker
+	 */
+	Assert(wtype != WORKERTYPE_UNKNOWN);
+	Assert(is_tablesync_worker == OidIsValid(relid));
+	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -427,6 +436,7 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type = wtype;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -466,7 +476,7 @@ retry:
 				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
 	}
-	else if (OidIsValid(relid))
+	else if (is_tablesync_worker)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+				logicalrep_worker_launch(WORKERTYPE_APPLY,
+										 sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
 										 DSM_HANDLE_INVALID);
 			}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775..67bdd14 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a711..ce44776 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	WORKERTYPE_UNKNOWN = 0,
+	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_APPLY,
+	WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,20 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isLeaderApplyWorker(worker) ((worker)->type == WORKERTYPE_APPLY)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
 
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return isTablesyncWorker(MyLogicalRepWorker);
 }
 
 static inline bool
 am_leader_apply_worker(void)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return isLeaderApplyWorker(MyLogicalRepWorker);
 }
 
 static inline bool
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc..52a8789 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

