From 5314444d550893cf41a780c7a5cf63274e579dc7 Mon Sep 17 00:00:00 2001
From: "Sami Imseih (AWS)" <simseih@amazon.com>
Date: Mon, 14 Mar 2022 15:55:57 +0000
Subject: [PATCH v7 1/1] Add infrastructure for parallel progress reporting

Infrastructure to allow a parallel worker to report
progress. In a PARALLEL command, the workers and
leader can report progress using a new pgstat_progress
API.

The progress is maintaned in a shared memory hash
table for which progress values are aggregated for
all processes involved in the command.

pg_stat_get_progress_info reads from the shared memory
hash to report the aggregated data to the caller.

Author: Sami Imseih, based on suggestions by Nathan Bossart, Peter Geoghegan and Masahiko Sawada
Reviewed by: Nathan Bossart, Justin Pryzby
Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228%40amazon.com
---
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/storage/lmgr/lwlocknames.txt      |   1 +
 src/backend/utils/activity/backend_progress.c | 166 ++++++++++++++++++
 src/backend/utils/adt/pgstatfuncs.c           |  10 +-
 src/include/commands/progress.h               |   4 +
 src/include/utils/backend_progress.h          |  11 +-
 6 files changed, 191 insertions(+), 4 deletions(-)

diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index cd4ebe2fc5..ccb9262b97 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -24,6 +24,7 @@
 #include "access/twophase.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/progress.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -145,6 +146,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, BTreeShmemSize());
 	size = add_size(size, SyncScanShmemSize());
 	size = add_size(size, AsyncShmemSize());
+	size = add_size(size, ProgressParallelShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -296,6 +298,7 @@ CreateSharedMemoryAndSemaphores(void)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	ProgressParallelShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 6c7cf6c295..4212dea7f3 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,4 @@ XactTruncationLock					44
 # 45 was XactTruncationLock until removal of BackendRandomLock
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
+ProgressParallelLock				48
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index f29199725b..de69aaa8ad 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -10,10 +10,30 @@
  */
 #include "postgres.h"
 
+#include "commands/progress.h"
 #include "port/atomics.h"		/* for memory barriers */
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/shmem.h"
 #include "utils/backend_progress.h"
 #include "utils/backend_status.h"
 
+/*
+ * Structs for parallel progress tracking.
+ *
+ * The parallel workers and leader report progress
+ * into a hash entry with a key of the leader pid.
+ */
+typedef struct ProgressParallelEntry
+{
+	pid_t   leader_pid;
+	int64 	st_progress_param[PGSTAT_NUM_PROGRESS_PARAM];
+} ProgressParallelEntry;
+
+static HTAB *ProgressParallelHash;
+
+/* We can only have as many parallel progress entries as max_parallel_workers */
+#define PROGRESS_PARALLEL_NUM_ENTRIES max_worker_processes
 
 /*-----------
  * pgstat_progress_start_command() -
@@ -110,3 +130,149 @@ pgstat_progress_end_command(void)
 	beentry->st_progress_command_target = InvalidOid;
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
+
+/*-----------
+ * ProgressParallelShmemInit() -
+ *
+ * Initialize the parallel progress hash.
+ *-----------
+ */
+void
+ProgressParallelShmemInit(void)
+{
+	HASHCTL     info;
+
+	info.keysize = sizeof(pid_t);
+	info.entrysize = sizeof(ProgressParallelEntry);
+
+	ProgressParallelHash = ShmemInitHash("Parallel Progress hash",
+									   PROGRESS_PARALLEL_NUM_ENTRIES,
+									   PROGRESS_PARALLEL_NUM_ENTRIES,
+									   &info,
+									   HASH_ELEM | HASH_BLOBS);
+}
+
+/*-----------
+ * ProgressParallelShmemSize() -
+ *
+ * Calculate the size of the parallel progress hash.
+ *-----------
+ */
+Size
+ProgressParallelShmemSize(void)
+{
+   Size        size = 0;
+
+   /* parallel progress hash table */
+   size = add_size(size, hash_estimate_size(PROGRESS_PARALLEL_NUM_ENTRIES,
+											sizeof(ProgressParallelEntry)));
+
+   return size;
+}
+
+/*-----------
+ * pgstat_progress_update_param_parallel() -
+ *
+ * Update the index'th member in then st_progress_param[] of the
+ * parallel progress hash table.
+ *-----------
+ */
+void
+pgstat_progress_update_param_parallel(int leader_pid, int index, int64 val)
+{
+	ProgressParallelEntry *entry;
+	bool found;
+
+	LWLockAcquire(ProgressParallelLock, LW_EXCLUSIVE);
+
+	entry = (ProgressParallelEntry *) hash_search(ProgressParallelHash, &leader_pid, HASH_ENTER, &found);
+
+	/*
+	 * If the entry is not found, set the value for the index'th member,
+	 * else increment the current value of the index'th member.
+	 */
+	if (!found)
+		entry->st_progress_param[index] = val;
+	else
+		entry->st_progress_param[index] += val;
+
+	LWLockRelease(ProgressParallelLock);
+}
+
+/*-----------
+ * pgstat_progress_end_parallel() -
+ *
+ * This removes an entry with from the parallel progress
+ * hash table.
+ *-----------
+ */
+void
+pgstat_progress_end_parallel(int leader_pid)
+{
+	LWLockAcquire(ProgressParallelLock, LW_EXCLUSIVE);
+
+	/*
+	* Remove from hashtable. It should always be present,
+	* but don't complain if it's not.
+	*/
+	hash_search(ProgressParallelHash, &leader_pid, HASH_REMOVE, NULL);
+
+	LWLockRelease(ProgressParallelLock);
+}
+
+/*-----------
+ * pgstat_progress_end_parallel_callback() -
+ *
+ * PG_ENSURE_ERROR_CLEANUP callback. The caller is responsible
+ * for ensuring cleanup when invoking pgstat_progress_update_param_parallel.
+ *-----------
+ */
+void
+pgstat_progress_end_parallel_callback(int code, Datum arg)
+{
+	pgstat_progress_end_parallel(DatumGetInt32(arg));
+}
+
+/*-----------
+ * pgstat_progress_set_parallel() -
+ *
+ * This routine is called by pg_stat_get_progress_info
+ * to update the datum with values from the parallel progress
+ * hash.
+ *-----------
+ */
+void
+pgstat_progress_set_parallel(Datum *values)
+{
+	ProgressParallelEntry *entry;
+	/* First element of the datum is always the pid */
+	int leader_pid = values[0];
+
+	LWLockAcquire(ProgressParallelLock, LW_SHARED);
+
+	entry = (ProgressParallelEntry *) hash_search(ProgressParallelHash, &leader_pid, HASH_FIND, NULL);
+
+	/*
+	 * If an entry is not found, it is because we looked at a pid that is not involved in a parallel command,
+	 * therefore release the read lock and break.
+	 */
+	if (!entry)
+	{
+		LWLockRelease(ProgressParallelLock);
+		return;
+	}
+
+	for (int i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
+	{
+		int64 val = entry->st_progress_param[i];
+
+		/*
+		 * We only care about hash entry members that have been updated by
+		 * parallel workers ( or leader ). This is true if the member's value > 0.
+		 */
+		if (val > 0)
+			values[i + PGSTAT_NUM_PROGRESS_COMMON] = val;
+	}
+
+	LWLockRelease(ProgressParallelLock);
+}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index eff45b16f2..1856e9c3b6 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -18,6 +18,7 @@
 #include "access/xlog.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
+#include "commands/progress.h"
 #include "common/ip.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -456,7 +457,7 @@ pg_stat_get_backend_idset(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_PROGRESS_COLS	PGSTAT_NUM_PROGRESS_PARAM + 3
+#define PG_STAT_GET_PROGRESS_COLS	PGSTAT_NUM_PROGRESS_PARAM + PGSTAT_NUM_PROGRESS_COMMON
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	char	   *cmd = text_to_cstring(PG_GETARG_TEXT_PP(0));
@@ -518,15 +519,18 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 		{
 			values[2] = ObjectIdGetDatum(beentry->st_progress_command_target);
 			for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
-				values[i + 3] = Int64GetDatum(beentry->st_progress_param[i]);
+				values[i + PGSTAT_NUM_PROGRESS_COMMON] = Int64GetDatum(beentry->st_progress_param[i]);
 		}
 		else
 		{
 			nulls[2] = true;
 			for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
-				nulls[i + 3] = true;
+				nulls[i + PGSTAT_NUM_PROGRESS_COMMON] = true;
 		}
 
+		/* Before returning the datum, set the fields from parallel progress tracking */
+		pgstat_progress_set_parallel(values);
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 	}
 
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index a28938caf4..cd3122c344 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -25,6 +25,8 @@
 #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS		4
 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES			5
 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES			6
+#define PROGRESS_VACUUM_TOTAL_INDEXES			7
+#define PROGRESS_VACUUM_INDEXES_COMPLETED		8
 
 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */
 #define PROGRESS_VACUUM_PHASE_SCAN_HEAP			1
@@ -151,4 +153,6 @@
 #define PROGRESS_COPY_TYPE_PIPE 3
 #define PROGRESS_COPY_TYPE_CALLBACK 4
 
+/* Number of common fields at the start of progress views */
+#define PGSTAT_NUM_PROGRESS_COMMON 3
 #endif
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index 47bf8029b0..291d9413c7 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -39,6 +39,15 @@ extern void pgstat_progress_update_param(int index, int64 val);
 extern void pgstat_progress_update_multi_param(int nparam, const int *index,
 											   const int64 *val);
 extern void pgstat_progress_end_command(void);
-
+/* -----------
+ * Routines for parallel command progress reporting
+ * -----------
+ */
+extern void ProgressParallelShmemInit(void);
+extern Size ProgressParallelShmemSize(void);
+extern void pgstat_progress_update_param_parallel(int leader_pid, int index, int64 val);
+extern void pgstat_progress_end_parallel(int leader_pid);
+extern void pgstat_progress_end_parallel_callback(int code, Datum arg);
+extern void pgstat_progress_set_parallel(Datum *values);
 
 #endif							/* BACKEND_PROGRESS_H */
-- 
2.32.0

