From a8996e809ee8d4f9a381f3589169b1a5a1077b1b Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 8 Aug 2023 17:17:26 +1000
Subject: [PATCH v4] Add LogicalRepWorkerType enum

Current HEAD, deduces a LogicalRepWorker's type from different fields ('relid'
and 'leader_pid'). But, the logical replication worker type is already known at
the time of launching the LogicalRepWorker and it never changes for the lifetime
of that process. So, instead of deducing the type, it is simpler to just store
it one time, and access it directly thereafter.

This patch adds a new enum LogicalRepWorkerType. A "type" field is added to
LogicalRepWorker. This type field is assigned when the worker is launched. Inline
functions is_leader_apply_worker(), is_parallel_apply_worker(), is_tablesync worker()
now just directly return the specified worker type.
---
 .../replication/logical/applyparallelworker.c      | 15 ++++---
 src/backend/replication/logical/launcher.c         | 51 +++++++++++++---------
 src/backend/replication/logical/tablesync.c        |  9 ++--
 src/backend/replication/logical/worker.c           | 40 ++++++++---------
 src/include/replication/worker_internal.h          | 29 +++++++-----
 src/tools/pgindent/typedefs.list                   |  1 +
 6 files changed, 84 insertions(+), 61 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c..779aec5 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -265,7 +265,7 @@ static bool
 pa_can_start(void)
 {
 	/* Only leader apply workers can start parallel apply workers. */
-	if (!am_leader_apply_worker())
+	if (!is_leader_apply_worker(MyLogicalRepWorker))
 		return false;
 
 	/*
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
 		return NULL;
 	}
 
-	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+	launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+										MyLogicalRepWorker->dbid,
 										MySubscription->oid,
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
@@ -555,7 +556,7 @@ pa_find_worker(TransactionId xid)
 static void
 pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
-	Assert(!am_parallel_apply_worker());
+	Assert(!is_parallel_apply_worker(MyLogicalRepWorker));
 	Assert(winfo->in_use);
 	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
@@ -1506,7 +1507,7 @@ pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
 
 	if (fileset_state == FS_SERIALIZE_DONE)
 	{
-		Assert(am_leader_apply_worker());
+		Assert(is_leader_apply_worker(MyLogicalRepWorker));
 		Assert(MyLogicalRepWorker->stream_fileset);
 		wshared->fileset = *MyLogicalRepWorker->stream_fileset;
 	}
@@ -1522,7 +1523,7 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
-	Assert(am_parallel_apply_worker());
+	Assert(is_parallel_apply_worker(MyLogicalRepWorker));
 
 	SpinLockAcquire(&MyParallelShared->mutex);
 	fileset_state = MyParallelShared->fileset_state;
@@ -1593,7 +1594,7 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
 void
 pa_decr_and_wait_stream_block(void)
 {
-	Assert(am_parallel_apply_worker());
+	Assert(is_parallel_apply_worker(MyLogicalRepWorker));
 
 	/*
 	 * It is only possible to not have any pending stream chunks when we are
@@ -1620,7 +1621,7 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
-	Assert(am_leader_apply_worker());
+	Assert(is_leader_apply_worker(MyLogicalRepWorker));
 
 	/*
 	 * Unlock the shared object lock so that parallel apply worker can
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..7dc078a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -259,7 +259,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
 		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		if (is_parallel_apply_worker(w))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+						 Oid dbid, Oid subid, const char *subname, Oid userid,
 						 Oid relid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		is_tablesync_wkr = (wtype == WORKERTYPE_TABLESYNC);
+	bool		is_parallel_apply_wkr = (wtype == WORKERTYPE_PARALLEL_APPLY);
 
-	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	/*
+	 * Sanity checks:
+	 * - must be valid worker type
+	 * - tablesync workers are only ones to have relid
+	 * - parallel apply worker is the only kind of subworker
+	 */
+	Assert(wtype != WORKERTYPE_UNKNOWN);
+	Assert(is_tablesync_wkr == OidIsValid(relid));
+	Assert(is_parallel_apply_wkr == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_wkr && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -405,7 +414,7 @@ retry:
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
 	 */
-	if (is_parallel_apply_worker &&
+	if (is_parallel_apply_wkr &&
 		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
@@ -427,6 +436,7 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	worker->type = wtype;
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -438,8 +448,8 @@ retry:
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
-	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
-	worker->parallel_apply = is_parallel_apply_worker;
+	worker->leader_pid = is_parallel_apply_wkr ? MyProcPid : InvalidPid;
+	worker->parallel_apply = is_parallel_apply_wkr;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -458,7 +468,7 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	if (is_parallel_apply_wkr)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -466,7 +476,7 @@ retry:
 				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
 	}
-	else if (OidIsValid(relid))
+	else if (is_tablesync_wkr)
 	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -488,7 +498,7 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
-	if (is_parallel_apply_worker)
+	if (is_parallel_apply_wkr)
 		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
 
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
@@ -607,7 +617,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!is_parallel_apply_worker(worker));
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
 
@@ -649,7 +659,7 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 	worker = &LogicalRepCtx->workers[slot_no];
-	Assert(isParallelApplyWorker(worker));
+	Assert(is_parallel_apply_worker(worker));
 
 	/*
 	 * Only stop the worker if the generation matches and the worker is alive.
@@ -735,7 +745,7 @@ static void
 logicalrep_worker_detach(void)
 {
 	/* Stop the parallel apply workers. */
-	if (am_leader_apply_worker())
+	if (is_leader_apply_worker(MyLogicalRepWorker))
 	{
 		List	   *workers;
 		ListCell   *lc;
@@ -755,7 +765,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (is_parallel_apply_worker(w))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -874,7 +884,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (w->subid == subid && is_parallel_apply_worker(w))
 			res++;
 	}
 
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+				logicalrep_worker_launch(WORKERTYPE_APPLY,
+										 sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
 										 DSM_HANDLE_INVALID);
 			}
@@ -1243,7 +1254,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+		if (is_parallel_apply_worker(w) && w->proc && pid == w->proc->pid)
 		{
 			leader_pid = w->leader_pid;
 			break;
@@ -1296,7 +1307,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 
-		if (isParallelApplyWorker(&worker))
+		if (is_parallel_apply_worker(&worker))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775..25343be 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
@@ -653,10 +654,10 @@ process_syncing_tables(XLogRecPtr current_lsn)
 	 * that are in a READY state. See pa_can_start() and
 	 * should_apply_changes_for_rel().
 	 */
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 		return;
 
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		process_syncing_tables_for_sync(current_lsn);
 	else
 		process_syncing_tables_for_apply(current_lsn);
@@ -1598,7 +1599,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
 {
 	char	   *sync_slotname = NULL;
 
-	Assert(am_tablesync_worker());
+	Assert(is_tablesync_worker(MyLogicalRepWorker));
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c1..82b09ff 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,9 +485,9 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	else if (is_parallel_apply_worker(MyLogicalRepWorker))
 	{
 		/* We don't synchronize rel's that are in unknown state. */
 		if (rel->state != SUBREL_STATE_READY &&
@@ -1053,7 +1053,7 @@ apply_handle_begin_prepare(StringInfo s)
 	LogicalRepPreparedTxnData begin_data;
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
@@ -1292,7 +1292,7 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
@@ -1422,7 +1422,7 @@ apply_handle_origin(StringInfo s)
 	 */
 	if (!in_streamed_transaction &&
 		(!in_remote_transaction ||
-		 (IsTransactionState() && !am_tablesync_worker())))
+		 (IsTransactionState() && !is_tablesync_worker(MyLogicalRepWorker))))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("ORIGIN message sent out of order")));
@@ -2019,7 +2019,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
-	if (!am_parallel_apply_worker())
+	if (!is_parallel_apply_worker(MyLogicalRepWorker))
 		maybe_start_skipping_changes(lsn);
 
 	/* Make sure we have an open transaction */
@@ -3452,7 +3452,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	 * Skip for parallel apply workers, because the lsn_mapping is maintained
 	 * by the leader apply worker.
 	 */
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 		return;
 
 	/* Need to do this in permanent context */
@@ -3844,7 +3844,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 	{
 		/*
 		 * Don't stop the parallel apply worker as the leader will detect the
@@ -3863,7 +3863,7 @@ apply_worker_exit(void)
 	 * subscription is still active, and so that we won't leak that hash table
 	 * entry if it isn't.
 	 */
-	if (am_leader_apply_worker())
+	if (is_leader_apply_worker(MyLogicalRepWorker))
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	proc_exit(0);
@@ -3906,7 +3906,7 @@ maybe_reread_subscription(void)
 						MySubscription->name)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (am_leader_apply_worker())
+		if (is_leader_apply_worker(MyLogicalRepWorker))
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 		proc_exit(0);
@@ -3945,7 +3945,7 @@ maybe_reread_subscription(void)
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
-		if (am_parallel_apply_worker())
+		if (is_parallel_apply_worker(MyLogicalRepWorker))
 			ereport(LOG,
 					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
 							MySubscription->name)));
@@ -4436,7 +4436,7 @@ start_apply(XLogRecPtr origin_startpos)
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			pgstat_report_subscription_error(MySubscription->oid, !is_tablesync_worker(MyLogicalRepWorker));
 
 			PG_RE_THROW();
 		}
@@ -4590,7 +4590,7 @@ InitializeLogRepWorker(void)
 						MyLogicalRepWorker->subid)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (am_leader_apply_worker())
+		if (is_leader_apply_worker(MyLogicalRepWorker))
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 		proc_exit(0);
@@ -4617,7 +4617,7 @@ InitializeLogRepWorker(void)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	if (am_tablesync_worker())
+	if (is_tablesync_worker(MyLogicalRepWorker))
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
@@ -4637,7 +4637,7 @@ SetupApplyOrSyncWorker(int worker_slot)
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
 
-	Assert(am_tablesync_worker() || am_leader_apply_worker());
+	Assert(is_tablesync_worker(MyLogicalRepWorker) || is_leader_apply_worker(MyLogicalRepWorker));
 
 	/* Setup signal handling */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -4709,7 +4709,7 @@ DisableSubscriptionAndExit(void)
 
 	/* Report the worker failed during either table synchronization or apply */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
+									 !is_tablesync_worker(MyLogicalRepWorker));
 
 	/* Disable the subscription */
 	StartTransactionCommand();
@@ -4717,7 +4717,7 @@ DisableSubscriptionAndExit(void)
 	CommitTransactionCommand();
 
 	/* Ensure we remove no-longer-useful entry for worker's start time */
-	if (am_leader_apply_worker())
+	if (is_leader_apply_worker(MyLogicalRepWorker))
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	/* Notify the subscription has been disabled and exit */
@@ -4743,7 +4743,7 @@ IsLogicalWorker(void)
 bool
 IsLogicalParallelApplyWorker(void)
 {
-	return IsLogicalWorker() && am_parallel_apply_worker();
+	return IsLogicalWorker() && is_parallel_apply_worker(MyLogicalRepWorker);
 }
 
 /*
@@ -4808,7 +4808,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
 	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
 	bool		started_tx = false;
 
-	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
+	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || is_parallel_apply_worker(MyLogicalRepWorker))
 		return;
 
 	if (!IsTransactionState())
@@ -5042,7 +5042,7 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 {
 	*winfo = NULL;
 
-	if (am_parallel_apply_worker())
+	if (is_parallel_apply_worker(MyLogicalRepWorker))
 	{
 		return TRANS_PARALLEL_APPLY;
 	}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 672a711..0ae0949 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,9 +27,20 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	WORKERTYPE_UNKNOWN = 0,
+	WORKERTYPE_TABLESYNC,
+	WORKERTYPE_APPLY,
+	WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,25 +327,22 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
-
 static inline bool
-am_tablesync_worker(void)
+is_tablesync_worker(LogicalRepWorker *w)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return w->type == WORKERTYPE_TABLESYNC;
 }
 
 static inline bool
-am_leader_apply_worker(void)
+is_leader_apply_worker(LogicalRepWorker *w)
 {
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
+	return w->type == WORKERTYPE_APPLY;
 }
 
 static inline bool
-am_parallel_apply_worker(void)
+is_parallel_apply_worker(LogicalRepWorker *w)
 {
-	return isParallelApplyWorker(MyLogicalRepWorker);
+	return w->type == WORKERTYPE_PARALLEL_APPLY;
 }
 
 #endif							/* WORKER_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc..52a8789 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

