On Mon, Apr 10, 2023 at 07:20:42PM +0000, Imseih (AWS), Sami wrote:
> See v28 addressing the comments.

This should be OK (also checked the code paths where the reports are
added).  Note that the patch needed a few adjustments for its
indentation.
--
Michael
From 9267342ba2a1b8b120efaa98871b736a5bbde9a9 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 12 Apr 2023 13:45:59 +0900
Subject: [PATCH v29] Report index vacuum progress.

Add 2 new columns to pg_stat_progress_vacuum.
The columns are ndexes_total as the total indexes to be vacuumed or cleaned and
indexes_processed as the number of indexes vacuumed or cleaned up so far.

Authors: Sami Imseih
Reviewed by: Masahiko Sawada, Michael Paquier, Nathan Bossart, Andres Freund
Discussion: https://www.postgresql.org/message-id/flat/5478dfcd-2333-401a-b2f0-0d186ab09...@amazon.com
---
 src/include/commands/progress.h               |  2 +
 src/include/utils/backend_progress.h          |  1 +
 src/backend/access/heap/vacuumlazy.c          | 70 ++++++++++++++++---
 src/backend/access/transam/parallel.c         | 18 +++++
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/vacuumparallel.c         |  9 ++-
 src/backend/utils/activity/backend_progress.c | 32 +++++++++
 src/test/regress/expected/rules.out           |  4 +-
 doc/src/sgml/monitoring.sgml                  | 23 ++++++
 9 files changed, 150 insertions(+), 12 deletions(-)

diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index e5add41352..2478e87425 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -25,6 +25,8 @@
 #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS		4
 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES			5
 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES			6
+#define PROGRESS_VACUUM_INDEXES_TOTAL			7
+#define PROGRESS_VACUUM_INDEXES_PROCESSED		8
 
 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */
 #define PROGRESS_VACUUM_PHASE_SCAN_HEAP			1
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index a84752ade9..70dea55fc0 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -37,6 +37,7 @@ extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
 										  Oid relid);
 extern void pgstat_progress_update_param(int index, int64 val);
 extern void pgstat_progress_incr_param(int index, int64 incr);
+extern void pgstat_progress_parallel_incr_param(int index, int64 incr);
 extern void pgstat_progress_update_multi_param(int nparam, const int *index,
 											   const int64 *val);
 extern void pgstat_progress_end_command(void);
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 0a9ebd22bd..e5707098bd 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2314,6 +2314,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 {
 	bool		allindexes = true;
 	double		old_live_tuples = vacrel->rel->rd_rel->reltuples;
+	const int	progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEXES_TOTAL
+	};
+	const int	progress_end_index[] = {
+		PROGRESS_VACUUM_INDEXES_TOTAL,
+		PROGRESS_VACUUM_INDEXES_PROCESSED,
+		PROGRESS_VACUUM_NUM_INDEX_VACUUMS
+	};
+	int64		progress_start_val[2];
+	int64		progress_end_val[3];
 
 	Assert(vacrel->nindexes > 0);
 	Assert(vacrel->do_index_vacuuming);
@@ -2326,9 +2337,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 		return false;
 	}
 
-	/* Report that we are now vacuuming indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+	/*
+	 * Report that we are now vacuuming indexes and the number of indexes to
+	 * vacuum.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_INDEX;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2341,6 +2356,10 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 														  old_live_tuples,
 														  vacrel);
 
+			/* Report the number of indexes vacuumed */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_PROCESSED,
+										 idx + 1);
+
 			if (lazy_check_wraparound_failsafe(vacrel))
 			{
 				/* Wraparound emergency -- end current index scan */
@@ -2375,14 +2394,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	Assert(allindexes || VacuumFailsafeActive);
 
 	/*
-	 * Increase and report the number of index scans.
+	 * Increase and report the number of index scans.  Also, we reset
+	 * PROGRESS_VACUUM_INDEXES_TOTAL and PROGRESS_VACUUM_INDEXES_PROCESSED.
 	 *
 	 * We deliberately include the case where we started a round of bulk
 	 * deletes that we weren't able to finish due to the failsafe triggering.
 	 */
 	vacrel->num_index_scans++;
-	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
-								 vacrel->num_index_scans);
+	progress_end_val[0] = 0;
+	progress_end_val[1] = 0;
+	progress_end_val[2] = vacrel->num_index_scans;
+	pgstat_progress_update_multi_param(3, progress_end_index, progress_end_val);
 
 	return allindexes;
 }
@@ -2619,6 +2641,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 
 	if (unlikely(vacuum_xid_failsafe_check(&vacrel->cutoffs)))
 	{
+		const int	progress_index[] = {
+			PROGRESS_VACUUM_INDEXES_TOTAL,
+			PROGRESS_VACUUM_INDEXES_PROCESSED
+		};
+		int64		progress_val[2] = {0, 0};
+
 		VacuumFailsafeActive = true;
 
 		/*
@@ -2633,6 +2661,9 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 		vacrel->do_index_cleanup = false;
 		vacrel->do_rel_truncate = false;
 
+		/* Reset the progress counters */
+		pgstat_progress_update_multi_param(2, progress_index, progress_val);
+
 		ereport(WARNING,
 				(errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans",
 						vacrel->dbname, vacrel->relnamespace, vacrel->relname,
@@ -2659,13 +2690,27 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
 	double		reltuples = vacrel->new_rel_tuples;
 	bool		estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+	const int	progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEXES_TOTAL
+	};
+	const int	progress_end_index[] = {
+		PROGRESS_VACUUM_INDEXES_TOTAL,
+		PROGRESS_VACUUM_INDEXES_PROCESSED
+	};
+	int64		progress_start_val[2];
+	int64		progress_end_val[2] = {0, 0};
 
 	Assert(vacrel->do_index_cleanup);
 	Assert(vacrel->nindexes > 0);
 
-	/* Report that we are now cleaning up indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
+	/*
+	 * Report that we are now cleaning up indexes and the number of indexes to
+	 * cleanup.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_INDEX_CLEANUP;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2677,6 +2722,10 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 			vacrel->indstats[idx] =
 				lazy_cleanup_one_index(indrel, istat, reltuples,
 									   estimated_count, vacrel);
+
+			/* Report the number of indexes cleaned up */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_PROCESSED,
+										 idx + 1);
 		}
 	}
 	else
@@ -2686,6 +2735,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 											vacrel->num_index_scans,
 											estimated_count);
 	}
+
+	/* Reset the progress counters */
+	pgstat_progress_update_multi_param(2, progress_end_index, progress_end_val);
 }
 
 /*
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index b26f2a64fb..88dcb43daa 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
@@ -1199,6 +1200,23 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 				break;
 			}
 
+		case 'P':				/* Parallel progress reporting */
+			{
+				/*
+				 * Only incremental progress reporting is currently supported.
+				 * However, it's possible to add more fields to the message to
+				 * allow for handling of other backend progress APIs.
+				 */
+				int			index = pq_getmsgint(msg, 4);
+				int64		incr = pq_getmsgint64(msg);
+
+				pq_getmsgend(msg);
+
+				pgstat_progress_incr_param(index, incr);
+
+				break;
+			}
+
 		case 'X':				/* Terminate, indicating clean exit */
 			{
 				shm_mq_detach(pcxt->worker[i].error_mqh);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 701c340fc4..1e44f94d3e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1189,7 +1189,8 @@ CREATE VIEW pg_stat_progress_vacuum AS
                       END AS phase,
         S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
         S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
-        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples
+        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples,
+        S.param8 AS indexes_total, S.param9 AS indexes_processed
     FROM pg_stat_get_progress_info('VACUUM') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 87ea5c5242..e8f1291955 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -30,6 +30,7 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/index.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
@@ -631,7 +632,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 													vacuum));
 	}
 
-	/* Reset the parallel index processing counter */
+	/* Reset the parallel index processing and progress counters */
 	pg_atomic_write_u32(&(pvs->shared->idx), 0);
 
 	/* Setup the shared cost-based vacuum delay and launch workers */
@@ -902,6 +903,12 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
 	pfree(pvs->indname);
 	pvs->indname = NULL;
+
+	/*
+	 * Call the parallel variant of pgstat_progress_incr_param so workers can
+	 * report progress of index vacuum to the leader.
+	 */
+	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
 }
 
 /*
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index fb48eafef9..67447ef03a 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -10,6 +10,8 @@
  */
 #include "postgres.h"
 
+#include "access/parallel.h"
+#include "libpq/pqformat.h"
 #include "port/atomics.h"		/* for memory barriers */
 #include "utils/backend_progress.h"
 #include "utils/backend_status.h"
@@ -79,6 +81,36 @@ pgstat_progress_incr_param(int index, int64 incr)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*-----------
+ * pgstat_progress_parallel_incr_param() -
+ *
+ * A variant of pgstat_progress_incr_param to allow a worker to poke at
+ * a leader to do an incremental progress update.
+ *-----------
+ */
+void
+pgstat_progress_parallel_incr_param(int index, int64 incr)
+{
+	/*
+	 * Parallel workers notify a leader through a 'P' protocol message to
+	 * update progress, passing the progress index and incremented value.
+	 * Leaders can just call pgstat_progress_incr_param directly.
+	 */
+	if (IsParallelWorker())
+	{
+		static StringInfoData progress_message;
+
+		initStringInfo(&progress_message);
+
+		pq_beginmessage(&progress_message, 'P');
+		pq_sendint32(&progress_message, index);
+		pq_sendint64(&progress_message, incr);
+		pq_endmessage(&progress_message);
+	}
+	else
+		pgstat_progress_incr_param(index, incr);
+}
+
 /*-----------
  * pgstat_progress_update_multi_param() -
  *
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2d75dd6656..5e5f9b5a9a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2041,7 +2041,9 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param4 AS heap_blks_vacuumed,
     s.param5 AS index_vacuum_count,
     s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples
+    s.param7 AS num_dead_tuples,
+    s.param8 AS indexes_total,
+    s.param9 AS indexes_processed
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_recovery_prefetch| SELECT stats_reset,
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3f33a1c56c..b6bbb85be9 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -7316,6 +7316,29 @@ FROM pg_stat_get_backend_idset() AS backendid;
        Number of dead tuples collected since the last index vacuum cycle.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Total number of indexes that will be vacuumed or cleaned up. This
+       number is reported at the beginning of the
+       <literal>vacuuming indexes</literal> phase or the
+       <literal>cleaning up indexes</literal> phase.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of indexes processed. This counter only advances when the
+       phase is <literal>vacuuming indexes</literal> or
+       <literal>cleaning up indexes</literal>.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
-- 
2.40.0

Attachment: signature.asc
Description: PGP signature

Reply via email to