From 99c0aca7e7cdc683c05a3dd831d12a82e4f7db34 Mon Sep 17 00:00:00 2001
From: Kuntal Ghosh <kuntal.ghosh@enterprisedb.com>
Date: Wed, 15 Mar 2017 11:32:49 +0530
Subject: [PATCH 1/3] Infra to expose all backend processes in
 pg_stat_get_activity

This patch implements the infrastructure required to expose
all backend processes including auxiliary procs in pg_stat_activity.
BackendStatusArray is extended to store auxiliary processes. Backends
use slots indexed in the range from 1 to MaxBackends (inclusive),
so we use MaxBackends + AuxBackendType + 1 as the index of the slot for an
auxiliary process.
---
 src/backend/postmaster/pgstat.c     | 190 +++++++++++++++++++++++++++++++-----
 src/backend/storage/lmgr/proc.c     |  27 +++++
 src/backend/utils/adt/pgstatfuncs.c |  30 +++++-
 src/include/pgstat.h                |  26 +++++
 src/include/storage/proc.h          |   1 +
 5 files changed, 247 insertions(+), 27 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7cacb1e..5b7804a 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -50,6 +50,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/fork_process.h"
 #include "postmaster/postmaster.h"
+#include "replication/walsender.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -103,6 +104,18 @@
 
 
 /* ----------
+ * Total number of backends including auxiliary
+ *
+ * We reserve a slot for each possible BackendId, plus one for each
+ * possible auxiliary process type.  (This scheme assumes there is not
+ * more than one of any auxiliary process type at a time.) MaxBackends
+ * includes autovacuum workers and background workers as well.
+ * ----------
+ */
+#define NumBackendStatSlots (MaxBackends + NUM_AUXPROCTYPES)
+
+
+/* ----------
  * GUC parameters
  * ----------
  */
@@ -212,7 +225,11 @@ typedef struct TwoPhasePgStatRecord
  */
 static MemoryContext pgStatLocalContext = NULL;
 static HTAB *pgStatDBHash = NULL;
+
+/* Status for backends including auxiliary */
 static LocalPgBackendStatus *localBackendStatusTable = NULL;
+
+/* Total number of backends including auxiliary */
 static int	localNumBackends = 0;
 
 /*
@@ -2504,20 +2521,20 @@ BackendStatusShmemSize(void)
 	Size		size;
 
 	/* BackendStatusArray: */
-	size = mul_size(sizeof(PgBackendStatus), MaxBackends);
+	size = mul_size(sizeof(PgBackendStatus), NumBackendStatSlots);
 	/* BackendAppnameBuffer: */
 	size = add_size(size,
-					mul_size(NAMEDATALEN, MaxBackends));
+					mul_size(NAMEDATALEN, NumBackendStatSlots));
 	/* BackendClientHostnameBuffer: */
 	size = add_size(size,
-					mul_size(NAMEDATALEN, MaxBackends));
+					mul_size(NAMEDATALEN, NumBackendStatSlots));
 	/* BackendActivityBuffer: */
 	size = add_size(size,
-					mul_size(pgstat_track_activity_query_size, MaxBackends));
+			mul_size(pgstat_track_activity_query_size, NumBackendStatSlots));
 #ifdef USE_SSL
 	/* BackendSslStatusBuffer: */
 	size = add_size(size,
-					mul_size(sizeof(PgBackendSSLStatus), MaxBackends));
+				  mul_size(sizeof(PgBackendSSLStatus), NumBackendStatSlots));
 #endif
 	return size;
 }
@@ -2535,7 +2552,7 @@ CreateSharedBackendStatus(void)
 	char	   *buffer;
 
 	/* Create or attach to the shared array */
-	size = mul_size(sizeof(PgBackendStatus), MaxBackends);
+	size = mul_size(sizeof(PgBackendStatus), NumBackendStatSlots);
 	BackendStatusArray = (PgBackendStatus *)
 		ShmemInitStruct("Backend Status Array", size, &found);
 
@@ -2558,7 +2575,7 @@ CreateSharedBackendStatus(void)
 
 		/* Initialize st_appname pointers. */
 		buffer = BackendAppnameBuffer;
-		for (i = 0; i < MaxBackends; i++)
+		for (i = 0; i < NumBackendStatSlots; i++)
 		{
 			BackendStatusArray[i].st_appname = buffer;
 			buffer += NAMEDATALEN;
@@ -2576,7 +2593,7 @@ CreateSharedBackendStatus(void)
 
 		/* Initialize st_clienthostname pointers. */
 		buffer = BackendClientHostnameBuffer;
-		for (i = 0; i < MaxBackends; i++)
+		for (i = 0; i < NumBackendStatSlots; i++)
 		{
 			BackendStatusArray[i].st_clienthostname = buffer;
 			buffer += NAMEDATALEN;
@@ -2585,7 +2602,7 @@ CreateSharedBackendStatus(void)
 
 	/* Create or attach to the shared activity buffer */
 	BackendActivityBufferSize = mul_size(pgstat_track_activity_query_size,
-										 MaxBackends);
+										 NumBackendStatSlots);
 	BackendActivityBuffer = (char *)
 		ShmemInitStruct("Backend Activity Buffer",
 						BackendActivityBufferSize,
@@ -2597,7 +2614,7 @@ CreateSharedBackendStatus(void)
 
 		/* Initialize st_activity pointers. */
 		buffer = BackendActivityBuffer;
-		for (i = 0; i < MaxBackends; i++)
+		for (i = 0; i < NumBackendStatSlots; i++)
 		{
 			BackendStatusArray[i].st_activity = buffer;
 			buffer += pgstat_track_activity_query_size;
@@ -2606,7 +2623,7 @@ CreateSharedBackendStatus(void)
 
 #ifdef USE_SSL
 	/* Create or attach to the shared SSL status buffer */
-	size = mul_size(sizeof(PgBackendSSLStatus), MaxBackends);
+	size = mul_size(sizeof(PgBackendSSLStatus), NumBackendStatSlots);
 	BackendSslStatusBuffer = (PgBackendSSLStatus *)
 		ShmemInitStruct("Backend SSL Status Buffer", size, &found);
 
@@ -2618,7 +2635,7 @@ CreateSharedBackendStatus(void)
 
 		/* Initialize st_sslstatus pointers. */
 		ptr = BackendSslStatusBuffer;
-		for (i = 0; i < MaxBackends; i++)
+		for (i = 0; i < NumBackendStatSlots; i++)
 		{
 			BackendStatusArray[i].st_sslstatus = ptr;
 			ptr++;
@@ -2632,7 +2649,8 @@ CreateSharedBackendStatus(void)
  * pgstat_initialize() -
  *
  *	Initialize pgstats state, and set up our on-proc-exit hook.
- *	Called from InitPostgres.  MyBackendId must be set,
+ *	Called from InitPostgres and AuxiliaryProcessMain. For auxiliary process,
+ *	MyBackendId is invalid. Otherwise, MyBackendId must be set,
  *	but we must not have started any transaction yet (since the
  *	exit hook must run after the last transaction exit).
  *	NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
@@ -2642,8 +2660,26 @@ void
 pgstat_initialize(void)
 {
 	/* Initialize MyBEEntry */
-	Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
-	MyBEEntry = &BackendStatusArray[MyBackendId - 1];
+	if (MyBackendId != InvalidBackendId)
+	{
+		Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
+		MyBEEntry = &BackendStatusArray[MyBackendId - 1];
+	}
+	else
+	{
+		/* Must be an auxiliary process */
+		Assert(MyAuxProcType != NotAnAuxProcess);
+
+		/*
+		 * Assign the MyBEEntry for an auxiliary process.  Since it doesn't
+		 * have a BackendId, the slot is statically allocated based on the
+		 * auxiliary process type (MyAuxProcType).  Backends use slots indexed
+		 * in the range from 1 to MaxBackends (inclusive), so we use
+		 * MaxBackends + AuxBackendType + 1 as the index of the slot for an
+		 * auxiliary process.
+		 */
+		MyBEEntry = &BackendStatusArray[MaxBackends + MyAuxProcType];
+	}
 
 	/* Set up a process-exit hook to clean up */
 	on_shmem_exit(pgstat_beshutdown_hook, 0);
@@ -2654,15 +2690,16 @@ pgstat_initialize(void)
  *
  *	Initialize this backend's entry in the PgBackendStatus array.
  *	Called from InitPostgres.
- *	MyDatabaseId, session userid, and application_name must be set
- *	(hence, this cannot be combined with pgstat_initialize).
+ *
+ *	Apart from auxiliary processes, MyDatabaseId, session userid,
+ *	and application_name must be set for a backend (hence, this
+ *	cannot be combined with pgstat_initialize).
  * ----------
  */
 void
 pgstat_bestart(void)
 {
 	TimestampTz proc_start_timestamp;
-	Oid			userid;
 	SockAddr	clientaddr;
 	volatile PgBackendStatus *beentry;
 
@@ -2677,7 +2714,6 @@ pgstat_bestart(void)
 		proc_start_timestamp = MyProcPort->SessionStartTime;
 	else
 		proc_start_timestamp = GetCurrentTimestamp();
-	userid = GetSessionUserId();
 
 	/*
 	 * We may not have a MyProcPort (eg, if this is the autovacuum process).
@@ -2696,6 +2732,62 @@ pgstat_bestart(void)
 	 * cute.
 	 */
 	beentry = MyBEEntry;
+
+	if (MyBackendId != InvalidBackendId)
+	{
+		if (IsAutoVacuumLauncherProcess())
+		{
+			/* Autovacuum Launcher */
+			beentry->st_backendType = B_AUTOVAC_LAUNCHER;
+		}
+		else if (IsAutoVacuumWorkerProcess())
+		{
+			/* Autovacuum Worker */
+			beentry->st_backendType = B_AUTOVAC_WORKER;
+		}
+		else if (am_walsender)
+		{
+			/* Wal sender */
+			beentry->st_backendType = B_WAL_SENDER;
+		}
+		else if (IsBackgroundWorker)
+		{
+			/* bgworker */
+			beentry->st_backendType = B_BG_WORKER;
+		}
+		else
+		{
+			/* client-backend */
+			beentry->st_backendType = B_BACKEND;
+		}
+	}
+	else
+	{
+		/* Must be an auxiliary process */
+		Assert(MyAuxProcType != NotAnAuxProcess);
+		switch (MyAuxProcType)
+		{
+			case StartupProcess:
+				beentry->st_backendType = B_STARTUP;
+				break;
+			case BgWriterProcess:
+				beentry->st_backendType = B_BG_WRITER;
+				break;
+			case CheckpointerProcess:
+				beentry->st_backendType = B_CHECKPOINTER;
+				break;
+			case WalWriterProcess:
+				beentry->st_backendType = B_WAL_WRITER;
+				break;
+			case WalReceiverProcess:
+				beentry->st_backendType = B_WAL_RECEIVER;
+				break;
+			default:
+				elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
+				proc_exit(1);
+		}
+	}
+
 	do
 	{
 		pgstat_increment_changecount_before(beentry);
@@ -2707,7 +2799,15 @@ pgstat_bestart(void)
 	beentry->st_state_start_timestamp = 0;
 	beentry->st_xact_start_timestamp = 0;
 	beentry->st_databaseid = MyDatabaseId;
-	beentry->st_userid = userid;
+
+	/* We have userid for client-backends, wal-sender and bgworker processes */
+	if (beentry->st_backendType == B_BACKEND
+			|| beentry->st_backendType == B_WAL_SENDER
+			|| beentry->st_backendType == B_BG_WORKER)
+		beentry->st_userid = GetSessionUserId();
+	else
+		beentry->st_userid = InvalidOid;
+
 	beentry->st_clientaddr = clientaddr;
 	if (MyProcPort && MyProcPort->remote_hostname)
 		strlcpy(beentry->st_clienthostname, MyProcPort->remote_hostname,
@@ -3045,24 +3145,24 @@ pgstat_read_current_status(void)
 
 	localtable = (LocalPgBackendStatus *)
 		MemoryContextAlloc(pgStatLocalContext,
-						   sizeof(LocalPgBackendStatus) * MaxBackends);
+						 sizeof(LocalPgBackendStatus) * NumBackendStatSlots);
 	localappname = (char *)
 		MemoryContextAlloc(pgStatLocalContext,
-						   NAMEDATALEN * MaxBackends);
+						   NAMEDATALEN * NumBackendStatSlots);
 	localactivity = (char *)
 		MemoryContextAlloc(pgStatLocalContext,
-						   pgstat_track_activity_query_size * MaxBackends);
+					 pgstat_track_activity_query_size * NumBackendStatSlots);
 #ifdef USE_SSL
 	localsslstatus = (PgBackendSSLStatus *)
 		MemoryContextAlloc(pgStatLocalContext,
-						   sizeof(PgBackendSSLStatus) * MaxBackends);
+						   sizeof(PgBackendSSLStatus) * NumBackendStatSlots);
 #endif
 
 	localNumBackends = 0;
 
 	beentry = BackendStatusArray;
 	localentry = localtable;
-	for (i = 1; i <= MaxBackends; i++)
+	for (i = 1; i <= NumBackendStatSlots; i++)
 	{
 		/*
 		 * Follow the protocol of retrying if st_changecount changes while we
@@ -3590,7 +3690,47 @@ pgstat_get_crashed_backend_activity(int pid, char *buffer, int buflen)
 	return NULL;
 }
 
+const char *
+pgstat_get_backend_desc(BackendType backendType)
+{
+	const char *backendDesc = "unknown process type";
 
+	switch (backendType)
+	{
+		case B_AUTOVAC_LAUNCHER:
+			backendDesc = "autovacuum launcher";
+			break;
+		case B_AUTOVAC_WORKER:
+			backendDesc = "autovacuum worker";
+			break;
+		case B_BACKEND:
+			backendDesc = "client backend";
+			break;
+		case B_BG_WORKER:
+			backendDesc = "background worker";
+			break;
+		case B_BG_WRITER:
+			backendDesc = "background writer";
+			break;
+		case B_CHECKPOINTER:
+			backendDesc = "checkpointer";
+			break;
+		case B_STARTUP:
+			backendDesc = "startup";
+			break;
+		case B_WAL_RECEIVER:
+			backendDesc = "walreceiver";
+			break;
+		case B_WAL_SENDER:
+			backendDesc = "walsender";
+			break;
+		case B_WAL_WRITER:
+			backendDesc = "walwriter";
+			break;
+	}
+
+	return backendDesc;
+}
 /* ------------------------------------------------------------
  * Local support functions follow
  * ------------------------------------------------------------
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 8f467be..3e716b1 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -941,6 +941,33 @@ AuxiliaryProcKill(int code, Datum arg)
 	SpinLockRelease(ProcStructLock);
 }
 
+/*
+ * AuxiliaryPidGetProc -- get PGPROC for an auxiliary process
+ * given its PID
+ *
+ * Returns NULL if not found.
+ */
+PGPROC *
+AuxiliaryPidGetProc(int pid)
+{
+	PGPROC	   *result = NULL;
+	int			index;
+
+	if (pid == 0)				/* never match dummy PGPROCs */
+		return NULL;
+
+	for (index = 0; index < NUM_AUXILIARY_PROCS; index++)
+	{
+		PGPROC	   *proc = &AuxiliaryProcs[index];
+
+		if (proc->pid == pid)
+		{
+			result = proc;
+			break;
+		}
+	}
+	return result;
+}
 
 /*
  * ProcQueue package: routines for putting processes to sleep
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a987d0d..96a1188 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -20,6 +20,7 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/postmaster.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -615,9 +616,18 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			continue;
 
 		/* Values available to all callers */
-		values[0] = ObjectIdGetDatum(beentry->st_databaseid);
+		if (beentry->st_databaseid != InvalidOid)
+			values[0] = ObjectIdGetDatum(beentry->st_databaseid);
+		else
+			nulls[0] = true;
+
 		values[1] = Int32GetDatum(beentry->st_procpid);
-		values[2] = ObjectIdGetDatum(beentry->st_userid);
+
+		if (beentry->st_userid != InvalidOid)
+			values[2] = ObjectIdGetDatum(beentry->st_userid);
+		else
+			nulls[2] = true;
+
 		if (beentry->st_appname)
 			values[3] = CStringGetTextDatum(beentry->st_appname);
 		else
@@ -690,6 +700,22 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				wait_event = pgstat_get_wait_event(raw_wait_event);
 
 			}
+			else if (beentry->st_backendType != B_BACKEND)
+			{
+				uint32		raw_wait_event;
+
+				/*
+				 * For an auxiliary process, retrieve process info from
+				 * AuxiliaryProcs stored in shared-memory.
+				 */
+				proc = AuxiliaryPidGetProc(beentry->st_procpid);
+
+				/* Check whether this is indeed an auxiliary process */
+				Assert(proc != NULL);
+				raw_wait_event = UINT32_ACCESS_ONCE(proc->wait_event_info);
+				wait_event_type = pgstat_get_wait_event_type(raw_wait_event);
+				wait_event = pgstat_get_wait_event(raw_wait_event);
+			}
 			else
 			{
 				wait_event_type = NULL;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 60c78d1..217df45 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -696,6 +696,25 @@ typedef struct PgStat_GlobalStats
 
 
 /* ----------
+ * Backend types
+ * ----------
+ */
+typedef enum BackendType
+{
+	B_AUTOVAC_LAUNCHER,
+	B_AUTOVAC_WORKER,
+	B_BACKEND,
+	B_BG_WORKER,
+	B_BG_WRITER,
+	B_CHECKPOINTER,
+	B_STARTUP,
+	B_WAL_RECEIVER,
+	B_WAL_SENDER,
+	B_WAL_WRITER
+} BackendType;
+
+
+/* ----------
  * Backend states
  * ----------
  */
@@ -847,6 +866,9 @@ typedef struct PgBackendSSLStatus
  * showing its current activity.  (The structs are allocated according to
  * BackendId, but that is not critical.)  Note that the collector process
  * has no involvement in, or even access to, these structs.
+ *
+ * Each auxiliary process also maintains a PgBackendStatus struct in shared
+ * memory.
  * ----------
  */
 typedef struct PgBackendStatus
@@ -871,6 +893,9 @@ typedef struct PgBackendStatus
 	/* The entry is valid iff st_procpid > 0, unused if st_procpid == 0 */
 	int			st_procpid;
 
+	/* Type of backends */
+	BackendType st_backendType;
+
 	/* Times when current backend, transaction, and activity started */
 	TimestampTz st_proc_start_timestamp;
 	TimestampTz st_xact_start_timestamp;
@@ -1069,6 +1094,7 @@ extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
 extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
 extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer,
 									int buflen);
+extern const char *pgstat_get_backend_desc(BackendType backendType);
 
 extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
 							  Oid relid);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 5f38fa6..be3bd01 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -262,6 +262,7 @@ extern PGPROC *PreparedXactProcs;
  */
 #define NUM_AUXILIARY_PROCS		4
 
+extern PGPROC *AuxiliaryPidGetProc(int pid);
 
 /* configurable options */
 extern int	DeadlockTimeout;
-- 
1.8.3.1

