From d77f78756449ed3069bc8194baa6046d5cbf4071 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Wed, 15 Dec 2021 16:49:01 +0900
Subject: [PATCH v8] Move parallel vacuum code to vacuumparallel.c
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, parallel vacuum was specific to lazy vacuum, i.e., the
heap table AM.  But the job that parallel vacuum does isn't really
specific to heap.

This commit moves the parallel vacuum related code to a new file,
commands/vacuumparallel.c, so that any table AM supporting indexes
can use parallel vacuum to call the index AM callbacks (ambulkdelete
and amvacuumcleanup) with parallel workers.

It also moves some vacuum-related functions and structures to
commands/vacuum.c so that both lazy vacuum and parallel vacuum can
refer to them.
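
For illustration, a table AM's vacuum code can now drive parallel
index processing roughly as follows (a sketch based on the calls this
patch makes from vacuumlazy.c; the argument values are the caller's
own):

    ParallelVacuumState *pvs;
    VacDeadItems *dead_items;

    /* enter parallel mode; dead_items space is allocated in DSM */
    pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested,
                               max_items, elevel, bstrategy);
    dead_items = parallel_vacuum_get_dead_items(pvs);

    /* ... collect dead item TIDs while scanning the table ... */

    /* one or more bulk-deletion passes, then a final cleanup pass */
    parallel_vacuum_bulkdel_all_indexes(pvs, num_table_tuples);
    parallel_vacuum_cleanup_all_indexes(pvs, num_table_tuples,
                                        estimated_count, num_index_scans);

    /* exit parallel mode; index stats are copied back for the caller */
    parallel_vacuum_end(pvs, indstats);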

Suggestion from Andres Freund.

Discussion: https://www.postgresql.org/message-id/20211030212101.ae3qcouatwmy7tbr%40alap3.anarazel.de
---
 src/backend/access/heap/vacuumlazy.c  | 1170 ++-----------------------
 src/backend/access/transam/parallel.c |    2 +-
 src/backend/commands/Makefile         |    1 +
 src/backend/commands/vacuum.c         |  148 ++++
 src/backend/commands/vacuumparallel.c | 1097 +++++++++++++++++++++++
 src/include/access/heapam.h           |    1 -
 src/include/commands/vacuum.h         |   41 +
 src/tools/pgindent/typedefs.list      |    2 +
 8 files changed, 1353 insertions(+), 1109 deletions(-)
 create mode 100644 src/backend/commands/vacuumparallel.c

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index db6becfed5..dd1f2ed4d3 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -40,7 +40,6 @@
 #include "access/heapam_xlog.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
-#include "access/parallel.h"
 #include "access/transam.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -120,23 +119,11 @@
  */
 #define PREFETCH_SIZE			((BlockNumber) 32)
 
-/*
- * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
- * we don't need to worry about DSM keys conflicting with plan_node_id we can
- * use small integers.
- */
-#define PARALLEL_VACUUM_KEY_SHARED			1
-#define PARALLEL_VACUUM_KEY_DEAD_ITEMS		2
-#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
-#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
-#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
-#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
-
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
  * parallel mode and the DSM segment is initialized.
  */
-#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL)
+#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
 
 /* Phases of vacuum during which we report error context. */
 typedef enum
@@ -149,155 +136,6 @@ typedef enum
 	VACUUM_ERRCB_PHASE_TRUNCATE
 } VacErrPhase;
 
-/*
- * LVDeadItems stores TIDs whose index tuples are deleted by index vacuuming.
- * Each TID points to an LP_DEAD line pointer from a heap page that has been
- * processed by lazy_scan_prune.
- *
- * Also needed by lazy_vacuum_heap_rel, which marks the same LP_DEAD line
- * pointers as LP_UNUSED during second heap pass.
- */
-typedef struct LVDeadItems
-{
-	int			max_items;		/* # slots allocated in array */
-	int			num_items;		/* current # of entries */
-
-	/* Sorted array of TIDs to delete from indexes */
-	ItemPointerData items[FLEXIBLE_ARRAY_MEMBER];
-} LVDeadItems;
-
-#define MAXDEADITEMS(avail_mem) \
-	(((avail_mem) - offsetof(LVDeadItems, items)) / sizeof(ItemPointerData))
-
-/*
- * Shared information among parallel workers.  So this is allocated in the DSM
- * segment.
- */
-typedef struct LVShared
-{
-	/*
-	 * Target table relid and log level.  These fields are not modified during
-	 * the lazy vacuum.
-	 */
-	Oid			relid;
-	int			elevel;
-
-	/*
-	 * Fields for both index vacuum and cleanup.
-	 *
-	 * reltuples is the total number of input heap tuples.  We set either old
-	 * live tuples in the index vacuum case or the new live tuples in the
-	 * index cleanup case.
-	 *
-	 * estimated_count is true if reltuples is an estimated value.  (Note that
-	 * reltuples could be -1 in this case, indicating we have no idea.)
-	 */
-	double		reltuples;
-	bool		estimated_count;
-
-	/*
-	 * In single process lazy vacuum we could consume more memory during index
-	 * vacuuming or cleanup apart from the memory for heap scanning.  In
-	 * parallel vacuum, since individual vacuum workers can consume memory
-	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
-	 * worker is set such that the parallel operation doesn't consume more
-	 * memory than single process lazy vacuum.
-	 */
-	int			maintenance_work_mem_worker;
-
-	/*
-	 * Shared vacuum cost balance.  During parallel vacuum,
-	 * VacuumSharedCostBalance points to this value and it accumulates the
-	 * balance of each parallel vacuum worker.
-	 */
-	pg_atomic_uint32 cost_balance;
-
-	/*
-	 * Number of active parallel workers.  This is used for computing the
-	 * minimum threshold of the vacuum cost balance before a worker sleeps for
-	 * cost-based delay.
-	 */
-	pg_atomic_uint32 active_nworkers;
-
-	/* Counter for vacuuming and cleanup */
-	pg_atomic_uint32 idx;
-} LVShared;
-
-/* Status used during parallel index vacuum or cleanup */
-typedef enum LVParallelIndVacStatus
-{
-	PARALLEL_INDVAC_STATUS_INITIAL = 0,
-	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
-	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
-	PARALLEL_INDVAC_STATUS_COMPLETED
-} LVParallelIndVacStatus;
-
-/*
- * Struct for index vacuum statistics of an index that is used for parallel vacuum.
- * This includes the status of parallel index vacuum as well as index statistics.
- */
-typedef struct LVParallelIndStats
-{
-	/*
-	 * The following two fields are set by leader process before executing
-	 * parallel index vacuum or parallel index cleanup.  These fields are not
-	 * fixed for the entire VACUUM operation.  They are only fixed for an
-	 * individual parallel index vacuum and cleanup.
-	 *
-	 * parallel_workers_can_process is true if both leader and worker can
-	 * process the index, otherwise only leader can process it.
-	 */
-	LVParallelIndVacStatus status;
-	bool		parallel_workers_can_process;
-
-	/*
-	 * Individual worker or leader stores the result of index vacuum or
-	 * cleanup.
-	 */
-	bool		istat_updated;	/* are the stats updated? */
-	IndexBulkDeleteResult istat;
-} LVParallelIndStats;
-
-/* Struct for maintaining a parallel vacuum state. */
-typedef struct LVParallelState
-{
-	ParallelContext *pcxt;
-
-	/* Shared information among parallel vacuum workers */
-	LVShared   *lvshared;
-
-	/*
-	 * Shared index statistics among parallel vacuum workers. The array
-	 * element is allocated for every index, even those indexes where parallel
-	 * index vacuuming is unsafe or not worthwhile (e.g.,
-	 * will_parallel_vacuum[] is false).  During parallel vacuum,
-	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
-	 * local memory at the end of parallel vacuum.
-	 */
-	LVParallelIndStats *lvpindstats;
-
-	/* Points to buffer usage area in DSM */
-	BufferUsage *buffer_usage;
-
-	/* Points to WAL usage area in DSM */
-	WalUsage   *wal_usage;
-
-	/*
-	 * False if the index is totally unsuitable target for all parallel
-	 * processing. For example, the index could be <
-	 * min_parallel_index_scan_size cutoff.
-	 */
-	bool	   *will_parallel_vacuum;
-
-	/*
-	 * The number of indexes that support parallel index bulk-deletion and
-	 * parallel index cleanup respectively.
-	 */
-	int			nindexes_parallel_bulkdel;
-	int			nindexes_parallel_cleanup;
-	int			nindexes_parallel_condcleanup;
-} LVParallelState;
-
 typedef struct LVRelState
 {
 	/* Target heap relation and its indexes */
@@ -315,9 +153,9 @@ typedef struct LVRelState
 	bool		do_index_cleanup;
 	bool		do_rel_truncate;
 
-	/* Buffer access strategy and parallel state */
+	/* Buffer access strategy and parallel vacuum state */
 	BufferAccessStrategy bstrategy;
-	LVParallelState *lps;
+	ParallelVacuumState *pvs;
 
 	/* rel's initial relfrozenxid and relminmxid */
 	TransactionId relfrozenxid;
@@ -339,9 +177,14 @@ typedef struct LVRelState
 	VacErrPhase phase;
 
 	/*
-	 * State managed by lazy_scan_heap() follows
+	 * State managed by lazy_scan_heap() follows.
+	 *
+	 * dead_items stores TIDs whose index tuples are deleted by index vacuuming.
+	 * Each TID points to an LP_DEAD line pointer from a heap page that has been
+	 * processed by lazy_scan_prune.  Also needed by lazy_vacuum_heap_rel, which
+	 * marks the same LP_DEAD line pointers as LP_UNUSED during second heap pass.
 	 */
-	LVDeadItems *dead_items;	/* TIDs whose index tuples we'll delete */
+	VacDeadItems *dead_items;	/* TIDs whose index tuples we'll delete */
 	BlockNumber rel_pages;		/* total number of pages */
 	BlockNumber scanned_pages;	/* number of pages we examined */
 	BlockNumber pinskipped_pages;	/* # of pages skipped due to a pin */
@@ -413,13 +256,6 @@ static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
 									LVRelState *vacrel);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
-static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
-												 LVParallelIndStats *pindstats);
-static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
-static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
-											  LVShared *shared,
-											  LVParallelIndStats *pindstats);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
 													IndexBulkDeleteResult *istat,
 													double reltuples,
@@ -433,21 +269,11 @@ static bool should_attempt_truncation(LVRelState *vacrel);
 static void lazy_truncate_heap(LVRelState *vacrel);
 static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
 											bool *lock_waiter_detected);
-static int dead_items_max_items(LVRelState *vacrel);
-static inline Size max_items_to_alloc_size(int max_items);
 static void dead_items_alloc(LVRelState *vacrel, int nworkers);
 static void dead_items_cleanup(LVRelState *vacrel);
-static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
-static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid, bool *all_frozen);
-static int	parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
-											bool *will_parallel_vacuum);
 static void update_index_statistics(LVRelState *vacrel);
-static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
-static void parallel_vacuum_end(LVRelState *vacrel);
-static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
-												   bool vacuum);
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -905,7 +732,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive)
 {
-	LVDeadItems *dead_items;
+	VacDeadItems *dead_items;
 	BlockNumber nblocks,
 				blkno,
 				next_unskippable_block,
@@ -2040,7 +1867,7 @@ retry:
 	 */
 	if (lpdead_items > 0)
 	{
-		LVDeadItems *dead_items = vacrel->dead_items;
+		VacDeadItems *dead_items = vacrel->dead_items;
 		ItemPointerData tmp;
 
 		Assert(!prunestate->all_visible);
@@ -2083,7 +1910,6 @@ lazy_vacuum(LVRelState *vacrel)
 
 	/* Should not end up here with no indexes */
 	Assert(vacrel->nindexes > 0);
-	Assert(!IsParallelWorker());
 	Assert(vacrel->lpdead_item_pages > 0);
 
 	if (!vacrel->do_index_vacuuming)
@@ -2212,7 +2038,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 {
 	bool		allindexes = true;
 
-	Assert(!IsParallelWorker());
 	Assert(vacrel->nindexes > 0);
 	Assert(vacrel->do_index_vacuuming);
 	Assert(vacrel->do_index_cleanup);
@@ -2251,8 +2076,21 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	}
 	else
 	{
-		/* Outsource everything to parallel variant */
-		parallel_vacuum_process_all_indexes(vacrel, true);
+		LVSavedErrInfo saved_err_info;
+
+		/*
+		 * Outsource everything to parallel variant. Since parallel vacuum will
+		 * Outsource everything to the parallel variant.  Since parallel
+		 * vacuum will set the error context on an error, we temporarily
+		 * disable setting our error context.
+		update_vacuum_error_info(vacrel, &saved_err_info,
+								 VACUUM_ERRCB_PHASE_UNKNOWN,
+								 InvalidBlockNumber, InvalidOffsetNumber);
+
+		parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples);
+
+		/* Revert to the previous phase information for error traceback */
+		restore_vacuum_error_info(vacrel, &saved_err_info);
 
 		/*
 		 * Do a postcheck to consider applying wraparound failsafe now.  Note
@@ -2404,7 +2242,7 @@ static int
 lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 					  int index, Buffer *vmbuffer)
 {
-	LVDeadItems *dead_items = vacrel->dead_items;
+	VacDeadItems *dead_items = vacrel->dead_items;
 	Page		page = BufferGetPage(buffer);
 	OffsetNumber unused[MaxHeapTuplesPerPage];
 	int			uncnt = 0;
@@ -2625,353 +2463,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 	return false;
 }
 
-/*
- * Perform index vacuum or index cleanup with parallel workers.  This function
- * must be used by the parallel vacuum leader process.
- */
-static void
-parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
-{
-	LVParallelState *lps = vacrel->lps;
-	LVParallelIndVacStatus new_status;
-	int			nworkers;
-
-	Assert(!IsParallelWorker());
-	Assert(ParallelVacuumIsActive(vacrel));
-	Assert(vacrel->nindexes > 0);
-
-	if (vacuum)
-	{
-		/*
-		 * We can only provide an approximate value of num_heap_tuples, at
-		 * least for now.  Matches serial VACUUM case.
-		 */
-		vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
-		vacrel->lps->lvshared->estimated_count = true;
-
-		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
-
-		/* Determine the number of parallel workers to launch */
-		nworkers = vacrel->lps->nindexes_parallel_bulkdel;
-	}
-	else
-	{
-		/*
-		 * We can provide a better estimate of total number of surviving
-		 * tuples (we assume indexes are more interested in that than in the
-		 * number of nominally live tuples).
-		 */
-		vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
-		vacrel->lps->lvshared->estimated_count =
-			(vacrel->tupcount_pages < vacrel->rel_pages);
-
-		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
-
-		/* Determine the number of parallel workers to launch */
-		nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
-		/* Add conditionally parallel-aware indexes if in the first time call */
-		if (vacrel->num_index_scans == 0)
-			nworkers += vacrel->lps->nindexes_parallel_condcleanup;
-	}
-
-	/* The leader process will participate */
-	nworkers--;
-
-	/*
-	 * It is possible that parallel context is initialized with fewer workers
-	 * than the number of indexes that need a separate worker in the current
-	 * phase, so we need to consider it.  See
-	 * parallel_vacuum_compute_workers().
-	 */
-	nworkers = Min(nworkers, lps->pcxt->nworkers);
-
-	/*
-	 * Set index vacuum status and mark whether parallel vacuum worker can
-	 * process it.
-	 */
-	for (int i = 0; i < vacrel->nindexes; i++)
-	{
-		LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
-
-		Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
-		pindstats->status = new_status;
-		pindstats->parallel_workers_can_process =
-			(lps->will_parallel_vacuum[i] &
-			 parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
-													vacuum));
-	}
-
-	/* Reset the parallel index processing counter */
-	pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
-	/* Setup the shared cost-based vacuum delay and launch workers */
-	if (nworkers > 0)
-	{
-		/* Reinitialize parallel context to relaunch parallel workers */
-		if (vacrel->num_index_scans > 0)
-			ReinitializeParallelDSM(lps->pcxt);
-
-		/*
-		 * Set up shared cost balance and the number of active workers for
-		 * vacuum delay.  We need to do this before launching workers as
-		 * otherwise, they might not see the updated values for these
-		 * parameters.
-		 */
-		pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
-		pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
-
-		/*
-		 * The number of workers can vary between bulkdelete and cleanup
-		 * phase.
-		 */
-		ReinitializeParallelWorkers(lps->pcxt, nworkers);
-
-		LaunchParallelWorkers(lps->pcxt);
-
-		if (lps->pcxt->nworkers_launched > 0)
-		{
-			/*
-			 * Reset the local cost values for leader backend as we have
-			 * already accumulated the remaining balance of heap.
-			 */
-			VacuumCostBalance = 0;
-			VacuumCostBalanceLocal = 0;
-
-			/* Enable shared cost balance for leader backend */
-			VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
-			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
-		}
-
-		if (vacuum)
-			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
-									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
-									 lps->pcxt->nworkers_launched),
-							lps->pcxt->nworkers_launched, nworkers)));
-		else
-			ereport(elevel,
-					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
-									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
-									 lps->pcxt->nworkers_launched),
-							lps->pcxt->nworkers_launched, nworkers)));
-	}
-
-	/* Process the indexes that can be processed by only leader process */
-	parallel_vacuum_process_unsafe_indexes(vacrel);
-
-	/*
-	 * Join as a parallel worker.  The leader process alone processes all
-	 * parallel-safe indexes in the case where no workers are launched.
-	 */
-	parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
-
-	/*
-	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
-	 * to finish, or we might get incomplete data.)
-	 */
-	if (nworkers > 0)
-	{
-		/* Wait for all vacuum workers to finish */
-		WaitForParallelWorkersToFinish(lps->pcxt);
-
-		for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
-			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
-	}
-
-	/*
-	 * Reset all index status back to initial (while checking that we have
-	 * processed all indexes).
-	 */
-	for (int i = 0; i < vacrel->nindexes; i++)
-	{
-		LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
-
-		if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
-			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
-				 RelationGetRelationName(vacrel->indrels[i]));
-
-		pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
-	}
-
-	/*
-	 * Carry the shared balance value to heap scan and disable shared costing
-	 */
-	if (VacuumSharedCostBalance)
-	{
-		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
-		VacuumSharedCostBalance = NULL;
-		VacuumActiveNWorkers = NULL;
-	}
-}
-
-/*
- * Index vacuum/cleanup routine used by the leader process and parallel
- * vacuum worker processes to process the indexes in parallel.
- */
-static void
-parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
-									 LVParallelIndStats *pindstats)
-{
-	/*
-	 * Increment the active worker count if we are able to launch any worker.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-	/* Loop until all indexes are vacuumed */
-	for (;;)
-	{
-		int			idx;
-		LVParallelIndStats *pis;
-
-		/* Get an index number to process */
-		idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
-
-		/* Done for all indexes? */
-		if (idx >= vacrel->nindexes)
-			break;
-
-		pis = &(pindstats[idx]);
-
-		/*
-		 * Skip processing index that is unsafe for workers or has an
-		 * unsuitable target for parallel index vacuum (this is processed in
-		 * parallel_vacuum_process_unsafe_indexes() by the leader).
-		 */
-		if (!pis->parallel_workers_can_process)
-			continue;
-
-		/* Do vacuum or cleanup of the index */
-		parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
-										  shared, pis);
-	}
-
-	/*
-	 * We have completed the index vacuum so decrement the active worker
-	 * count.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Perform parallel processing of indexes in leader process.
- *
- * Handles index vacuuming (or index cleanup) for indexes that are not
- * parallel safe.  It's possible that this will vary for a given index, based
- * on details like whether we're performing index cleanup right now.
- *
- * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by parallel_vacuum_compute_workers().
- */
-static void
-parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
-{
-	LVParallelState *lps = vacrel->lps;
-
-	Assert(!IsParallelWorker());
-
-	/*
-	 * Increment the active worker count if we are able to launch any worker.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
-
-	for (int idx = 0; idx < vacrel->nindexes; idx++)
-	{
-		LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
-
-		/* Skip, indexes that are safe for workers */
-		if (pindstats->parallel_workers_can_process)
-			continue;
-
-		/* Do vacuum or cleanup of the index */
-		parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
-										  lps->lvshared, pindstats);
-	}
-
-	/*
-	 * We have completed the index vacuum so decrement the active worker
-	 * count.
-	 */
-	if (VacuumActiveNWorkers)
-		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
-}
-
-/*
- * Vacuum or cleanup index either by leader process or by one of the worker
- * process.  After processing the index this function copies the index
- * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
- * segment.
- */
-static void
-parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
-								  LVShared *shared, LVParallelIndStats *pindstats)
-{
-	IndexBulkDeleteResult *istat = NULL;
-	IndexBulkDeleteResult *istat_res;
-
-	/*
-	 * Update the pointer to the corresponding bulk-deletion result if someone
-	 * has already updated it
-	 */
-	if (pindstats->istat_updated)
-		istat = &(pindstats->istat);
-
-	switch (pindstats->status)
-	{
-		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
-			istat_res = lazy_vacuum_one_index(indrel, istat,
-											  shared->reltuples, vacrel);
-			break;
-		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
-			istat_res = lazy_cleanup_one_index(indrel, istat,
-											   shared->reltuples,
-											   shared->estimated_count,
-											   vacrel);
-			break;
-		default:
-			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
-				 pindstats->status,
-				 RelationGetRelationName(indrel));
-	}
-
-	/*
-	 * Copy the index bulk-deletion result returned from ambulkdelete and
-	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
-	 * allocate locally and it's possible that an index will be vacuumed by a
-	 * different vacuum process the next cycle.  Copying the result normally
-	 * happens only the first time an index is vacuumed.  For any additional
-	 * vacuum pass, we directly point to the result on the DSM segment and
-	 * pass it to vacuum index APIs so that workers can update it directly.
-	 *
-	 * Since all vacuum workers write the bulk-deletion result at different
-	 * slots we can write them without locking.
-	 */
-	if (!pindstats->istat_updated && istat_res != NULL)
-	{
-		memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
-		pindstats->istat_updated = true;
-
-		/* Free the locally-allocated bulk-deletion result */
-		pfree(istat_res);
-	}
-
-	/*
-	 * Update the status to completed. No need to lock here since each worker
-	 * touches different indexes.
-	 */
-	pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
-}
-
 /*
  *	lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
  */
 static void
 lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
-	Assert(!IsParallelWorker());
 	Assert(vacrel->nindexes > 0);
 
 	/* Report that we are now cleaning up indexes */
@@ -2996,8 +2493,23 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 	}
 	else
 	{
-		/* Outsource everything to parallel variant */
-		parallel_vacuum_process_all_indexes(vacrel, false);
+		LVSavedErrInfo saved_err_info;
+
+		/*
+		 * Outsource everything to the parallel variant.  Since parallel
+		 * vacuum will set the error context on an error, we temporarily
+		 * disable setting our error context.
+		 */
+		update_vacuum_error_info(vacrel, &saved_err_info,
+								 VACUUM_ERRCB_PHASE_UNKNOWN,
+								 InvalidBlockNumber, InvalidOffsetNumber);
+
+		parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples,
+											(vacrel->tupcount_pages < vacrel->rel_pages),
+											vacrel->num_index_scans);
+
+		/* Revert to the previous phase information for error traceback */
+		restore_vacuum_error_info(vacrel, &saved_err_info);
 	}
 }
 
@@ -3045,13 +2557,7 @@ lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
 	/* Do bulk deletion */
-	istat = index_bulk_delete(&ivinfo, istat, lazy_tid_reaped,
-							  (void *) vacrel->dead_items);
-
-	ereport(elevel,
-			(errmsg("scanned index \"%s\" to remove %d row versions",
-					vacrel->indname, vacrel->dead_items->num_items),
-			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+	istat = bulkdel_one_index(&ivinfo, istat, vacrel->dead_items);
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -3086,7 +2592,6 @@ lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 	ivinfo.report_progress = false;
 	ivinfo.estimated_count = estimated_count;
 	ivinfo.message_level = elevel;
-
 	ivinfo.num_heap_tuples = reltuples;
 	ivinfo.strategy = vacrel->bstrategy;
 
@@ -3102,24 +2607,7 @@ lazy_cleanup_one_index(Relation indrel, IndexBulkDeleteResult *istat,
 							 VACUUM_ERRCB_PHASE_INDEX_CLEANUP,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
-	istat = index_vacuum_cleanup(&ivinfo, istat);
-
-	if (istat)
-	{
-		ereport(elevel,
-				(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
-						RelationGetRelationName(indrel),
-						istat->num_index_tuples,
-						istat->num_pages),
-				 errdetail("%.0f index row versions were removed.\n"
-						   "%u index pages were newly deleted.\n"
-						   "%u index pages are currently deleted, of which %u are currently reusable.\n"
-						   "%s.",
-						   istat->tuples_removed,
-						   istat->pages_newly_deleted,
-						   istat->pages_deleted, istat->pages_free,
-						   pg_rusage_show(&ru0))));
-	}
+	istat = cleanup_one_index(&ivinfo, istat);
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -3455,8 +2943,6 @@ dead_items_max_items(LVRelState *vacrel)
 	autovacuum_work_mem != -1 ?
 	autovacuum_work_mem : maintenance_work_mem;
 
-	Assert(!IsParallelWorker());
-
 	if (vacrel->nindexes > 0)
 	{
 		BlockNumber rel_pages = vacrel->rel_pages;
@@ -3481,19 +2967,6 @@ dead_items_max_items(LVRelState *vacrel)
 	return (int) max_items;
 }
 
-/*
- * Returns the total required space for VACUUM's dead_items array given a
- * max_items value returned by dead_items_max_items
- */
-static inline Size
-max_items_to_alloc_size(int max_items)
-{
-	Assert(max_items >= MaxHeapTuplesPerPage);
-	Assert(max_items <= MAXDEADITEMS(MaxAllocSize));
-
-	return offsetof(LVDeadItems, items) + sizeof(ItemPointerData) * max_items;
-}
-
 /*
  * Allocate dead_items (either using palloc, or in dynamic shared memory).
  * Sets dead_items in vacrel for caller.
@@ -3504,9 +2977,12 @@ max_items_to_alloc_size(int max_items)
 static void
 dead_items_alloc(LVRelState *vacrel, int nworkers)
 {
-	LVDeadItems *dead_items;
+	VacDeadItems *dead_items;
 	int			max_items;
 
+	max_items = dead_items_max_items(vacrel);
+	Assert(max_items >= MaxHeapTuplesPerPage);
+
 	/*
 	 * Initialize state for a parallel vacuum.  As of now, only one worker can
 	 * be used for an index, so we invoke parallelism only if there are at
@@ -3530,16 +3006,21 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 								vacrel->relname)));
 		}
 		else
-			parallel_vacuum_begin(vacrel, nworkers);
+			vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
+											   vacrel->nindexes, nworkers,
+											   max_items, elevel,
+											   vacrel->bstrategy);
 
-		/* If parallel mode started, vacrel->dead_items allocated in DSM */
+		/* If parallel mode started, dead_items space is allocated in DSM */
 		if (ParallelVacuumIsActive(vacrel))
+		{
+			vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);
 			return;
+		}
 	}
 
 	/* Serial VACUUM case */
-	max_items = dead_items_max_items(vacrel);
-	dead_items = (LVDeadItems *) palloc(max_items_to_alloc_size(max_items));
+	dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items));
 	dead_items->max_items = max_items;
 	dead_items->num_items = 0;
 
@@ -3562,75 +3043,8 @@ dead_items_cleanup(LVRelState *vacrel)
 	 * End parallel mode before updating index statistics as we cannot write
 	 * during parallel mode.
 	 */
-	parallel_vacuum_end(vacrel);
-}
-
-/*
- *	lazy_tid_reaped() -- is a particular tid deletable?
- *
- *		This has the right signature to be an IndexBulkDeleteCallback.
- *
- *		Assumes dead_items array is sorted (in ascending TID order).
- */
-static bool
-lazy_tid_reaped(ItemPointer itemptr, void *state)
-{
-	LVDeadItems *dead_items = (LVDeadItems *) state;
-	int64		litem,
-				ritem,
-				item;
-	ItemPointer res;
-
-	litem = itemptr_encode(&dead_items->items[0]);
-	ritem = itemptr_encode(&dead_items->items[dead_items->num_items - 1]);
-	item = itemptr_encode(itemptr);
-
-	/*
-	 * Doing a simple bound check before bsearch() is useful to avoid the
-	 * extra cost of bsearch(), especially if dead items on the heap are
-	 * concentrated in a certain range.  Since this function is called for
-	 * every index tuple, it pays to be really fast.
-	 */
-	if (item < litem || item > ritem)
-		return false;
-
-	res = (ItemPointer) bsearch((void *) itemptr,
-								(void *) dead_items->items,
-								dead_items->num_items,
-								sizeof(ItemPointerData),
-								vac_cmp_itemptr);
-
-	return (res != NULL);
-}
-
-/*
- * Comparator routines for use with qsort() and bsearch().
- */
-static int
-vac_cmp_itemptr(const void *left, const void *right)
-{
-	BlockNumber lblk,
-				rblk;
-	OffsetNumber loff,
-				roff;
-
-	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
-	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
-
-	if (lblk < rblk)
-		return -1;
-	if (lblk > rblk)
-		return 1;
-
-	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
-	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
-
-	if (loff < roff)
-		return -1;
-	if (loff > roff)
-		return 1;
-
-	return 0;
+	parallel_vacuum_end(vacrel->pvs, vacrel->indstats);
+	vacrel->pvs = NULL;
 }
 
 /*
@@ -3754,77 +3168,6 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 	return all_visible;
 }
 
-/*
- * Compute the number of parallel worker processes to request.  Both index
- * vacuum and index cleanup can be executed with parallel workers.  The index
- * is eligible for parallel vacuum iff its size is greater than
- * min_parallel_index_scan_size as invoking workers for very small indexes
- * can hurt performance.
- *
- * nrequested is the number of parallel workers that user requested.  If
- * nrequested is 0, we compute the parallel degree based on nindexes, that is
- * the number of indexes that support parallel vacuum.  This function also
- * sets will_parallel_vacuum to remember indexes that participate in parallel
- * vacuum.
- */
-static int
-parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
-								bool *will_parallel_vacuum)
-{
-	int			nindexes_parallel = 0;
-	int			nindexes_parallel_bulkdel = 0;
-	int			nindexes_parallel_cleanup = 0;
-	int			parallel_workers;
-
-	/*
-	 * We don't allow performing parallel operation in standalone backend or
-	 * when parallelism is disabled.
-	 */
-	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
-		return 0;
-
-	/*
-	 * Compute the number of indexes that can participate in parallel vacuum.
-	 */
-	for (int idx = 0; idx < vacrel->nindexes; idx++)
-	{
-		Relation	indrel = vacrel->indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-		/* Skip index that is not a suitable target for parallel index vacuum */
-		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
-			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
-			continue;
-
-		will_parallel_vacuum[idx] = true;
-
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			nindexes_parallel_bulkdel++;
-		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-			nindexes_parallel_cleanup++;
-	}
-
-	nindexes_parallel = Max(nindexes_parallel_bulkdel,
-							nindexes_parallel_cleanup);
-
-	/* The leader process takes one index */
-	nindexes_parallel--;
-
-	/* No index supports parallel vacuum */
-	if (nindexes_parallel <= 0)
-		return 0;
-
-	/* Compute the parallel degree */
-	parallel_workers = (nrequested > 0) ?
-		Min(nrequested, nindexes_parallel) : nindexes_parallel;
-
-	/* Cap by max_parallel_maintenance_workers */
-	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
-
-	return parallel_workers;
-}
-
 /*
  * Update index statistics in pg_class if the statistics are accurate.
  */
@@ -3835,7 +3178,7 @@ update_index_statistics(LVRelState *vacrel)
 	int			nindexes = vacrel->nindexes;
 	IndexBulkDeleteResult **indstats = vacrel->indstats;
 
-	Assert(!IsInParallelMode());
+	Assert(!ParallelVacuumIsActive(vacrel));
 
 	for (int idx = 0; idx < nindexes; idx++)
 	{
@@ -3857,393 +3200,6 @@ update_index_statistics(LVRelState *vacrel)
 	}
 }
 
-/*
- * Try to enter parallel mode and create a parallel context.  Then initialize
- * shared memory state.
- *
- * On success (when we can launch one or more workers), will set dead_items and
- * lps in vacrel for caller.  A set lps in vacrel state indicates that parallel
- * VACUUM is currently active.
- */
-static void
-parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
-{
-	LVParallelState *lps;
-	Relation   *indrels = vacrel->indrels;
-	int			nindexes = vacrel->nindexes;
-	ParallelContext *pcxt;
-	LVShared   *shared;
-	LVDeadItems *dead_items;
-	LVParallelIndStats *pindstats;
-	BufferUsage *buffer_usage;
-	WalUsage   *wal_usage;
-	bool	   *will_parallel_vacuum;
-	int			max_items;
-	Size		est_pindstats_len;
-	Size		est_shared_len;
-	Size		est_dead_items_len;
-	int			nindexes_mwm = 0;
-	int			parallel_workers = 0;
-	int			querylen;
-
-	/*
-	 * A parallel vacuum must be requested and there must be indexes on the
-	 * relation
-	 */
-	Assert(nrequested >= 0);
-	Assert(nindexes > 0);
-
-	/*
-	 * Compute the number of parallel vacuum workers to launch
-	 */
-	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
-													   will_parallel_vacuum);
-	if (parallel_workers <= 0)
-	{
-		/* Can't perform vacuum in parallel -- lps not set in vacrel */
-		pfree(will_parallel_vacuum);
-		return;
-	}
-
-	lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
-
-	EnterParallelMode();
-	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
-								 parallel_workers);
-	Assert(pcxt->nworkers > 0);
-	lps->pcxt = pcxt;
-	lps->will_parallel_vacuum = will_parallel_vacuum;
-
-	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */
-	est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
-	shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
-	est_shared_len = sizeof(LVShared);
-	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
-	max_items = dead_items_max_items(vacrel);
-	est_dead_items_len = max_items_to_alloc_size(max_items);
-	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/*
-	 * Estimate space for BufferUsage and WalUsage --
-	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
-	 *
-	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgBufferUsage or
-	 * pgWalUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(WalUsage), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
-
-	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
-	if (debug_query_string)
-	{
-		querylen = strlen(debug_query_string);
-		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
-		shm_toc_estimate_keys(&pcxt->estimator, 1);
-	}
-	else
-		querylen = 0;			/* keep compiler quiet */
-
-	InitializeParallelDSM(pcxt);
-
-	/* Prepare index vacuum stats */
-	pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		Relation	indrel = indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-		/*
-		 * Cleanup option should be either disabled, always performing in
-		 * parallel or conditionally performing in parallel.
-		 */
-		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
-			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
-		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		if (indrel->rd_indam->amusemaintenanceworkmem)
-			nindexes_mwm++;
-
-		/*
-		 * Remember the number of indexes that support parallel operation for
-		 * each phase.
-		 */
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			lps->nindexes_parallel_bulkdel++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
-			lps->nindexes_parallel_cleanup++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
-			lps->nindexes_parallel_condcleanup++;
-	}
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
-	lps->lvpindstats = pindstats;
-
-	/* Prepare shared information */
-	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
-	MemSet(shared, 0, est_shared_len);
-	shared->relid = RelationGetRelid(vacrel->rel);
-	shared->elevel = elevel;
-	shared->maintenance_work_mem_worker =
-		(nindexes_mwm > 0) ?
-		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
-		maintenance_work_mem;
-
-	pg_atomic_init_u32(&(shared->cost_balance), 0);
-	pg_atomic_init_u32(&(shared->active_nworkers), 0);
-	pg_atomic_init_u32(&(shared->idx), 0);
-
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
-	lps->lvshared = shared;
-
-	/* Prepare the dead_items space */
-	dead_items = (LVDeadItems *) shm_toc_allocate(pcxt->toc,
-												  est_dead_items_len);
-	dead_items->max_items = max_items;
-	dead_items->num_items = 0;
-	MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
-
-	/*
-	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
-	 * initialize
-	 */
-	buffer_usage = shm_toc_allocate(pcxt->toc,
-									mul_size(sizeof(BufferUsage), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
-	lps->buffer_usage = buffer_usage;
-	wal_usage = shm_toc_allocate(pcxt->toc,
-								 mul_size(sizeof(WalUsage), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
-	lps->wal_usage = wal_usage;
-
-	/* Store query string for workers */
-	if (debug_query_string)
-	{
-		char	   *sharedquery;
-
-		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
-		memcpy(sharedquery, debug_query_string, querylen + 1);
-		sharedquery[querylen] = '\0';
-		shm_toc_insert(pcxt->toc,
-					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
-	}
-
-	/* Success -- set dead_items and lps in leader's vacrel state */
-	vacrel->dead_items = dead_items;
-	vacrel->lps = lps;
-}
-
-/*
- * Destroy the parallel context, and end parallel mode.
- *
- * Since writes are not allowed during parallel mode, copy the
- * updated index statistics from DSM into local memory and then later use that
- * to update the index statistics.  One might think that we can exit from
- * parallel mode, update the index statistics and then destroy parallel
- * context, but that won't be safe (see ExitParallelMode).
- */
-static void
-parallel_vacuum_end(LVRelState *vacrel)
-{
-	IndexBulkDeleteResult **indstats = vacrel->indstats;
-	LVParallelState *lps = vacrel->lps;
-	int			nindexes = vacrel->nindexes;
-
-	Assert(!IsParallelWorker());
-
-	/* Copy the updated statistics */
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
-
-		if (pindstats->istat_updated)
-		{
-			indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-			memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
-		}
-		else
-			indstats[idx] = NULL;
-	}
-
-	DestroyParallelContext(lps->pcxt);
-	ExitParallelMode();
-
-	/* Deactivate parallel vacuum */
-	pfree(lps->will_parallel_vacuum);
-	pfree(lps);
-	vacrel->lps = NULL;
-}
-
-/*
- * Returns false, if the given index can't participate in the next execution of
- * parallel index vacuum or parallel index cleanup.
- */
-static bool
-parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
-									   bool vacuum)
-{
-	uint8		vacoptions;
-
-	vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
-	if (vacuum)
-		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
-
-	/* Not safe, if the index does not support parallel cleanup */
-	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
-		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
-		return false;
-
-	/*
-	 * Not safe, if the index supports parallel cleanup conditionally, but we
-	 * have already processed the index (for bulkdelete).  We do this to avoid
-	 * the need to invoke workers when parallel index cleanup doesn't need to
-	 * scan the index.  See the comments for option
-	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
-	 * parallel cleanup conditionally.
-	 */
-	if (vacrel->num_index_scans > 0 &&
-		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-		return false;
-
-	return true;
-}
-
-/*
- * Perform work within a launched parallel process.
- *
- * Since parallel vacuum workers perform only index vacuum or index cleanup,
- * we don't need to report progress information.
- */
-void
-parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
-{
-	Relation	rel;
-	Relation   *indrels;
-	LVParallelIndStats *lvpindstats;
-	LVShared   *lvshared;
-	LVDeadItems *dead_items;
-	BufferUsage *buffer_usage;
-	WalUsage   *wal_usage;
-	int			nindexes;
-	char	   *sharedquery;
-	LVRelState	vacrel;
-	ErrorContextCallback errcallback;
-
-	/*
-	 * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
-	 * don't support parallel vacuum for autovacuum as of now.
-	 */
-	Assert(MyProc->statusFlags == PROC_IN_VACUUM);
-
-	lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
-										   false);
-	elevel = lvshared->elevel;
-
-	elog(DEBUG1, "starting parallel vacuum worker");
-
-	/* Set debug_query_string for individual workers */
-	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
-	debug_query_string = sharedquery;
-	pgstat_report_activity(STATE_RUNNING, debug_query_string);
-
-	/*
-	 * Open table.  The lock mode is the same as the leader process.  It's
-	 * okay because the lock mode does not conflict among the parallel
-	 * workers.
-	 */
-	rel = table_open(lvshared->relid, ShareUpdateExclusiveLock);
-
-	/*
-	 * Open all indexes. indrels are sorted in order by OID, which should be
-	 * matched to the leader's one.
-	 */
-	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
-	Assert(nindexes > 0);
-
-	/* Set index statistics */
-	lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
-														PARALLEL_VACUUM_KEY_INDEX_STATS,
-														false);
-
-	/* Set dead_items space (set as worker's vacrel dead_items below) */
-	dead_items = (LVDeadItems *) shm_toc_lookup(toc,
-												PARALLEL_VACUUM_KEY_DEAD_ITEMS,
-												false);
-
-	/* Set cost-based vacuum delay */
-	VacuumCostActive = (VacuumCostDelay > 0);
-	VacuumCostBalance = 0;
-	VacuumPageHit = 0;
-	VacuumPageMiss = 0;
-	VacuumPageDirty = 0;
-	VacuumCostBalanceLocal = 0;
-	VacuumSharedCostBalance = &(lvshared->cost_balance);
-	VacuumActiveNWorkers = &(lvshared->active_nworkers);
-
-	vacrel.rel = rel;
-	vacrel.indrels = indrels;
-	vacrel.nindexes = nindexes;
-	/* Each parallel VACUUM worker gets its own access strategy */
-	vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM);
-	vacrel.indstats = (IndexBulkDeleteResult **)
-		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
-
-	if (lvshared->maintenance_work_mem_worker > 0)
-		maintenance_work_mem = lvshared->maintenance_work_mem_worker;
-
-	/*
-	 * Initialize vacrel for use as error callback arg by parallel worker.
-	 */
-	vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel));
-	vacrel.relname = pstrdup(RelationGetRelationName(rel));
-	vacrel.indname = NULL;
-	vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN;	/* Not yet processing */
-	vacrel.dead_items = dead_items;
-
-	/* Setup error traceback support for ereport() */
-	errcallback.callback = vacuum_error_callback;
-	errcallback.arg = &vacrel;
-	errcallback.previous = error_context_stack;
-	error_context_stack = &errcallback;
-
-	/* Prepare to track buffer usage during parallel execution */
-	InstrStartParallelQuery();
-
-	/* Process indexes to perform vacuum/cleanup */
-	parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
-
-	/* Report buffer/WAL usage during parallel execution */
-	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
-	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
-	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
-						  &wal_usage[ParallelWorkerNumber]);
-
-	/* Pop the error context stack */
-	error_context_stack = errcallback.previous;
-
-	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
-	table_close(rel, ShareUpdateExclusiveLock);
-	FreeAccessStrategy(vacrel.bstrategy);
-	pfree(vacrel.indstats);
-}
-
 /*
  * Error context callback for errors occurring during vacuum.
  */
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index bb1881f573..ae7c7133dd 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,7 +14,6 @@
 
 #include "postgres.h"
 
-#include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -25,6 +24,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index e8504f0ae4..48f7348f91 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -59,6 +59,7 @@ OBJS = \
 	typecmds.o \
 	user.o \
 	vacuum.o \
+	vacuumparallel.o \
 	variable.o \
 	view.o
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5c4bc15b44..2eb73bf1ce 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -32,6 +32,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "catalog/index.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
@@ -51,6 +52,7 @@
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/pg_rusage.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -89,6 +91,8 @@ static void vac_truncate_clog(TransactionId frozenXID,
 static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
+static bool vac_tid_reaped(ItemPointer itemptr, void *state);
+static int	vac_cmp_itemptr(const void *left, const void *right);
 
 /*
  * Primary entry point for manual VACUUM and ANALYZE commands
@@ -2258,3 +2262,147 @@ get_vacoptval_from_boolean(DefElem *def)
 {
 	return defGetBoolean(def) ? VACOPTVALUE_ENABLED : VACOPTVALUE_DISABLED;
 }
+
+/*
+ *	bulkdel_one_index() -- bulk-deletion for index relation.
+ *
+ *		Calls index AM's ambulkdelete routine.
+ *
+ * Returns bulk delete stats derived from input stats
+ */
+IndexBulkDeleteResult *
+bulkdel_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat,
+				  VacDeadItems *dead_items)
+{
+	PGRUsage	ru0;
+
+	pg_rusage_init(&ru0);
+
+	/* Do bulk deletion */
+	istat = index_bulk_delete(ivinfo, istat, vac_tid_reaped,
+							  (void *) dead_items);
+
+	ereport(ivinfo->message_level,
+			(errmsg("scanned index \"%s\" to remove %d row versions",
+					RelationGetRelationName(ivinfo->index),
+					dead_items->num_items),
+			 errdetail_internal("%s", pg_rusage_show(&ru0))));
+
+	return istat;
+}
+
+/*
+ *	cleanup_one_index() -- do post-vacuum cleanup for index relation.
+ *
+ *		Calls index AM's amvacuumcleanup routine.
+ *
+ * Returns bulk delete stats derived from input stats
+ */
+IndexBulkDeleteResult *
+cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat)
+{
+	PGRUsage	ru0;
+
+	pg_rusage_init(&ru0);
+
+	istat = index_vacuum_cleanup(ivinfo, istat);
+
+	if (istat)
+	{
+		ereport(ivinfo->message_level,
+				(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
+						RelationGetRelationName(ivinfo->index),
+						istat->num_index_tuples,
+						istat->num_pages),
+				 errdetail("%.0f index row versions were removed.\n"
+						   "%u index pages were newly deleted.\n"
+						   "%u index pages are currently deleted, of which %u are currently reusable.\n"
+						   "%s.",
+						   istat->tuples_removed,
+						   istat->pages_newly_deleted,
+						   istat->pages_deleted, istat->pages_free,
+						   pg_rusage_show(&ru0))));
+	}
+
+	return istat;
+}
+
+/*
+ * Returns the total required space for VACUUM's dead_items array given a
+ * max_items value.
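+ *
+ * (Example arithmetic: ItemPointerData is 6 bytes, so max_items = 1000
+ * requires offsetof(VacDeadItems, items) + 6000 bytes.)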
+ */
+Size
+vac_max_items_to_alloc_size(int max_items)
+{
+	Assert(max_items <= MAXDEADITEMS(MaxAllocSize));
+
+	return offsetof(VacDeadItems, items) + sizeof(ItemPointerData) * max_items;
+}
+
+/*
+ *	vac_tid_reaped() -- is a particular tid deletable?
+ *
+ *		This has the right signature to be an IndexBulkDeleteCallback.
+ *
+ *		Assumes dead_items array is sorted (in ascending TID order).
+ */
+static bool
+vac_tid_reaped(ItemPointer itemptr, void *state)
+{
+	VacDeadItems *dead_items = (VacDeadItems *) state;
+	int64		litem,
+				ritem,
+				item;
+	ItemPointer res;
+
+	litem = itemptr_encode(&dead_items->items[0]);
+	ritem = itemptr_encode(&dead_items->items[dead_items->num_items - 1]);
+	item = itemptr_encode(itemptr);
+
+	/*
+	 * Doing a simple bound check before bsearch() is useful to avoid the
+	 * extra cost of bsearch(), especially if dead items on the heap are
+	 * concentrated in a certain range.  Since this function is called for
+	 * every index tuple, it pays to be really fast.
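+	 *
+	 * For instance (made-up numbers): if all dead items fall in blocks 100
+	 * through 200, a TID pointing at block 500 fails this bound check and
+	 * we skip the bsearch() entirely.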
+	 */
+	if (item < litem || item > ritem)
+		return false;
+
+	res = (ItemPointer) bsearch((void *) itemptr,
+								(void *) dead_items->items,
+								dead_items->num_items,
+								sizeof(ItemPointerData),
+								vac_cmp_itemptr);
+
+	return (res != NULL);
+}
+
+/*
+ * Comparator routines for use with qsort() and bsearch().
+ */
+static int
+vac_cmp_itemptr(const void *left, const void *right)
+{
+	BlockNumber lblk,
+				rblk;
+	OffsetNumber loff,
+				roff;
+
+	lblk = ItemPointerGetBlockNumber((ItemPointer) left);
+	rblk = ItemPointerGetBlockNumber((ItemPointer) right);
+
+	if (lblk < rblk)
+		return -1;
+	if (lblk > rblk)
+		return 1;
+
+	loff = ItemPointerGetOffsetNumber((ItemPointer) left);
+	roff = ItemPointerGetOffsetNumber((ItemPointer) right);
+
+	if (loff < roff)
+		return -1;
+	if (loff > roff)
+		return 1;
+
+	return 0;
+}
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
new file mode 100644
index 0000000000..86f53bc003
--- /dev/null
+++ b/src/backend/commands/vacuumparallel.c
@@ -0,0 +1,1097 @@
+/*-------------------------------------------------------------------------
+ *
+ * vacuumparallel.c
+ *	  Support routines for parallel vacuum execution.
+ *
+ * This file contains routines that are intended to support setting up, using
+ * and tearing down a ParallelVacuumState.
+ *
+ * In a parallel vacuum, we perform both index bulk deletion and index cleanup
+ * with parallel worker processes.  Individual indexes are processed by one
+ * vacuum process.  ParallelVacuumState contains shared information as well
+ * as the memory space for storing dead items allocated in the DSM segment.
+ * When starting either parallel index bulk-deletion or index cleanup, we
+ * launch parallel worker processes.  Once all indexes are processed, the
+ * parallel worker processes exit.  Before the next pass, the parallel
+ * context is re-initialized so that the same DSM can be used for multiple
+ * passes of index bulk-deletion and index cleanup.  At the end of a parallel
+ * vacuum, ParallelVacuumState is destroyed while returning index statistics
+ * so that the caller can update them after exiting parallel mode.
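+ *
+ * In outline (a sketch; details are in the functions below): the leader
+ * creates a ParallelVacuumState via parallel_vacuum_init() and fills the
+ * dead items space returned by parallel_vacuum_get_dead_items().  Each
+ * bulk-deletion or cleanup pass launches workers, which enter at
+ * parallel_vacuum_main() and pick unprocessed indexes off a shared counter
+ * until none remain; indexes that are unsafe or too small for parallel
+ * processing are handled by the leader alone.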
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/vacuumparallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/amapi.h"
+#include "access/genam.h"
+#include "access/parallel.h"
+#include "access/table.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "optimizer/paths.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/rel.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+
+/*
+ * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
+ * we don't need to worry about DSM keys conflicting with plan_node_id we can
+ * use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED			1
+#define PARALLEL_VACUUM_KEY_DEAD_ITEMS		2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE		5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
+
+/*
+ * Shared information among parallel workers.  So this is allocated in the DSM
+ * segment.
+ */
+typedef struct PVShared
+{
+	/*
+	 * Target table relid and log level.  These fields are not modified during
+	 * the parallel vacuum.
+	 */
+	Oid			relid;
+	int			elevel;
+
+	/*
+	 * Fields for both index vacuum and cleanup.
+	 *
+	 * reltuples is the total number of input heap tuples.  We set either old
+	 * live tuples in the index vacuum case or the new live tuples in the
+	 * index cleanup case.
+	 *
+	 * estimated_count is true if reltuples is an estimated value.  (Note that
+	 * reltuples could be -1 in this case, indicating we have no idea.)
+	 */
+	double		reltuples;
+	bool		estimated_count;
+
+	/*
+	 * In single process vacuum we could consume more memory during index
+	 * vacuuming or cleanup apart from the memory for heap scanning.  In
+	 * parallel vacuum, since individual vacuum workers can consume memory
+	 * equal to maintenance_work_mem, the new maintenance_work_mem for each
+	 * worker is set such that the parallel operation doesn't consume more
+	 * memory than single process vacuum.
+	 */
+	int			maintenance_work_mem_worker;
+
+	/*
+	 * Shared vacuum cost balance.  During parallel vacuum,
+	 * VacuumSharedCostBalance points to this value and it accumulates the
+	 * balance of each parallel vacuum worker.
+	 */
+	pg_atomic_uint32 cost_balance;
+
+	/*
+	 * Number of active parallel workers.  This is used for computing the
+	 * minimum threshold of the vacuum cost balance before a worker sleeps for
+	 * cost-based delay.
+	 */
+	pg_atomic_uint32 active_nworkers;
+
+	/* Counter for vacuuming and cleanup */
+	pg_atomic_uint32 idx;
+} PVShared;
+
+/* Status used during parallel index vacuum or cleanup */
+typedef enum PVIndVacStatus
+{
+	PARALLEL_INDVAC_STATUS_INITIAL = 0,
+	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
+	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
+	PARALLEL_INDVAC_STATUS_COMPLETED
+} PVIndVacStatus;
+
+/*
+ * Struct for index vacuum statistics of an index that is processed by
+ * parallel vacuum.  This includes the status of the parallel index vacuum
+ * as well as the index statistics.
+ */
+typedef struct PVIndStats
+{
+	/*
+	 * The following two fields are set by leader process before executing
+	 * parallel index bulk-deletion or parallel index cleanup.  These fields
+	 * are not fixed for the entire VACUUM operation.  They are only fixed for
+	 * an individual parallel index bulk-deletion and cleanup.
+	 *
+	 * parallel_workers_can_process is true if both leader and worker can
+	 * process the index, otherwise only leader can process it.
+	 */
+	PVIndVacStatus status;
+	bool		parallel_workers_can_process;
+
+	/*
+	 * Individual worker or leader stores the result of index bulk-deletion or
+	 * cleanup.
+	 */
+	bool		istat_updated;	/* are the stats updated? */
+	IndexBulkDeleteResult istat;
+} PVIndStats;
+
+/* Struct for maintaining a parallel vacuum state. */
+typedef struct ParallelVacuumState
+{
+	/* NULL for worker processes */
+	ParallelContext *pcxt;
+
+	/* Target indexes */
+	Relation   *indrels;
+	int			nindexes;
+
+	/* Shared information among parallel vacuum workers */
+	PVShared   *shared;
+
+	/*
+	 * Shared index statistics among parallel vacuum workers. The array
+	 * element is allocated for every index, even those indexes where parallel
+	 * index vacuuming is unsafe or not worthwhile (e.g.,
+	 * will_parallel_vacuum[] is false).  During parallel vacuum,
+	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
+	 * local memory at the end of parallel vacuum.
+	 */
+	PVIndStats *indstats;
+
+	/* Shared dead items space among parallel vacuum workers */
+	VacDeadItems *dead_items;
+
+	/* Points to buffer usage area in DSM */
+	BufferUsage *buffer_usage;
+
+	/* Points to WAL usage area in DSM */
+	WalUsage   *wal_usage;
+
+	/*
+	 * False if the index is a totally unsuitable target for all parallel
+	 * processing.  For example, the index could be smaller than the
+	 * min_parallel_index_scan_size cutoff.
+	 */
+	bool	   *will_parallel_vacuum;
+
+	/*
+	 * The number of indexes that support parallel index bulk-deletion,
+	 * parallel index cleanup, and conditional parallel index cleanup,
+	 * respectively.
+	 */
+	int			nindexes_parallel_bulkdel;
+	int			nindexes_parallel_cleanup;
+	int			nindexes_parallel_condcleanup;
+
+	/*
+	 * True until parallel workers are launched for the first time.  After
+	 * that, the parallel DSM must be reinitialized before relaunching
+	 * workers.
+	 */
+	bool		first_time;
+
+	/* Buffer access strategy used by leader process */
+	BufferAccessStrategy bstrategy;
+
+	/* Error reporting state */
+	char	   *relnamespace;
+	char	   *relname;
+	char	   *indname;
+	PVIndVacStatus status;
+} ParallelVacuumState;
+
+static int	parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
+											bool *will_parallel_vacuum);
+static void parallel_vacuum_all_indexes(ParallelVacuumState *pvs, bool bulkdel,
+										bool have_done_bulkdel);
+static void parallel_vacuum_safe_indexes(ParallelVacuumState *pvs);
+static void parallel_vacuum_unsafe_indexes(ParallelVacuumState *pvs);
+static void parallel_vacuum_one_index(ParallelVacuumState *pvs, Relation indrel,
+									  PVIndStats *indstats);
+static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, bool bulkdel,
+												   bool have_done_bulkdel);
+static void parallel_vacuum_error_callback(void *arg);
+
+/*
+ * Try to enter parallel mode and create a parallel context.  Then initialize
+ * shared memory state.
+ *
+ * On success, return parallel vacuum state.  Otherwise return NULL.
+ */
+ParallelVacuumState *
+parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
+					 int nrequested_workers, int max_items,
+					 int elevel, BufferAccessStrategy bstrategy)
+{
+	ParallelVacuumState *pvs;
+	ParallelContext *pcxt;
+	PVShared   *shared;
+	VacDeadItems *dead_items;
+	PVIndStats *indstats;
+	BufferUsage *buffer_usage;
+	WalUsage   *wal_usage;
+	bool	   *will_parallel_vacuum;
+	Size		est_indstats_len;
+	Size		est_shared_len;
+	Size		est_dead_items_len;
+	int			nindexes_mwm = 0;
+	int			parallel_workers = 0;
+	int			querylen;
+
+	/*
+	 * A parallel vacuum must be requested and there must be indexes on the
+	 * relation
+	 */
+	Assert(nrequested_workers >= 0);
+	Assert(nindexes > 0);
+
+	/*
+	 * Compute the number of parallel vacuum workers to launch
+	 */
+	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
+	parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
+													   nrequested_workers,
+													   will_parallel_vacuum);
+	if (parallel_workers <= 0)
+	{
+		/* Can't perform vacuum in parallel -- return NULL */
+		pfree(will_parallel_vacuum);
+		return NULL;
+	}
+
+	pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
+	pvs->indrels = indrels;
+	pvs->nindexes = nindexes;
+	pvs->will_parallel_vacuum = will_parallel_vacuum;
+	pvs->first_time = true;
+	pvs->bstrategy = bstrategy;
+
+	/*
+	 * Set error traceback information.  The other fields will be filled
+	 * during index vacuuming or cleanup.
+	 */
+	pvs->relnamespace = get_namespace_name(RelationGetNamespace(rel));
+	pvs->relname = pstrdup(RelationGetRelationName(rel));
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+								 parallel_workers);
+	Assert(pcxt->nworkers > 0);
+	pvs->pcxt = pcxt;
+
+	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
+	est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+	est_shared_len = sizeof(PVShared);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
+	est_dead_items_len = vac_max_items_to_alloc_size(max_items);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Estimate space for BufferUsage and WalUsage --
+	 * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgBufferUsage or
+	 * pgWalUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+
+	InitializeParallelDSM(pcxt);
+
+	/* Prepare index vacuum stats */
+	indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
+	for (int idx = 0; idx < nindexes; idx++)
+	{
+		Relation	indrel = indrels[idx];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		/*
+		 * The cleanup option should be either disabled, always performed in
+		 * parallel, or conditionally performed in parallel.
+		 */
+		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+		if (!will_parallel_vacuum[idx])
+			continue;
+
+		if (indrel->rd_indam->amusemaintenanceworkmem)
+			nindexes_mwm++;
+
+		/*
+		 * Remember the number of indexes that support parallel operation for
+		 * each phase.
+		 */
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			pvs->nindexes_parallel_bulkdel++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+			pvs->nindexes_parallel_cleanup++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+			pvs->nindexes_parallel_condcleanup++;
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
+	pvs->indstats = indstats;
+
+	/* Prepare shared information */
+	shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
+	MemSet(shared, 0, est_shared_len);
+	shared->relid = RelationGetRelid(rel);
+	shared->elevel = elevel;
+	shared->maintenance_work_mem_worker =
+		(nindexes_mwm > 0) ?
+		maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+		maintenance_work_mem;
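+
+	/*
+	 * A worked example with hypothetical numbers: with maintenance_work_mem
+	 * of 256MB, parallel_workers = 2 and nindexes_mwm = 3, each process gets
+	 * 256MB / Min(2, 3) = 128MB, keeping the combined usage within the
+	 * single-process budget.
+	 */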
+
+	pg_atomic_init_u32(&(shared->cost_balance), 0);
+	pg_atomic_init_u32(&(shared->active_nworkers), 0);
+	pg_atomic_init_u32(&(shared->idx), 0);
+
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+	pvs->shared = shared;
+
+	/* Prepare the dead_items space */
+	dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
+												   est_dead_items_len);
+	dead_items->max_items = max_items;
+	dead_items->num_items = 0;
+	MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
+	pvs->dead_items = dead_items;
+
+	/*
+	 * Allocate space for each worker's BufferUsage and WalUsage; no need to
+	 * initialize
+	 */
+	buffer_usage = shm_toc_allocate(pcxt->toc,
+									mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
+	pvs->buffer_usage = buffer_usage;
+	wal_usage = shm_toc_allocate(pcxt->toc,
+								 mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+	pvs->wal_usage = wal_usage;
+
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		sharedquery[querylen] = '\0';
+		shm_toc_insert(pcxt->toc,
+					   PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	/* Success -- return parallel vacuum state */
+	return pvs;
+}
+
+/*
+ * Destroy the parallel context, and end parallel mode.
+ *
+ * Since writes are not allowed during parallel mode, copy the
+ * updated index statistics from DSM into local memory and then later use that
+ * to update the index statistics.  One might think that we can exit from
+ * parallel mode, update the index statistics and then destroy parallel
+ * context, but that won't be safe (see ExitParallelMode).
+ */
+void
+parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
+{
+	Assert(!IsParallelWorker());
+
+	/* Copy the updated statistics */
+	for (int i = 0; i < pvs->nindexes; i++)
+	{
+		PVIndStats *indstats = &(pvs->indstats[i]);
+
+		if (indstats->istat_updated)
+		{
+			istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+			memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
+		}
+		else
+			istats[i] = NULL;
+	}
+
+	DestroyParallelContext(pvs->pcxt);
+	ExitParallelMode();
+
+	pfree(pvs->will_parallel_vacuum);
+	pfree(pvs->relnamespace);
+	pfree(pvs->relname);
+	pfree(pvs);
+}
+
+/* Returns the dead items space */
+VacDeadItems *
+parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
+{
+	return pvs->dead_items;
+}
+
+/*
+ * Do parallel index bulk-deletion with parallel workers.
+ */
+void
+parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples)
+{
+	Assert(!IsParallelWorker());
+
+	/*
+	 * We can only provide an approximate value of num_heap_tuples, at least
+	 * for now.
+	 */
+	pvs->shared->reltuples = num_table_tuples;
+	pvs->shared->estimated_count = true;
+
+	/* have_done_bulkdel is not used in the bulk-deletion case */
+	parallel_vacuum_all_indexes(pvs, true, false);
+}
+
+/*
+ * Do parallel index cleanup with parallel workers.
+ *
+ * have_done_bulkdel is true if the caller has performed index bulk-deletion
+ * one or more times during this vacuum operation.
+ */
+void
+parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
+									bool estimated_count, bool have_done_bulkdel)
+{
+	Assert(!IsParallelWorker());
+
+	/*
+	 * We can provide a better estimate of total number of surviving tuples
+	 * (we assume indexes are more interested in that than in the number of
+	 * nominally live tuples).
+	 */
+	pvs->shared->reltuples = num_table_tuples;
+	pvs->shared->estimated_count = estimated_count;
+
+	parallel_vacuum_all_indexes(pvs, false, have_done_bulkdel);
+}
+
+/*
+ * Compute the number of parallel worker processes to request.  Both index
+ * bulk-deletion and index cleanup can be executed with parallel workers.
+ * An index is eligible for parallel vacuum iff its size is at least
+ * min_parallel_index_scan_size, as invoking workers for very small indexes
+ * can hurt performance.
+ *
+ * nrequested is the number of parallel workers that user requested.  If
+ * nrequested is 0, we compute the parallel degree based on nindexes, that is
+ * the number of indexes that support parallel vacuum.  This function also
+ * sets will_parallel_vacuum to remember indexes that participate in parallel
+ * vacuum.
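+ *
+ * As a worked example with hypothetical numbers: if 3 indexes support
+ * parallel bulk-deletion, 2 support parallel cleanup, and nrequested is 0,
+ * we take Max(3, 2) = 3, subtract the one index the leader processes itself,
+ * and request 2 workers (further capped by max_parallel_maintenance_workers).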
+ */
+static int
+parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
+								bool *will_parallel_vacuum)
+{
+	int			nindexes_parallel = 0;
+	int			nindexes_parallel_bulkdel = 0;
+	int			nindexes_parallel_cleanup = 0;
+	int			parallel_workers;
+
+	/*
+	 * We don't allow performing parallel operations in a standalone backend
+	 * or when parallelism is disabled.
+	 */
+	if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+		return 0;
+
+	/*
+	 * Compute the number of indexes that can participate in parallel vacuum.
+	 */
+	for (int i = 0; i < nindexes; i++)
+	{
+		Relation	indrel = indrels[i];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		/* Skip indexes that are not suitable targets for parallel index vacuum */
+		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+			continue;
+
+		will_parallel_vacuum[i] = true;
+
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			nindexes_parallel_bulkdel++;
+		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
+			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+			nindexes_parallel_cleanup++;
+	}
+
+	nindexes_parallel = Max(nindexes_parallel_bulkdel,
+							nindexes_parallel_cleanup);
+
+	/* The leader process takes one index */
+	nindexes_parallel--;
+
+	/* No index supports parallel vacuum */
+	if (nindexes_parallel <= 0)
+		return 0;
+
+	/* Compute the parallel degree */
+	parallel_workers = (nrequested > 0) ?
+		Min(nrequested, nindexes_parallel) : nindexes_parallel;
+
+	/* Cap by max_parallel_maintenance_workers */
+	parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+	return parallel_workers;
+}
+
+/*
+ * Perform index bulk-deletion or index cleanup with parallel workers.  This
+ * function must be used by the parallel vacuum leader process.
+ */
+static void
+parallel_vacuum_all_indexes(ParallelVacuumState *pvs, bool bulkdel,
+							bool have_done_bulkdel)
+{
+	int			nworkers;
+	ErrorContextCallback errcallback;
+	PVIndVacStatus new_status;
+
+	Assert(!IsParallelWorker());
+
+	if (bulkdel)
+	{
+		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
+
+		/* Determine the number of parallel workers to launch */
+		nworkers = pvs->nindexes_parallel_bulkdel;
+	}
+	else
+	{
+		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
+
+		/* Determine the number of parallel workers to launch */
+		nworkers = pvs->nindexes_parallel_cleanup;
+
+		/*
+		 * Add conditionally parallel-aware indexes if no bulk-deletion has
+		 * been done yet.
+		 */
+		if (!have_done_bulkdel)
+			nworkers += pvs->nindexes_parallel_condcleanup;
+	}
+
+	/* The leader process will participate */
+	nworkers--;
+
+	/*
+	 * It is possible that the parallel context was initialized with fewer
+	 * workers than the number of indexes that need a separate worker in the
+	 * current phase, so we must clamp the value here.  See
+	 * parallel_vacuum_compute_workers().
+	 */
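+	/*
+	 * For instance (hypothetical numbers): if four indexes support parallel
+	 * cleanup but max_parallel_maintenance_workers limited the context to
+	 * two workers, nworkers is capped at two here.
+	 */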
+	nworkers = Min(nworkers, pvs->pcxt->nworkers);
+
+	/*
+	 * Set index vacuum status and mark whether parallel vacuum worker can
+	 * process it.
+	 */
+	for (int i = 0; i < pvs->nindexes; i++)
+	{
+		PVIndStats *indstats = &(pvs->indstats[i]);
+
+		Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
+		indstats->status = new_status;
+		indstats->parallel_workers_can_process =
+			(pvs->will_parallel_vacuum[i] &&
+			 parallel_vacuum_index_is_parallel_safe(pvs->indrels[i], bulkdel,
+													have_done_bulkdel));
+	}
+
+	/* Reset the parallel index processing counter */
+	pg_atomic_write_u32(&(pvs->shared->idx), 0);
+
+	/* Setup the shared cost-based vacuum delay and launch workers */
+	if (nworkers > 0)
+	{
+		/* Reinitialize parallel context to relaunch parallel workers */
+		if (!pvs->first_time)
+			ReinitializeParallelDSM(pvs->pcxt);
+		else
+			pvs->first_time = false;
+
+		/*
+		 * Set up shared cost balance and the number of active workers for
+		 * vacuum delay.  We need to do this before launching workers as
+		 * otherwise, they might not see the updated values for these
+		 * parameters.
+		 */
+		pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+		pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+		/*
+		 * The number of workers can vary between the bulkdelete and cleanup
+		 * phases.
+		 */
+		ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+
+		LaunchParallelWorkers(pvs->pcxt);
+
+		if (pvs->pcxt->nworkers_launched > 0)
+		{
+			/*
+			 * Reset the local cost values for the leader backend, as we have
+			 * already accumulated the remaining balance of the heap.
+			 */
+			VacuumCostBalance = 0;
+			VacuumCostBalanceLocal = 0;
+
+			/* Enable shared cost balance for leader backend */
+			VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+			VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+		}
+
+		if (bulkdel)
+			ereport(pvs->shared->elevel,
+					(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+									 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+									 pvs->pcxt->nworkers_launched),
+							pvs->pcxt->nworkers_launched, nworkers)));
+		else
+			ereport(pvs->shared->elevel,
+					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+									 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+									 pvs->pcxt->nworkers_launched),
+							pvs->pcxt->nworkers_launched, nworkers)));
+	}
+
+	/* Setup error traceback support for ereport() */
+	errcallback.callback = parallel_vacuum_error_callback;
+	errcallback.arg = pvs;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* Vacuum the indexes that can be processed by only leader process */
+	parallel_vacuum_unsafe_indexes(pvs);
+
+	/*
+	 * Join as a parallel worker.  The leader process alone processes all
+	 * parallel-safe indexes in the case where no workers are launched.
+	 */
+	parallel_vacuum_safe_indexes(pvs);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	/*
+	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
+	 * to finish, or we might get incomplete data.)
+	 */
+	if (nworkers > 0)
+	{
+		/* Wait for all vacuum workers to finish */
+		WaitForParallelWorkersToFinish(pvs->pcxt);
+
+		for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+			InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+	}
+
+	/*
+	 * Reset all index status back to initial (while checking that we have
+	 * vacuumed all indexes).
+	 */
+	for (int i = 0; i < pvs->nindexes; i++)
+	{
+		PVIndStats *indstats = &(pvs->indstats[i]);
+
+		if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+				 RelationGetRelationName(pvs->indrels[i]));
+
+		indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+	}
+
+	/*
+	 * Carry the shared balance value to heap scan and disable shared costing
+	 */
+	if (VacuumSharedCostBalance)
+	{
+		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+		VacuumSharedCostBalance = NULL;
+		VacuumActiveNWorkers = NULL;
+	}
+}
+
+/*
+ * Index bulk-deletion/cleanup routine used by the leader process and parallel
+ * vacuum worker processes to vacuum the indexes in parallel.
+ */
+static void
+parallel_vacuum_safe_indexes(ParallelVacuumState *pvs)
+{
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	/* Loop until all indexes are vacuumed */
+	for (;;)
+	{
+		int			idx;
+		PVIndStats *indstats;
+
+		/* Get an index number to process */
+		idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
+
+		/* Done for all indexes? */
+		if (idx >= pvs->nindexes)
+			break;
+
+		indstats = &(pvs->indstats[idx]);
+
+		/*
+		 * Skip vacuuming indexes that are unsafe for workers or that are
+		 * unsuitable targets for parallel index vacuum (these are vacuumed
+		 * in parallel_vacuum_unsafe_indexes() by the leader).
+		 */
+		if (!indstats->parallel_workers_can_process)
+			continue;
+
+		/* Do vacuum or cleanup of the index */
+		parallel_vacuum_one_index(pvs, pvs->indrels[idx], indstats);
+	}
+
+	/*
+	 * We have completed the index vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Perform processing of indexes that must be done by the leader process.
+ *
+ * Handles index bulk-deletion (or index cleanup) for indexes that are not
+ * parallel safe.  It's possible that this will vary for a given index, based
+ * on details like whether we're performing index cleanup right now.
+ *
+ * Also performs vacuuming of smaller indexes that fell under the size cutoff
+ * enforced by parallel_vacuum_compute_workers().
+ */
+static void
+parallel_vacuum_unsafe_indexes(ParallelVacuumState *pvs)
+{
+	Assert(!IsParallelWorker());
+
+	/*
+	 * Increment the active worker count if we are able to launch any worker.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+	for (int i = 0; i < pvs->nindexes; i++)
+	{
+		PVIndStats *indstats = &(pvs->indstats[i]);
+
+		/* Skip indexes that are safe for workers */
+		if (indstats->parallel_workers_can_process)
+			continue;
+
+		/* Do vacuum or cleanup of the index */
+		parallel_vacuum_one_index(pvs, pvs->indrels[i], indstats);
+	}
+
+	/*
+	 * We have completed the index vacuum so decrement the active worker
+	 * count.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Bulk-delete or clean up one index, either in the leader process or in one
+ * of the worker processes.  After vacuuming the index, this function copies
+ * the index statistics returned from ambulkdelete and amvacuumcleanup to the
+ * DSM segment.
+ */
+static void
+parallel_vacuum_one_index(ParallelVacuumState *pvs, Relation indrel, PVIndStats *indstats)
+{
+	IndexBulkDeleteResult *istat = NULL;
+	IndexBulkDeleteResult *istat_res;
+	IndexVacuumInfo ivinfo;
+
+	/*
+	 * Update the pointer to the corresponding bulk-deletion result if someone
+	 * has already updated it
+	 */
+	if (indstats->istat_updated)
+		istat = &(indstats->istat);
+
+	ivinfo.index = indrel;
+	ivinfo.analyze_only = false;
+	ivinfo.report_progress = false;
+	ivinfo.message_level = pvs->shared->elevel;
+	ivinfo.estimated_count = pvs->shared->estimated_count;
+	ivinfo.num_heap_tuples = pvs->shared->reltuples;
+	ivinfo.strategy = pvs->bstrategy;
+
+	/* Update error traceback information */
+	pvs->indname = pstrdup(RelationGetRelationName(indrel));
+	pvs->status = indstats->status;
+
+	switch (indstats->status)
+	{
+		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+			istat_res = bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+			break;
+		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+			istat_res = cleanup_one_index(&ivinfo, istat);
+			break;
+		default:
+			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
+				 indstats->status,
+				 RelationGetRelationName(indrel));
+	}
+
+	/*
+	 * Copy the index bulk-deletion result returned from ambulkdelete and
+	 * amvacuumcleanup to the DSM segment if it's the first cycle because they
+	 * allocate locally and it's possible that an index will be vacuumed by a
+	 * different vacuum process the next cycle.  Copying the result normally
+	 * happens only the first time an index is vacuumed.  For any additional
+	 * vacuum pass, we directly point to the result on the DSM segment and
+	 * pass it to vacuum index APIs so that workers can update it directly.
+	 *
+	 * Since all vacuum workers write the bulk-deletion result at different
+	 * slots we can write them without locking.
+	 */
+	if (!indstats->istat_updated && istat_res != NULL)
+	{
+		memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+		indstats->istat_updated = true;
+
+		/* Free the locally-allocated bulk-deletion result */
+		pfree(istat_res);
+	}
+
+	/*
+	 * Update the status to completed. No need to lock here since each worker
+	 * touches different indexes.
+	 */
+	indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
+
+	/* Reset error traceback information */
+	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
+	pfree(pvs->indname);
+	pvs->indname = NULL;
+}
+
+/*
+ * Returns false if the given index can't participate in the next execution of
+ * parallel index vacuum or parallel index cleanup.
+ */
+static bool
+parallel_vacuum_index_is_parallel_safe(Relation indrel, bool bulkdel,
+									   bool have_done_bulkdel)
+{
+	uint8		vacoptions;
+
+	vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+	/* In the bulk-deletion case, check if it supports parallel bulk-deletion */
+	if (bulkdel)
+		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
+
+	/* Not safe if the index does not support parallel cleanup */
+	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+		return false;
+
+	/*
+	 * Not safe if the index supports parallel cleanup conditionally, but we
+	 * have already processed the index (for bulkdelete).  We do this to avoid
+	 * the need to invoke workers when parallel index cleanup doesn't need to
+	 * scan the index.  See the comments for option
+	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
+	 * parallel cleanup conditionally.
+	 */
+	if (have_done_bulkdel &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+		return false;
+
+	return true;
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Since parallel vacuum workers perform only index bulk-deletion or index cleanup,
+ * we don't need to report progress information.
+ */
+void
+parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+	ParallelVacuumState pvs;
+	Relation	rel;
+	Relation   *indrels;
+	PVIndStats *indstats;
+	PVShared   *shared;
+	VacDeadItems *dead_items;
+	BufferUsage *buffer_usage;
+	WalUsage   *wal_usage;
+	int			nindexes;
+	char	   *sharedquery;
+	ErrorContextCallback errcallback;
+
+	/*
+	 * A parallel vacuum worker must have only the PROC_IN_VACUUM flag, since
+	 * we don't support parallel vacuum for autovacuum as of now.
+	 */
+	Assert(MyProc->statusFlags == PROC_IN_VACUUM);
+
+	elog(DEBUG1, "starting parallel vacuum worker");
+
+	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
+
+	/* Set debug_query_string for individual workers */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/*
+	 * Open table.  The lock mode is the same as that of the leader process.
+	 * It's okay because the lock mode does not conflict among the parallel
+	 * workers.
+	 */
+	rel = table_open(shared->relid, ShareUpdateExclusiveLock);
+
+	/*
+	 * Open all indexes.  indrels are sorted in order by OID, which should
+	 * match the order in the leader process.
+	 */
+	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
+	Assert(nindexes > 0);
+
+	if (shared->maintenance_work_mem_worker > 0)
+		maintenance_work_mem = shared->maintenance_work_mem_worker;
+
+	/* Set index statistics */
+	indstats = (PVIndStats *) shm_toc_lookup(toc,
+											 PARALLEL_VACUUM_KEY_INDEX_STATS,
+											 false);
+
+	/* Set dead_items space */
+	dead_items = (VacDeadItems *) shm_toc_lookup(toc,
+												 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
+												 false);
+
+	/* Set cost-based vacuum delay */
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+	VacuumCostBalanceLocal = 0;
+	VacuumSharedCostBalance = &(shared->cost_balance);
+	VacuumActiveNWorkers = &(shared->active_nworkers);
+
+	/* Set parallel vacuum state */
+	pvs.indrels = indrels;
+	pvs.nindexes = nindexes;
+	pvs.indstats = indstats;
+	pvs.shared = shared;
+	pvs.dead_items = dead_items;
+	pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
+	pvs.relname = pstrdup(RelationGetRelationName(rel));
+
+	/* These fields will be filled during index vacuum or cleanup */
+	pvs.indname = NULL;
+	pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
+
+	/* Each parallel VACUUM worker gets its own access strategy */
+	pvs.bstrategy = GetAccessStrategy(BAS_VACUUM);
+
+	/* Setup error traceback support for ereport() */
+	errcallback.callback = parallel_vacuum_error_callback;
+	errcallback.arg = &pvs;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/* Process indexes to perform vacuum/cleanup */
+	parallel_vacuum_safe_indexes(&pvs);
+
+	/* Report buffer/WAL usage during parallel execution */
+	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
+	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+						  &wal_usage[ParallelWorkerNumber]);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+	table_close(rel, ShareUpdateExclusiveLock);
+	FreeAccessStrategy(pvs.bstrategy);
+}
+
+/*
+ * Error context callback for errors occurring during parallel index vacuum.
+ */
+static void
+parallel_vacuum_error_callback(void *arg)
+{
+	ParallelVacuumState *errinfo = arg;
+
+	switch (errinfo->status)
+	{
+		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+			errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
+					   errinfo->indname,
+					   errinfo->relnamespace,
+					   errinfo->relname);
+			break;
+		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+			errcontext("while cleanup index \"%s\" of relation \"%s.%s\"",
+					   errinfo->indname,
+					   errinfo->relnamespace,
+					   errinfo->relname);
+			break;
+		case PARALLEL_INDVAC_STATUS_INITIAL:
+		case PARALLEL_INDVAC_STATUS_COMPLETED:
+		default:
+			break;
+	}
+}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 417dd288e5..f3fb1e93a5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -198,7 +198,6 @@ extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation rel,
 							struct VacuumParams *params, BufferAccessStrategy bstrategy);
-extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 4cfd52eaf4..7ed58af9d8 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -15,6 +15,8 @@
 #define VACUUM_H
 
 #include "access/htup.h"
+#include "access/genam.h"
+#include "access/parallel.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
@@ -62,6 +64,9 @@
 /* value for checking vacuum flags */
 #define VACUUM_OPTION_MAX_VALID_VALUE		((1 << 3) - 1)
 
+/* Abstract type for parallel vacuum state */
+typedef struct ParallelVacuumState ParallelVacuumState;
+
 /*----------
  * ANALYZE builds one of these structs for each attribute (column) that is
  * to be analyzed.  The struct and subsidiary data are in anl_context,
@@ -230,6 +235,21 @@ typedef struct VacuumParams
 	int			nworkers;
 } VacuumParams;
 
+/*
+ * VacDeadItems stores TIDs whose index tuples are deleted by index vacuuming.
+ */
+typedef struct VacDeadItems
+{
+	int			max_items;		/* # slots allocated in array */
+	int			num_items;		/* current # of entries */
+
+	/* Sorted array of TIDs to delete from indexes */
+	ItemPointerData items[FLEXIBLE_ARRAY_MEMBER];
+} VacDeadItems;
+
+#define MAXDEADITEMS(avail_mem) \
+	(((avail_mem) - offsetof(VacDeadItems, items)) / sizeof(ItemPointerData))
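+
+/*
+ * As a worked example (sizes as on typical platforms: 6-byte ItemPointerData
+ * and an 8-byte header before the flexible array), MAXDEADITEMS(1MB) works
+ * out to (1048576 - 8) / 6, i.e. roughly 174,000 TIDs per MB of memory.
+ */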
+
 /* GUC parameters */
 extern PGDLLIMPORT int default_statistics_target;	/* PGDLLIMPORT for PostGIS */
 extern int	vacuum_freeze_min_age;
@@ -282,6 +302,27 @@ extern bool vacuum_is_relation_owner(Oid relid, Form_pg_class reltuple,
 extern Relation vacuum_open_relation(Oid relid, RangeVar *relation,
 									 bits32 options, bool verbose,
 									 LOCKMODE lmode);
+extern IndexBulkDeleteResult *bulkdel_one_index(IndexVacuumInfo *ivinfo,
+												IndexBulkDeleteResult *istat,
+												VacDeadItems *dead_items);
+extern IndexBulkDeleteResult *cleanup_one_index(IndexVacuumInfo *ivinfo,
+												IndexBulkDeleteResult *istat);
+extern Size vac_max_items_to_alloc_size(int max_items);
+
+/* in commands/vacuumparallel.c */
+extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
+												 int nindexes, int nrequested_workers,
+												 int max_items, int elevel,
+												 BufferAccessStrategy bstrategy);
+extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
+extern VacDeadItems *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs);
+extern void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs,
+												long num_table_tuples);
+extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
+												long num_table_tuples,
+												bool estimated_count,
+												bool have_done_bulkdel);
+extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, RangeVar *relation,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0c61ccbdd0..469c7c2dd7 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1810,6 +1810,7 @@ ParallelSlotResultHandler
 ParallelState
 ParallelTableScanDesc
 ParallelTableScanDescData
+ParallelVacuumState
 ParallelWorkerContext
 ParallelWorkerInfo
 Param
@@ -2800,6 +2801,7 @@ UserMapping
 UserOpts
 VacAttrStats
 VacAttrStatsP
+VacDeadItems
 VacErrPhase
 VacOptValue
 VacuumParams
-- 
2.24.3 (Apple Git-128)

