From 5b49df7ce65893c19008376c4cbab7e9fc5eca33 Mon Sep 17 00:00:00 2001
From: "Sami Imseih (AWS)" <simseih@amazon.com>
Date: Mon, 14 Mar 2022 14:07:31 +0000
Subject: [PATCH v7 2/4] Show progress for index vacuums

Add two new columns to pg_stat_progress_vacuum: indexes_total, the
total number of indexes to be vacuumed or cleaned up, and
indexes_processed, the number of indexes vacuumed or cleaned up so
far.

Author: Sami Imseih, based on suggestions by Nathan Bossart, Peter Geoghegan and Masahiko Sawada
Reviewed-by: Nathan Bossart, Justin Pryzby
Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com
---
 doc/src/sgml/monitoring.sgml          | 25 ++++++++++++
 src/backend/access/heap/vacuumlazy.c  | 59 +++++++++++++++++++++++----
 src/backend/catalog/system_views.sql  |  3 +-
 src/backend/commands/vacuumparallel.c |  7 ++++
 src/test/regress/expected/rules.out   |  4 +-
 5 files changed, 89 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 9fb62fec8e..1acc741da9 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6227,6 +6227,31 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
        Number of dead tuples collected since the last index vacuum cycle.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       The number of indexes to be processed in the
+       <literal>vacuuming indexes</literal>
+       or <literal>cleaning up indexes</literal> phase. It is set to
+       <literal>0</literal> when there are no indexes to process
+       or when the failsafe is triggered during the vacuum.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       The number of indexes already processed in the
+       <literal>vacuuming indexes</literal>
+       or <literal>cleaning up indexes</literal> phase. It is set to
+       <literal>0</literal> when vacuum is not in either of these phases.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 87ab7775ae..65e4440dc3 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -58,6 +58,7 @@
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/lsyscache.h"
@@ -2313,9 +2314,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	/* Report that we are now vacuuming indexes */
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+	/* Advertise the number of indexes we are vacuuming */
+	pgstat_progress_update_param(PROGRESS_VACUUM_TOTAL_INDEXES, vacrel->nindexes);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
+		int indexes_processed = 1;
+
 		for (int idx = 0; idx < vacrel->nindexes; idx++)
 		{
 			Relation	indrel = vacrel->indrels[idx];
@@ -2325,6 +2330,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 				lazy_vacuum_one_index(indrel, istat, vacrel->old_live_tuples,
 									  vacrel);
 
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, indexes_processed++);
+
 			if (lazy_check_wraparound_failsafe(vacrel))
 			{
 				/* Wraparound emergency -- end current index scan */
@@ -2335,9 +2342,21 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	}
 	else
 	{
-		/* Outsource everything to parallel variant */
-		parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples,
-											vacrel->num_index_scans);
+		/*
+		 * Outsource everything to parallel variant.
+		 *
+		 * Wrap parallel_vacuum_bulkdel_all_indexes() in
+		 * PG_ENSURE_ERROR_CLEANUP so that the parallel progress hash
+		 * entry is cleaned up even if the call exits with an error.
+		 * On normal completion it is ended explicitly afterwards.
+		 */
+		PG_ENSURE_ERROR_CLEANUP(pgstat_progress_end_parallel_callback, Int32GetDatum(MyProcPid));
+		{
+			parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples,
+												vacrel->num_index_scans);
+		}
+		PG_END_ENSURE_ERROR_CLEANUP(pgstat_progress_end_parallel_callback, Int32GetDatum(MyProcPid));
+		pgstat_progress_end_parallel(MyProcPid);
 
 		/*
 		 * Do a postcheck to consider applying wraparound failsafe now.  Note
@@ -2345,6 +2364,9 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 		 */
 		if (lazy_check_wraparound_failsafe(vacrel))
 			allindexes = false;
+
+		/* reset index progress */
+		pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, 0);
 	}
 
 	/*
@@ -2628,6 +2650,8 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 		vacrel->do_index_vacuuming = false;
 		vacrel->do_index_cleanup = false;
 		vacrel->do_rel_truncate = false;
+		pgstat_progress_update_param(PROGRESS_VACUUM_TOTAL_INDEXES, 0);
+		pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, 0);
 
 		ereport(WARNING,
 				(errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans",
@@ -2664,9 +2688,13 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 	/* Report that we are now cleaning up indexes */
 	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
+	/* Advertise the number of indexes we are cleaning up */
+	pgstat_progress_update_param(PROGRESS_VACUUM_TOTAL_INDEXES, vacrel->nindexes);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
+		int indexes_processed = 1;
+
 		for (int idx = 0; idx < vacrel->nindexes; idx++)
 		{
 			Relation	indrel = vacrel->indrels[idx];
@@ -2675,15 +2703,32 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 			vacrel->indstats[idx] =
 				lazy_cleanup_one_index(indrel, istat, reltuples,
 									   estimated_count, vacrel);
+
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, indexes_processed++);
 		}
 	}
 	else
 	{
-		/* Outsource everything to parallel variant */
-		parallel_vacuum_cleanup_all_indexes(vacrel->pvs, reltuples,
-											vacrel->num_index_scans,
-											estimated_count);
+		/*
+		 * Outsource everything to parallel variant.
+		 *
+		 * Wrap parallel_vacuum_cleanup_all_indexes() in
+		 * PG_ENSURE_ERROR_CLEANUP so that the parallel progress hash
+		 * entry is cleaned up even if the call exits with an error.
+		 * On normal completion it is ended explicitly afterwards.
+		 */
+		PG_ENSURE_ERROR_CLEANUP(pgstat_progress_end_parallel_callback, Int32GetDatum(MyProcPid));
+		{
+			parallel_vacuum_cleanup_all_indexes(vacrel->pvs, reltuples,
+												vacrel->num_index_scans,
+												estimated_count);
+		}
+		PG_END_ENSURE_ERROR_CLEANUP(pgstat_progress_end_parallel_callback, Int32GetDatum(MyProcPid));
+		pgstat_progress_end_parallel(MyProcPid);
 	}
+
+	/* reset index progress */
+	pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, 0);
 }
 
 /*
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bb1ac30cd1..35ea25026f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1126,7 +1126,8 @@ CREATE VIEW pg_stat_progress_vacuum AS
                       END AS phase,
         S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
         S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
-        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples
+        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples,
+        S.param8 AS indexes_total, S.param9 AS indexes_processed
     FROM pg_stat_get_progress_info('VACUUM') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 974a29e7a9..b491728425 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -29,6 +29,7 @@
 #include "access/amapi.h"
 #include "access/table.h"
 #include "catalog/index.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
@@ -101,6 +102,9 @@ typedef struct PVShared
 
 	/* Counter for vacuuming and cleanup */
 	pg_atomic_uint32 idx;
+
+	/* Leader PID of the vacuum */
+	int      leader_pid;
 } PVShared;
 
 /* Status used during parallel index vacuum or cleanup */
@@ -357,6 +361,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 		(nindexes_mwm > 0) ?
 		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
 		maintenance_work_mem;
+	shared->leader_pid = MyProcPid;
 
 	pg_atomic_init_u32(&(shared->cost_balance), 0);
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
@@ -844,9 +849,11 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	{
 		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
 			istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+			pgstat_progress_update_param_parallel(pvs->shared->leader_pid, PROGRESS_VACUUM_INDEXES_COMPLETED, 1);
 			break;
 		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
 			istat_res = vac_cleanup_one_index(&ivinfo, istat);
+			pgstat_progress_update_param_parallel(pvs->shared->leader_pid, PROGRESS_VACUUM_INDEXES_COMPLETED, 1);
 			break;
 		default:
 			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ac468568a1..d70a176514 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2002,7 +2002,9 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param4 AS heap_blks_vacuumed,
     s.param5 AS index_vacuum_count,
     s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples
+    s.param7 AS num_dead_tuples,
+    s.param8 AS indexes_total,
+    s.param9 AS indexes_processed
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_replication| SELECT s.pid,
-- 
2.32.0

