On Fri, May 2, 2025 at 11:58 PM Sami Imseih <samims...@gmail.com> wrote:
>
> I am generally -1 on the idea of autovacuum performing parallel
> index vacuum, because I always felt that the parallel option should
> be employed in a targeted manner for a specific table. if you have a bunch
> of large tables, some more important than others, a/c may end
> up using parallel resources on the least important tables and you
> will have to adjust a/v settings per table, etc to get the right table
> to be parallel index vacuumed by a/v.
Hm, this is a good point. Let me clarify one thing: in practice it is quite
common for users to have one huge table across all databases (with 80+
indexes created on it). Of course, in general there may be a few such
tables, but we can still adjust the autovac_idx_parallel_min_rows parameter.
If a table has a lot of dead tuples, it is actively used, so presumably it
is important (?). Also, if the user really can determine the "importance"
of each table, we can provide an appropriate table option. Tables with this
option set would be processed in parallel in priority order. What do you
think about such an idea?

>
> Also, with the TIDStore improvements for index cleanup, and the practical
> elimination of multi-pass index vacuums, I see this being even less
> convincing as something to add to a/v.

If I understood correctly, you are saying that TIDStore can now hold so many
tuples that in practice a second pass is never needed. But the number of
passes does not affect the proposed optimization in any way - the point is
the large number of indexes that must be processed. Even within a single
pass we can get a 40% increase in speed.

>
> Now, If I am going to allocate extra workers to run vacuum in parallel, why
> not just provide more autovacuum workers instead so I can get more tables
> vacuumed within a span of time?

For now, only one process can clean up the indexes of a single table, so I
don't see how increasing the number of a/v workers would help in the
situation I mentioned above. Also, this patch does not consume additional
resources during autovacuum - the total number of a/v workers is always <=
autovacuum_max_workers.

BTW, see the v2 patch attached to this message (bug fixes) :-)

--
Best regards,
Daniil Davydov
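
P.S. To make the tuning discussion above a bit more concrete, with the v2
patch applied a table could be targeted for parallel index vacuuming by a/v
with settings along these lines. The parameter names and defaults are taken
from the patch; the concrete values below are only illustrative, and all
three GUCs are PGC_POSTMASTER, so changing them requires a restart:

    # postgresql.conf (sketch; example values, not recommendations)
    max_parallel_index_autovac_workers = 4    # 0 (the default) keeps the feature disabled
    autovac_idx_parallel_min_rows = 1000000   # min dead tuples in a table before parallel index vacuum is considered
    autovac_idx_parallel_min_indexes = 30     # min number of indexes on the table (default/minimum is 2)

The per-table "importance" option mentioned above is only an idea so far and
is not part of the attached patch.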
From 1c93a729b844a1dfe109e8d9e54d5cc0a941d061 Mon Sep 17 00:00:00 2001 From: Daniil Davidov <d.davy...@postgrespro.ru> Date: Sat, 3 May 2025 00:27:45 +0700 Subject: [PATCH v2] WIP Allow autovacuum to process indexes of single table in parallel --- src/backend/commands/vacuum.c | 27 + src/backend/commands/vacuumparallel.c | 289 +++++- src/backend/postmaster/autovacuum.c | 906 +++++++++++++++++- src/backend/utils/misc/guc_tables.c | 30 + src/backend/utils/misc/postgresql.conf.sample | 6 + src/include/postmaster/autovacuum.h | 23 + src/test/modules/autovacuum/.gitignore | 1 + src/test/modules/autovacuum/Makefile | 14 + .../autovacuum/t/001_autovac_parallel.pl | 137 +++ 9 files changed, 1387 insertions(+), 46 deletions(-) create mode 100644 src/test/modules/autovacuum/.gitignore create mode 100644 src/test/modules/autovacuum/Makefile create mode 100644 src/test/modules/autovacuum/t/001_autovac_parallel.pl diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 33a33bf6b1c..a5ef5319ccc 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -2234,6 +2234,33 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, else toast_relid = InvalidOid; + /* + * Decide whether we need to process table with given oid in parallel mode + * during autovacuum. + */ + if (AmAutoVacuumWorkerProcess() && + params->index_cleanup != VACOPTVALUE_DISABLED) + { + PgStat_StatTabEntry *tabentry; + + /* fetch the pgstat table entry */ + tabentry = pgstat_fetch_stat_tabentry_ext(rel->rd_rel->relisshared, + rel->rd_id); + if (tabentry && tabentry->dead_tuples >= autovac_idx_parallel_min_rows) + { + List *indexes = RelationGetIndexList(rel); + int num_indexes = list_length(indexes); + + list_free(indexes); + + if (num_indexes >= autovac_idx_parallel_min_indexes && + max_parallel_index_autovac_workers > 0) + { + params->nworkers = max_parallel_index_autovac_workers; + } + } + } + /* * Switch to the table owner's userid, so that any index functions are run * as that user. Also lock down security-restricted operations and diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 2b9d548cdeb..cb4b7c23010 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -1,20 +1,23 @@ /*------------------------------------------------------------------------- * * vacuumparallel.c - * Support routines for parallel vacuum execution. + * Support routines for parallel [auto]vacuum execution. * * This file contains routines that are intended to support setting up, using, * and tearing down a ParallelVacuumState. * - * In a parallel vacuum, we perform both index bulk deletion and index cleanup - * with parallel worker processes. Individual indexes are processed by one - * vacuum process. ParallelVacuumState contains shared information as well as - * the memory space for storing dead items allocated in the DSA area. We + * In a parallel [auto]vacuum, we perform both index bulk deletion and index + * cleanup with parallel worker processes. Individual indexes are processed by + * one vacuum process. ParallelVacuumState contains shared information as well + * as the memory space for storing dead items allocated in the DSA area. We * launch parallel worker processes at the start of parallel index * bulk-deletion and index cleanup and once all indexes are processed, the * parallel worker processes exit. 
Each time we process indexes in parallel, * the parallel context is re-initialized so that the same DSM can be used for - * multiple passes of index bulk-deletion and index cleanup. + * multiple passes of index bulk-deletion and index cleanup. For maintenance + * vacuum, we launch workers manually (using dynamic bgworkers machinery), and + * for autovacuum we send signals to the autovacuum launcher (all logic for + * communication among parallel autovacuum processes is in autovacuum.c). * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -34,9 +37,11 @@ #include "executor/instrument.h" #include "optimizer/paths.h" #include "pgstat.h" +#include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" /* @@ -157,11 +162,20 @@ typedef struct PVIndStats } PVIndStats; /* - * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h. + * Struct for maintaining a parallel [auto]vacuum state. typedef appears in + * vacuum.h. */ struct ParallelVacuumState { - /* NULL for worker processes */ + /* Is this structure used for maintenance vacuum or autovacuum */ + bool is_autovacuum; + + /* + * NULL for worker processes. + * + * NOTE: Parallel autovacuum only needs a subset of the maintenance vacuum + * functionality. + */ ParallelContext *pcxt; /* Parent Heap Relation */ @@ -221,6 +235,10 @@ struct ParallelVacuumState PVIndVacStatus status; }; +static ParallelContext *CreateParallelAutoVacContext(int nworkers); +static void InitializeParallelAutoVacDSM(ParallelContext *pcxt); +static void DestroyParallelAutoVacContext(ParallelContext *pcxt); + static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, bool *will_parallel_vacuum); static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, @@ -280,15 +298,21 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, } pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState)); + pvs->is_autovacuum = AmAutoVacuumWorkerProcess(); pvs->indrels = indrels; pvs->nindexes = nindexes; pvs->will_parallel_vacuum = will_parallel_vacuum; pvs->bstrategy = bstrategy; pvs->heaprel = rel; - EnterParallelMode(); - pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", - parallel_workers); + if (pvs->is_autovacuum) + pcxt = CreateParallelAutoVacContext(parallel_workers); + else + { + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", + parallel_workers); + } Assert(pcxt->nworkers > 0); pvs->pcxt = pcxt; @@ -327,7 +351,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, else querylen = 0; /* keep compiler quiet */ - InitializeParallelDSM(pcxt); + if (pvs->is_autovacuum) + InitializeParallelAutoVacDSM(pvs->pcxt); + else + InitializeParallelDSM(pcxt); /* Prepare index vacuum stats */ indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len); @@ -371,11 +398,18 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shared->relid = RelationGetRelid(rel); shared->elevel = elevel; shared->queryid = pgstat_get_my_query_id(); - shared->maintenance_work_mem_worker = - (nindexes_mwm > 0) ? 
- maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : - maintenance_work_mem; - shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024; + + if (pvs->is_autovacuum) + shared->maintenance_work_mem_worker = + (nindexes_mwm > 0) ? + autovacuum_work_mem / Min(parallel_workers, nindexes_mwm) : + autovacuum_work_mem; + else + shared->maintenance_work_mem_worker = + (nindexes_mwm > 0) ? + maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : + maintenance_work_mem; + shared->dead_items_info.max_bytes = vac_work_mem * 1024L; /* Prepare DSA space for dead items */ dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes, @@ -453,8 +487,13 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) TidStoreDestroy(pvs->dead_items); - DestroyParallelContext(pvs->pcxt); - ExitParallelMode(); + if (pvs->is_autovacuum) + DestroyParallelAutoVacContext(pvs->pcxt); + else + { + DestroyParallelContext((ParallelContext *) pvs->pcxt); + ExitParallelMode(); + } pfree(pvs->will_parallel_vacuum); pfree(pvs); @@ -532,6 +571,144 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup parallel_vacuum_process_all_indexes(pvs, num_index_scans, false); } +/* + * Short version of CreateParallelContext (parallel.c). Here we init only those + * fields that are needed for parallel index processing during autovacuum. + */ +static ParallelContext * +CreateParallelAutoVacContext(int nworkers) +{ + ParallelContext *pcxt; + MemoryContext oldcontext; + + Assert(AmAutoVacuumWorkerProcess()); + + /* Number of workers should be non-negative. */ + Assert(nworkers >= 0); + + /* We might be running in a short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize a new ParallelContext. */ + pcxt = palloc0(sizeof(ParallelContext)); + pcxt->nworkers = nworkers; + pcxt->nworkers_to_launch = nworkers; + shm_toc_initialize_estimator(&pcxt->estimator); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Short version of InitializeParallelDSM (parallel.c). Here we put into dsm + * only those data that are needed for parallel index processing during + * autovacuum. + */ +static void +InitializeParallelAutoVacDSM(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + Size tsnaplen = 0; + Size asnaplen = 0; + Size segsize = 0; + char *tsnapspace; + char *asnapspace; + Snapshot transaction_snapshot = GetTransactionSnapshot(); + Snapshot active_snapshot = GetActiveSnapshot(); + + Assert(pcxt->nworkers >= 1); + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + if (IsolationUsesXactSnapshot()) + { + tsnaplen = EstimateSnapshotSpace(transaction_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + asnaplen = EstimateSnapshotSpace(active_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + + /* Create DSM and initialize with new table of contents. */ + segsize = shm_toc_estimate(&pcxt->estimator); + pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + + if (pcxt->seg == NULL) + { + pcxt->nworkers = 0; + pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize); + } + + pcxt->toc = shm_toc_create(AV_PARALLEL_MAGIC, + pcxt->seg == NULL ? 
pcxt->private_memory : + dsm_segment_address(pcxt->seg), + segsize); + + /* We can skip the rest of this if we're not budgeting for any workers. */ + if (pcxt->nworkers > 0) + { + /* + * Serialize the transaction snapshot if the transaction isolation + * level uses a transaction snapshot. + */ + if (IsolationUsesXactSnapshot()) + { + tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen); + SerializeSnapshot(transaction_snapshot, tsnapspace); + shm_toc_insert(pcxt->toc, AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT, + tsnapspace); + } + + /* Serialize the active snapshot. */ + asnapspace = shm_toc_allocate(pcxt->toc, asnaplen); + SerializeSnapshot(active_snapshot, asnapspace); + shm_toc_insert(pcxt->toc, AV_PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + } + + /* Update nworkers_to_launch, in case we changed nworkers above. */ + pcxt->nworkers_to_launch = pcxt->nworkers; + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Short version of DestroyParallelContext (parallel.c). Here we clean up only + * those data that were used during parallel index processing during autovacuum. + */ +static void +DestroyParallelAutoVacContext(ParallelContext *pcxt) +{ + /* + * If we have allocated a shared memory segment, detach it. This will + * implicitly detach the error queues, and any other shared memory queues, + * stored there. + */ + if (pcxt->seg != NULL) + { + dsm_detach(pcxt->seg); + pcxt->seg = NULL; + } + + /* + * If this parallel context is actually in backend-private memory rather + * than shared memory, free that memory instead. + */ + if (pcxt->private_memory != NULL) + { + pfree(pcxt->private_memory); + pcxt->private_memory = NULL; + } + + AutoVacuumReleaseParallelWork(false); + pfree(pcxt); +} + /* * Compute the number of parallel worker processes to request. Both index * vacuum and index cleanup can be executed with parallel workers. @@ -558,7 +735,9 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, * We don't allow performing parallel operation in standalone backend or * when parallelism is disabled. */ - if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) + if (!IsUnderPostmaster || + (max_parallel_maintenance_workers == 0 && !AmAutoVacuumWorkerProcess()) || + (max_parallel_index_autovac_workers == 0 && AmAutoVacuumWorkerProcess())) return 0; /* @@ -597,15 +776,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, parallel_workers = (nrequested > 0) ? Min(nrequested, nindexes_parallel) : nindexes_parallel; - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + /* Cap by GUC variable */ + parallel_workers = AmAutoVacuumWorkerProcess() ? + Min(parallel_workers, max_parallel_index_autovac_workers) : + Min(parallel_workers, max_parallel_maintenance_workers); return parallel_workers; } /* * Perform index vacuum or index cleanup with parallel workers. This function - * must be used by the parallel vacuum leader process. + * must be used by the parallel [auto]vacuum leader process. 
*/ static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, @@ -670,7 +851,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan if (nworkers > 0) { /* Reinitialize parallel context to relaunch parallel workers */ - if (num_index_scans > 0) + if (num_index_scans > 0 && !pvs->is_autovacuum) ReinitializeParallelDSM(pvs->pcxt); /* @@ -686,9 +867,22 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan * The number of workers can vary between bulkdelete and cleanup * phase. */ - ReinitializeParallelWorkers(pvs->pcxt, nworkers); - - LaunchParallelWorkers(pvs->pcxt); + if (pvs->is_autovacuum) + { + pvs->pcxt->nworkers_to_launch = Min(pvs->pcxt->nworkers, nworkers); + if (pvs->pcxt->nworkers > 0 && pvs->pcxt->nworkers_to_launch > 0) + { + pvs->pcxt->nworkers_launched = + LaunchParallelAutovacuumWorkers(pvs->heaprel->rd_id, + pvs->pcxt->nworkers_to_launch, + dsm_segment_handle(pvs->pcxt->seg)); + } + } + else + { + ReinitializeParallelWorkers(pvs->pcxt, nworkers); + LaunchParallelWorkers(pvs->pcxt); + } if (pvs->pcxt->nworkers_launched > 0) { @@ -733,8 +927,14 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan */ if (nworkers > 0) { - /* Wait for all vacuum workers to finish */ - WaitForParallelWorkersToFinish(pvs->pcxt); + /* + * Wait for all [auto]vacuum workers involved in parallel index + * processing (if any) to finish and advance state machine. + */ + if (pvs->is_autovacuum && pvs->pcxt->nworkers_launched >= 0) + ParallelAutovacuumEndSyncPoint(false); + else if (!pvs->is_autovacuum) + WaitForParallelWorkersToFinish(pvs->pcxt); for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); @@ -982,8 +1182,8 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, /* * Perform work within a launched parallel process. * - * Since parallel vacuum workers perform only index vacuum or index cleanup, - * we don't need to report progress information. + * Since parallel [auto]vacuum workers perform only index vacuum or index + * cleanup, we don't need to report progress information. */ void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) @@ -997,23 +1197,22 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) BufferUsage *buffer_usage; WalUsage *wal_usage; int nindexes; + int worker_number; char *sharedquery; ErrorContextCallback errcallback; - /* - * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we - * don't support parallel vacuum for autovacuum as of now. 
- */ - Assert(MyProc->statusFlags == PROC_IN_VACUUM); - - elog(DEBUG1, "starting parallel vacuum worker"); + Assert(MyProc->statusFlags == PROC_IN_VACUUM || AmAutoVacuumWorkerProcess()); + elog(DEBUG1, "starting parallel [auto]vacuum worker"); shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false); /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); + if (!AmAutoVacuumWorkerProcess()) + { + sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + pgstat_report_activity(STATE_RUNNING, debug_query_string); + } /* Track query ID */ pgstat_report_query_id(shared->queryid, false); @@ -1091,8 +1290,12 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + + worker_number = AmAutoVacuumWorkerProcess() ? + GetAutoVacuumParallelWorkerNumber() : ParallelWorkerNumber; + + InstrEndParallelQuery(&buffer_usage[worker_number], + &wal_usage[worker_number]); /* Report any remaining cost-based vacuum delay time */ if (track_cost_delay_timing) diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 16756152b71..cb9c9f374bb 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -90,6 +90,7 @@ #include "postmaster/postmaster.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/latch.h" #include "storage/lmgr.h" @@ -102,6 +103,7 @@ #include "utils/fmgrprotos.h" #include "utils/guc_hooks.h" #include "utils/injection_point.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -129,6 +131,9 @@ int autovacuum_anl_thresh; double autovacuum_anl_scale; int autovacuum_freeze_max_age; int autovacuum_multixact_freeze_max_age; +int max_parallel_index_autovac_workers; +int autovac_idx_parallel_min_rows; +int autovac_idx_parallel_min_indexes; double autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; @@ -164,6 +169,14 @@ static int default_freeze_table_age; static int default_multixact_freeze_min_age; static int default_multixact_freeze_table_age; +/* + * Number of additional workers that was requested for parallel index processing + * during autovacuum. + */ +static int nworkers_for_idx_autovac = 0; + +static int nworkers_launched = 0; + /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; @@ -222,6 +235,8 @@ typedef struct autovac_table * wi_proc pointer to PGPROC of the running worker, NULL if not started * wi_launchtime Time at which this worker was launched * wi_dobalance Whether this worker should be included in balance calculations + * wi_pcleanup if (> 0) => this worker must participate in parallel index + * vacuuming as supportive . Must be (== 0) for leader worker. 
* * All fields are protected by AutovacuumLock, except for wi_tableoid and * wi_sharedrel which are protected by AutovacuumScheduleLock (note these @@ -237,10 +252,17 @@ typedef struct WorkerInfoData TimestampTz wi_launchtime; pg_atomic_flag wi_dobalance; bool wi_sharedrel; + int wi_pcleanup; } WorkerInfoData; typedef struct WorkerInfoData *WorkerInfo; +#define AmParallelIdxAutoVacSupportive() \ + (MyWorkerInfo != NULL && MyWorkerInfo->wi_pcleanup > 0) + +#define AmParallelIdxAutoVacLeader() \ + (MyWorkerInfo != NULL && MyWorkerInfo->wi_pcleanup == 0) + /* * Possible signals received by the launcher from remote processes. These are * stored atomically in shared memory so that other processes can set them @@ -250,9 +272,11 @@ typedef enum { AutoVacForkFailed, /* failed trying to start a worker */ AutoVacRebalance, /* rebalance the cost limits */ + AutoVacParallelReq, /* request for parallel index vacuum */ + AutoVacNumSignals, /* must be last */ } AutoVacuumSignal; -#define AutoVacNumSignals (AutoVacRebalance + 1) +#define AutoVacNumSignals (AutoVacParallelReq + 1) /* * Autovacuum workitem array, stored in AutoVacuumShmem->av_workItems. This @@ -272,6 +296,50 @@ typedef struct AutoVacuumWorkItem #define NUM_WORKITEMS 256 +typedef enum +{ + LAUNCHER = 0, /* autovacuum launcher must wake everyone up */ + LEADER, /* leader must wake everyone up */ + LAST_WORKER, /* the last inited supportive worker must wake everyone + up */ +} SyncType; + +typedef enum +{ + STARTUP = 0, /* initial value - no sync points were passed */ + START_SYNC_POINT_PASSED, /* start_sync_point was passed */ + END_SYNC_POINT_PASSED, /* end_sync_point was passed */ + SHUTDOWN, /* leader wants to shut down parallel index + vacuum due to occured error */ +} Status; + +/* + * Structure, stored in AutoVacuumShmem->pav_workItem. This is used for managing + * parallel index processing (whithin single table). + */ +typedef struct ParallelAutoVacuumWorkItem +{ + Oid avw_database; + Oid avw_relation; + int nworkers_participating; + int nworkers_to_launch; + int nworkers_sleeping; /* leader doesn't count */ + int nfinished; /* # of workers, that already finished parallel + index processing (and probably already dead) */ + + dsm_handle handl; + int leader_proc_pid; + + PGPROC *leader_proc; + ConditionVariable cv; + + bool active; /* being processed */ + bool leader_sleeping_on_ssp; /* sleeping on start sync point */ + bool leader_sleeping_on_esp; /* sleeping on end sync point */ + SyncType sync_type; + Status status; +} ParallelAutoVacuumWorkItem; + /*------------- * The main autovacuum shmem struct. On shared memory we store this main * struct and the array of WorkerInfo structs. 
This struct keeps: @@ -283,6 +351,8 @@ typedef struct AutoVacuumWorkItem * av_startingWorker pointer to WorkerInfo currently being started (cleared by * the worker itself as soon as it's up and running) * av_workItems work item array + * pav_workItem information needed for parallel index processing whithing + * single table * av_nworkersForBalance the number of autovacuum workers to use when * calculating the per worker cost limit * @@ -298,6 +368,7 @@ typedef struct dlist_head av_runningWorkers; WorkerInfo av_startingWorker; AutoVacuumWorkItem av_workItems[NUM_WORKITEMS]; + ParallelAutoVacuumWorkItem pav_workItem; pg_atomic_uint32 av_nworkersForBalance; } AutoVacuumShmemStruct; @@ -322,11 +393,17 @@ pg_noreturn static void AutoVacLauncherShutdown(void); static void launcher_determine_sleep(bool canlaunch, bool recursing, struct timeval *nap); static void launch_worker(TimestampTz now); +static void launch_worker_for_pcleanup(TimestampTz now); +static void eliminate_lock_conflicts(ParallelAutoVacuumWorkItem *item, + bool all_launched); static List *get_database_list(void); static void rebuild_database_list(Oid newdb); static int db_comparator(const void *a, const void *b); static void autovac_recalculate_workers_for_balance(void); +static int parallel_autovacuum_start_sync_point(bool keep_lock); +static void handle_parallel_idx_autovac_errors(void); + static void do_autovacuum(void); static void FreeWorkerInfo(int code, Datum arg); @@ -355,6 +432,10 @@ static void avl_sigusr2_handler(SIGNAL_ARGS); static bool av_worker_available(void); static void check_av_worker_gucs(void); +typedef bool (*wakeup_condition) (ParallelAutoVacuumWorkItem *item); +static bool start_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item); +static bool end_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item); +static void CVSleep(ParallelAutoVacuumWorkItem *item, wakeup_condition wakeup_cond); /******************************************************************** @@ -583,7 +664,14 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len) * wakening conditions. */ - launcher_determine_sleep(av_worker_available(), false, &nap); + if (nworkers_launched < nworkers_for_idx_autovac) + { + /* Take the smallest possible sleep interval. 
*/ + nap.tv_sec = 0; + nap.tv_usec = MIN_AUTOVAC_SLEEPTIME * 1000; + } + else + launcher_determine_sleep(av_worker_available(), false, &nap); /* * Wait until naptime expires or we get some type of signal (all the @@ -614,6 +702,19 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len) LWLockRelease(AutovacuumLock); } + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + if (AutoVacuumShmem->av_signal[AutoVacParallelReq]) + { + ParallelAutoVacuumWorkItem *item; + + AutoVacuumShmem->av_signal[AutoVacParallelReq] = false; + + item = &AutoVacuumShmem->pav_workItem; + nworkers_for_idx_autovac = item->nworkers_to_launch; + nworkers_launched = 0; + } + LWLockRelease(AutovacuumLock); + if (AutoVacuumShmem->av_signal[AutoVacForkFailed]) { /* @@ -686,6 +787,7 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len) worker->wi_sharedrel = false; worker->wi_proc = NULL; worker->wi_launchtime = 0; + worker->wi_pcleanup = -1; dclist_push_head(&AutoVacuumShmem->av_freeWorkers, &worker->wi_links); AutoVacuumShmem->av_startingWorker = NULL; @@ -698,9 +800,29 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len) } LWLockRelease(AutovacuumLock); /* either shared or exclusive */ - /* if we can't do anything, just go back to sleep */ if (!can_launch) + { + /* + * If launcher cannot launch all requested for parallel index + * vacuum workers, it must handle all possible lock conflicts and + * tell everyone, that there will no new supportive workers. + */ + if (nworkers_launched < nworkers_for_idx_autovac) + { + ParallelAutoVacuumWorkItem *item; + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + item = &AutoVacuumShmem->pav_workItem; + Assert(item->active); + + eliminate_lock_conflicts(item, false); + nworkers_launched = nworkers_for_idx_autovac = 0; + LWLockRelease(AutovacuumLock); + } + + /* if we can't do anything else, just go back to sleep */ continue; + } /* We're OK to start a new worker */ @@ -716,6 +838,15 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len) */ launch_worker(current_time); } + else if (nworkers_launched < nworkers_for_idx_autovac) + { + /* + * One of active autovacuum workers sent us request to lauch + * participants for parallel index vacuum. We check this case first + * because we need to start participants as soon as possible. + */ + launch_worker_for_pcleanup(current_time); + } else { /* @@ -1267,6 +1398,7 @@ do_start_worker(void) worker->wi_dboid = avdb->adw_datid; worker->wi_proc = NULL; worker->wi_launchtime = GetCurrentTimestamp(); + worker->wi_pcleanup = -1; AutoVacuumShmem->av_startingWorker = worker; @@ -1349,6 +1481,136 @@ launch_worker(TimestampTz now) } } +/* + * launch_worker_for_pcleanup + * + * Wrapper for starting a worker (requested by leader of parallel index + * vacuuming) from the launcher. + */ +static void +launch_worker_for_pcleanup(TimestampTz now) +{ + ParallelAutoVacuumWorkItem *item; + WorkerInfo worker; + dlist_node *wptr; + + Assert(nworkers_launched < nworkers_for_idx_autovac); + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + /* + * Get a worker entry from the freelist. We checked above, so there + * really should be a free slot. + */ + wptr = dclist_pop_head_node(&AutoVacuumShmem->av_freeWorkers); + + worker = dlist_container(WorkerInfoData, wi_links, wptr); + worker->wi_dboid = InvalidOid; + worker->wi_proc = NULL; + worker->wi_launchtime = GetCurrentTimestamp(); + + /* + * Set indicator, that this workers must join to parallel index vacuum. 
+ * This variable also plays the role of an unique id among parallel index + * vacuum workers. First id is '1', because '0' is reserved for leader. + */ + worker->wi_pcleanup = (nworkers_launched + 1); + + AutoVacuumShmem->av_startingWorker = worker; + + SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + + item = &AutoVacuumShmem->pav_workItem; + Assert(item->active); + + nworkers_launched += 1; + + if (nworkers_launched < nworkers_for_idx_autovac) + { + LWLockRelease(AutovacuumLock); + return; + } + + Assert(item->sync_type == LAUNCHER && + nworkers_launched == nworkers_for_idx_autovac); + + /* + * If launcher managed to launch all requested for parallel index + * vacuum workers, it must handle all possible lock conflicts. + */ + eliminate_lock_conflicts(item, true); + LWLockRelease(AutovacuumLock); +} + +/* + * Must be called from autovacuum launcher when it launched all requested + * workers for parallel index vacuum, or when it realized, that no more + * processes can be launched. + * + * In this function launcher will assign roles in such a way as to avoid lock + * conflicts between leader and supportive workers. + * + * AutovacuumLock must be held in exclusive mode before calling this function! + */ +static void +eliminate_lock_conflicts(ParallelAutoVacuumWorkItem *item, bool all_launched) +{ + Assert(AmAutoVacuumLauncherProcess()); + Assert(LWLockHeldByMe(AutovacuumLock)); + + /* So, let's start... */ + + if (item->leader_sleeping_on_ssp && + item->nworkers_sleeping == nworkers_launched) + { + /* + * If both leader and all launched supportive workers are sleeping, then + * only we can wake everyone up. + */ + ConditionVariableBroadcast(&item->cv); + + /* Advance status. */ + item->status = START_SYNC_POINT_PASSED; + } + else if (item->leader_sleeping_on_ssp && + item->nworkers_sleeping < nworkers_launched) + { + /* + * If leader already sleeping, but several supportive workers are + * initing, we shift the responsibility for awakening everyone into the + * worker who completes initialization last + */ + item->sync_type = LAST_WORKER; + } + else if (!item->leader_sleeping_on_ssp && + item->nworkers_sleeping == nworkers_launched) + { + /* + * If only leader is not sleeping - it must wake up all workers when it + * finishes all preparations. + */ + item->sync_type = LEADER; + } + else + { + /* + * If nobody is sleeping, we assume that leader has higher chanses to + * asleep first, so set sync type to LAST_WORKER, but if the last worker + * will see that leader still not sleeping, it will change sync type to + * LEADER and asleep. + */ + item->sync_type = LAST_WORKER; + } + + /* + * If we cannot launch all requested workers, refresh + * nworkers_to_launch value, so that the last worker can find out + * that he is really the last. + */ + if (!all_launched && item->sync_type == LAST_WORKER) + item->nworkers_to_launch = nworkers_launched; +} + /* * Called from postmaster to signal a failure to fork a process to become * worker. The postmaster should kill(SIGUSR2) the launcher shortly @@ -1360,6 +1622,37 @@ AutoVacWorkerFailed(void) AutoVacuumShmem->av_signal[AutoVacForkFailed] = true; } +/* + * Called from autovacuum worker to signal that he needs participants in + * parallel index vacuum. Function sends SIGUSR2 to the launcher and returns + * 'true' iff signal was sent successfully. 
+ */ +bool +AutoVacParallelWorkRequest(void) +{ + if (AutoVacuumShmem->av_launcherpid == 0) + { + ereport(WARNING, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("autovacuum launcher is dead"))); + + return false; + } + + if (kill(AutoVacuumShmem->av_launcherpid, SIGUSR2) < 0) + { + ereport(WARNING, + (errcode(ERRCODE_SYSTEM_ERROR), + errmsg("failed to send signal to autovac launcher (pid %d): %m", + AutoVacuumShmem->av_launcherpid))); + + return false; + } + + AutoVacuumShmem->av_signal[AutoVacParallelReq] = true; + return true; +} + /* SIGUSR2: a worker is up and running, or just finished, or failed to fork */ static void avl_sigusr2_handler(SIGNAL_ARGS) @@ -1559,6 +1852,8 @@ AutoVacWorkerMain(const void *startup_data, size_t startup_data_len) { char dbname[NAMEDATALEN]; + Assert(MyWorkerInfo->wi_pcleanup < 0); + /* * Report autovac startup to the cumulative stats system. We * deliberately do this before InitPostgres, so that the @@ -1593,12 +1888,113 @@ AutoVacWorkerMain(const void *startup_data, size_t startup_data_len) recentMulti = ReadNextMultiXactId(); do_autovacuum(); } + else if (AmParallelIdxAutoVacSupportive()) + { + ParallelAutoVacuumWorkItem *item; + dsm_handle handle; + PGPROC *leader_proc; + int leader_proc_pid; + dsm_segment *seg; + shm_toc *toc; + char *asnapspace; + char *tsnapspace; + char dbname[NAMEDATALEN]; + Snapshot tsnapshot; + Snapshot asnapshot; + + /* + * We will abort parallel index vacuuming whithin current process if + * something errors out + */ + PG_TRY(); + { + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + item = &AutoVacuumShmem->pav_workItem; + dbid = item->avw_database; + handle = item->handl; + leader_proc = item->leader_proc; + leader_proc_pid = item->leader_proc_pid; + LWLockRelease(AutovacuumLock); + + InitPostgres(NULL, dbid, NULL, InvalidOid, + INIT_PG_OVERRIDE_ALLOW_CONNS, + dbname); + + set_ps_display(dbname); + if (PostAuthDelay) + pg_usleep(PostAuthDelay * 1000000L); + + /* And do an appropriate amount of work */ + recentXid = ReadNextTransactionId(); + recentMulti = ReadNextMultiXactId(); + + if (parallel_autovacuum_start_sync_point(false) == -1) + { + /* We are not participating anymore */ + MyWorkerInfo->wi_pcleanup = -1; + goto exit; + } + + seg = dsm_attach(handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + toc = shm_toc_attach(AV_PARALLEL_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("invalid magic number in dynamic shared memory segment"))); + + if (!BecomeLockGroupMember(leader_proc, leader_proc_pid)) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not become lock group member"))); + } + + StartTransactionCommand(); + + asnapspace = + shm_toc_lookup(toc, AV_PARALLEL_KEY_ACTIVE_SNAPSHOT, false); + tsnapspace = + shm_toc_lookup(toc, AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT, true); + asnapshot = RestoreSnapshot(asnapspace); + tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot; + RestoreTransactionSnapshot(tsnapshot, leader_proc); + PushActiveSnapshot(asnapshot); + + /* + * We've changed which tuples we can see, and must therefore + * invalidate system caches. + */ + InvalidateSystemCaches(); + + parallel_vacuum_main(seg, toc); + + /* Must pop active snapshot so snapmgr.c doesn't complain. 
*/ + PopActiveSnapshot(); + + dsm_detach(seg); + CommitTransactionCommand(); + ParallelAutovacuumEndSyncPoint(false); + } + PG_CATCH(); + { + EmitErrorReport(); + if (AmParallelIdxAutoVacSupportive()) + handle_parallel_idx_autovac_errors(); + } + PG_END_TRY(); + } /* * The launcher will be notified of my death in ProcKill, *if* we managed * to get a worker slot at all */ +exit: /* All done, go away */ proc_exit(0); } @@ -2461,6 +2857,10 @@ do_autovacuum(void) tab->at_datname, tab->at_nspname, tab->at_relname); EmitErrorReport(); + /* if we are parallel index vacuuming leader, we must shut it down */ + if (AmParallelIdxAutoVacLeader()) + handle_parallel_idx_autovac_errors(); + /* this resets ProcGlobal->statusFlags[i] too */ AbortOutOfAnyTransaction(); FlushErrorState(); @@ -3296,6 +3696,503 @@ AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId, return result; } +/* + * Release work item, used for managing parallel index vacuum. Must be called + * once and only from leader worker. + * + * If 'keep_lock' is true, then AutovacuumLock will not be released in the end + * of function execution. + */ +void +AutoVacuumReleaseParallelWork(bool keep_lock) +{ + ParallelAutoVacuumWorkItem *workitem; + + /* + * We might not get the workitem from launcher (we must not be considered + * as leader in this case), so just leave. + */ + if (!AmParallelIdxAutoVacLeader()) + return; + + if (!LWLockHeldByMe(AutovacuumLock)) + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + workitem = &AutoVacuumShmem->pav_workItem; + + Assert(AmParallelIdxAutoVacLeader() && + workitem->leader_proc_pid == MyProcPid); + + workitem->leader_proc = NULL; + workitem->leader_proc_pid = 0; + workitem->active = false; + + /* We are not leader anymore. */ + MyWorkerInfo->wi_pcleanup = -1; + + if (!keep_lock) + LWLockRelease(AutovacuumLock); +} + +static bool +start_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item) +{ + bool need_wakeup = false; + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + if (AmParallelIdxAutoVacLeader()) + { + /* + * In normal case we should exit sleep loop after last launched + * supportive worker passed sync point (status == START_SYNC_POINT_PASSED). + * But if we are in SHUTDOWN mode, all launched workers will just exit + * sync point whithout status advancing. We can handle such case if we + * check that n_participating == n_to_launch. + */ + if (item->status == SHUTDOWN) + need_wakeup = (item->nworkers_participating == item->nworkers_to_launch); + else + need_wakeup = item->status == START_SYNC_POINT_PASSED; + } + else + need_wakeup = (item->status == START_SYNC_POINT_PASSED || + item->status == SHUTDOWN); + + LWLockRelease(AutovacuumLock); + return need_wakeup; +} + +static bool +end_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item) +{ + bool need_wakeup = false; + + Assert(AmParallelIdxAutoVacLeader()); + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + need_wakeup = item->status == END_SYNC_POINT_PASSED; + LWLockRelease(AutovacuumLock); + return need_wakeup; +} + +/* + * Waiting on condition variable is frequent operation, so it has beed taken + * out with a separate function. Caller must acquire hold AutovacuumLock before + * calling it. 
+ */ +static void +CVSleep(ParallelAutoVacuumWorkItem *item, wakeup_condition wakeup_cond) +{ + ConditionVariablePrepareToSleep(&item->cv); + + LWLockRelease(AutovacuumLock); + PG_TRY(); + { + do + { + ConditionVariableSleep(&item->cv, PG_WAIT_IPC); + } while (!wakeup_cond(item)); + } + PG_CATCH(); + { + ConditionVariableCancelSleep(); + PG_RE_THROW(); + } + PG_END_TRY(); + + ConditionVariableCancelSleep(); + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); +} + +/* + * This function used to synchronize leader with supportive workers during + * parallel index vacuuming. Each process will exit iff: + * Leader worker is ready to perform parallel vacuum && + * All launched supportive workers are ready to perform parallel vacuum && + * (Autovacuum launcher already launched all requested workers || + * Autovacuum launcher cannot launch more workers) + * + * If 'keep_lock' is true, then AutovacuumLock will not be released in the end + * of function execution. + * + * NOTE: Some workers may call this function when leader worker decided to shut + * down parallel vacuuming. In this case '-1' value will be returned. + */ +static int +parallel_autovacuum_start_sync_point(bool keep_lock) +{ + ParallelAutoVacuumWorkItem *workitem; + SyncType sync_type; + int num_participants; + + if (!LWLockHeldByMe(AutovacuumLock)) + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + workitem = &AutoVacuumShmem->pav_workItem; + Assert(workitem->active); + sync_type = workitem->sync_type; + + if (AmParallelIdxAutoVacLeader()) + { + Assert(workitem->leader_proc_pid == MyProcPid); + + /* Wake up all sleeping supportive workers, if required ... */ + if (sync_type == LEADER) + { + ConditionVariableBroadcast(&workitem->cv); + + /* + * Advance status, because we are guaranteed to pass this + * sync point. + * Don't advance if we call this function from error handle function + * (status == SHUTDOWN). + */ + if (workitem->status != SHUTDOWN) + workitem->status = START_SYNC_POINT_PASSED; + } + /* ... otherwise, wait for somebody to wake us up */ + else + { + workitem->leader_sleeping_on_ssp = true; + CVSleep(workitem, start_sync_point_wakeup_cond); + workitem->leader_sleeping_on_ssp = false; + + /* + * A priori, we believe that in the end everyone should be awakened + * by the leader. + */ + workitem->sync_type = LEADER; + } + } + else + { + workitem->nworkers_participating += 1; + + /* + * If we know, that launcher will no longer attempt to launch more + * supportive workers for this item => we are LAST_WORKER for sure. + * + * Note, that launcher set LAST_WORKER sync type without knowing + * current status of leader. So we also check that leader is sleeping + * before wake all up. Otherwise, we must wait for leader (and ask him + * to wake all up). + */ + if (workitem->nworkers_participating == workitem->nworkers_to_launch && + sync_type == LAST_WORKER && workitem->leader_sleeping_on_ssp) + { + ConditionVariableBroadcast(&workitem->cv); + + /* + * We must not advance status if leader wants to shut down parallel + * execution (see checks below). + */ + if (workitem->status != SHUTDOWN) + workitem->status = START_SYNC_POINT_PASSED; + } + else + { + if (workitem->nworkers_participating == workitem->nworkers_to_launch && + sync_type == LAST_WORKER) + { + workitem->sync_type = LEADER; + } + + workitem->nworkers_sleeping += 1; + CVSleep(workitem, start_sync_point_wakeup_cond); + workitem->nworkers_sleeping -= 1; + } + } + + /* Tell caller that it must not participate in parallel index cleanup. 
*/ + if (workitem->status == SHUTDOWN) + num_participants = -1; + else + num_participants = workitem->nworkers_participating; + + if (!keep_lock) + LWLockRelease(AutovacuumLock); + + return num_participants; +} + +/* + * Like function above, but must be called by leader and supportive workers + * when they finished parallel index vacuum. + * + * If 'keep_lock' is true, then AutovacuumLock will not be released in the end + * of function execution. + */ +void +ParallelAutovacuumEndSyncPoint(bool keep_lock) +{ + ParallelAutoVacuumWorkItem *workitem; + + if (!LWLockHeldByMe(AutovacuumLock)) + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + workitem = &AutoVacuumShmem->pav_workItem; + Assert(workitem->active); + + if (workitem->nworkers_participating == 0) + { + Assert(!AmParallelIdxAutoVacSupportive()); + + /* + * We have two cases when no supportive workers were launched: + * 1) Leader got workitem, but launcher didn't launch any + * workers => just advance status, because we don't need to wait + * for anybody. + * 2) Leader didn't get workitem, because it was already in use => + * we must not touch it. Just leave. + */ + if (AmParallelIdxAutoVacLeader()) + { + Assert(workitem->leader_proc_pid == MyProcPid); + workitem->status = END_SYNC_POINT_PASSED; + } + else + Assert(workitem->leader_proc_pid != MyProcPid); + + if (!keep_lock) + LWLockRelease(AutovacuumLock); + + return; + } + + if (AmParallelIdxAutoVacLeader()) + { + Assert(workitem->leader_proc_pid == MyProcPid); + Assert(workitem->sync_type == LEADER); + + /* Wait for all workers to finish (only last worker will wake us up) */ + if (workitem->nfinished != workitem->nworkers_participating) + { + workitem->sync_type = LAST_WORKER; + workitem->leader_sleeping_on_esp = true; + CVSleep(workitem, end_sync_point_wakeup_cond); + workitem->leader_sleeping_on_esp = false; + + Assert(workitem->nfinished == workitem->nworkers_participating); + + /* + * Advance status, because we are guaranteed to pass this + * sync point. + */ + workitem->status = END_SYNC_POINT_PASSED; + } + } + else + { + workitem->nfinished += 1; + + /* If we are last finished worker - wake up the leader. + * + * If not - just leave, because supportive worker already finished all + * work and must die. + */ + if (workitem->sync_type == LAST_WORKER && + workitem->nfinished == workitem->nworkers_participating && + workitem->leader_sleeping_on_esp) + { + ConditionVariableBroadcast(&workitem->cv); + + /* + * Don't need to check SHUTDOWN status here - all supportive workers + * are about to finish anyway. + */ + workitem->status = END_SYNC_POINT_PASSED; + } + + /* We are not participate anymore */ + MyWorkerInfo->wi_pcleanup = -1; + } + + if (!keep_lock) + LWLockRelease(AutovacuumLock); + + return; +} + +/* + * Get id of parallel index vacuum worker (counting from 0). + */ +int +GetAutoVacuumParallelWorkerNumber(void) +{ + Assert(AmAutoVacuumWorkerProcess() && MyWorkerInfo->wi_pcleanup > 0); + return (MyWorkerInfo->wi_pcleanup - 1); +} + +/* + * Leader autovacuum process can decide, that he needs several helper workers + * to process table in parallel mode. He must set up parallel context and call + * LaunchParallelAutovacuumWorkers. + * + * In this function we do following : + * 1) Send signal to autovacuum lancher that creates 'supportive workers' + * during launcher's standard work loop. + * 2) Wait for supportive workers to start. + * + * Funcition return number of workers that launcher was able to launch (may be + * less then 'nworkers_to_launch'). 
+ */ +int +LaunchParallelAutovacuumWorkers(Oid rel_id, int nworkers_to_launch, + dsm_handle handle) +{ + int nworkers_launched = 0; + ParallelAutoVacuumWorkItem *workitem; + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + workitem = &AutoVacuumShmem->pav_workItem; + + /* + * For now, there can be only one leader across all cluster. + * TODO: fix it in next versions + */ + if (workitem->active && workitem->leader_proc_pid != MyProcPid) + { + LWLockRelease(AutovacuumLock); + return -1; + } + + /* Notify autovacuum launcher that we need supportive workers */ + if (AutoVacParallelWorkRequest()) + { + /* OK, we can use this workitem entry. Init it. */ + workitem->avw_database = MyDatabaseId; + workitem->avw_relation = rel_id; + workitem->handl = handle; + workitem->leader_proc = MyProc; + workitem->leader_proc_pid = MyProcPid; + workitem->nworkers_participating = 0; + workitem->nworkers_to_launch = nworkers_to_launch; + workitem->leader_sleeping_on_ssp = false; + workitem->leader_sleeping_on_esp = false; + workitem->nworkers_sleeping = 0; + workitem->nfinished = 0; + workitem->sync_type = LAUNCHER; + workitem->status = STARTUP; + + workitem->active = true; + LWLockRelease(AutovacuumLock); + + /* Become the leader */ + MyWorkerInfo->wi_pcleanup = 0; + + /* All created workers must get same locks as leader process */ + BecomeLockGroupLeader(); + + /* + * Wait until all supprotive workers are launched. Also retrieve actual + * number of participants + */ + + nworkers_launched = parallel_autovacuum_start_sync_point(false); + Assert(nworkers_launched >= 0); + } + else + { + /* + * If we (for any reason) cannot send signal to the launcher, don't try + * to do index vacuuming in parallel + */ + LWLockRelease(AutovacuumLock); + return 0; + } + + return nworkers_launched; +} + +/* + * During parallel index vacuuming any worker (both supportives and leader) can + * catch an error. + * In order to handle it in the right way we must call this function. + */ +static void +handle_parallel_idx_autovac_errors(void) +{ + ParallelAutoVacuumWorkItem *item; + + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + item = &AutoVacuumShmem->pav_workItem; + + if (AmParallelIdxAutoVacLeader()) + { + if (item->status == START_SYNC_POINT_PASSED) + { + /* + * If start sync point already passed - just wait for all supportive + * workers to finish and exit. + */ + ParallelAutovacuumEndSyncPoint(true); + } + else if (item->status == STARTUP) + { + /* + * If no sync point are passed we can prevent supportive workers + * from performing their work - set SHUTDOWN status and wait while + * all workers will see it. + */ + item->status = SHUTDOWN; + parallel_autovacuum_start_sync_point(true); + } + + AutoVacuumReleaseParallelWork(true); + } + else + { + Assert(AmParallelIdxAutoVacSupportive()); + + if (item->status == STARTUP || item->status == SHUTDOWN) + { + /* + * If no sync point are passed - just exclude ourselves from + * participants. Further parallel index vacuuming will take place + * as usual. + */ + item->nworkers_to_launch -= 1; + + if (item->nworkers_participating == item->nworkers_to_launch && + item->sync_type == LAST_WORKER && item->leader_sleeping_on_ssp) + { + ConditionVariableBroadcast(&item->cv); + + if (item->status != SHUTDOWN) + item->status = START_SYNC_POINT_PASSED; + } + } + else if (item->status == START_SYNC_POINT_PASSED) + { + /* + * If start sync point already passed we will simulate the usual + * end of work (see ParallelAutovacuumEndSyncPoint). 
+ */ + item->nfinished += 1; + + /* + * We check "!item->leader_sleeping_on_ssp" in order to handle an + * almost impossible situation, when leader didn't have time to wake + * up after start sync point (but last worker already advenced + * status to START_SYNC_POINT_PASSED). In this case we should not + * advance status to END_SYNC_POINT_PASSED, so leader can continue + * processing. + */ + if (item->sync_type == LAST_WORKER && + item->nfinished == item->nworkers_participating && + !item->leader_sleeping_on_ssp) + { + ConditionVariableBroadcast(&item->cv); + item->status = END_SYNC_POINT_PASSED; + } + } + } + + LWLockRelease(AutovacuumLock); +} + /* * autovac_init * This is called at postmaster initialization. @@ -3361,6 +4258,9 @@ AutoVacuumShmemInit(void) AutoVacuumShmem->av_startingWorker = NULL; memset(AutoVacuumShmem->av_workItems, 0, sizeof(AutoVacuumWorkItem) * NUM_WORKITEMS); + memset(&AutoVacuumShmem->pav_workItem, 0, + sizeof(ParallelAutoVacuumWorkItem)); + ConditionVariableInit(&AutoVacuumShmem->pav_workItem.cv); worker = (WorkerInfo) ((char *) AutoVacuumShmem + MAXALIGN(sizeof(AutoVacuumShmemStruct))); diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 2f8cbd86759..2e36921097a 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3647,6 +3647,36 @@ struct config_int ConfigureNamesInt[] = check_autovacuum_work_mem, NULL, NULL }, + { + {"max_parallel_index_autovac_workers", PGC_POSTMASTER, VACUUM_AUTOVACUUM, + gettext_noop("Sets the maximum number of parallel autovacuum worker processes during parallel index vacuuming of single table."), + NULL + }, + &max_parallel_index_autovac_workers, + 0, 0, MAX_PARALLEL_WORKER_LIMIT, + NULL, NULL, NULL + }, + + { + {"autovac_idx_parallel_min_rows", PGC_POSTMASTER, VACUUM_AUTOVACUUM, + gettext_noop("Sets the minimum number of dead tuples in single table that requires parallel index processing during autovacuum."), + NULL + }, + &autovac_idx_parallel_min_rows, + 0, 0, INT32_MAX, + NULL, NULL, NULL + }, + + { + {"autovac_idx_parallel_min_indexes", PGC_POSTMASTER, VACUUM_AUTOVACUUM, + gettext_noop("Sets the minimum number indexes created on single table that requires parallel index processing during autovacuum."), + NULL + }, + &autovac_idx_parallel_min_indexes, + 2, 2, INT32_MAX, + NULL, NULL, NULL + }, + { {"tcp_keepalives_idle", PGC_USERSET, CONN_AUTH_TCP, gettext_noop("Time between issuing TCP keepalives."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 34826d01380..08869398039 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -146,6 +146,12 @@ #hash_mem_multiplier = 2.0 # 1-1000.0 multiplier on hash table work_mem #maintenance_work_mem = 64MB # min 64kB #autovacuum_work_mem = -1 # min 64kB, or -1 to use maintenance_work_mem +#max_parallel_index_autovac_workers = 0 # this feature disabled by default + # (change requires restart) +#autovac_idx_parallel_min_rows = 0 + # (change requires restart) +#autovac_idx_parallel_min_indexes = 2 + # (change requires restart) #logical_decoding_work_mem = 64MB # min 64kB #max_stack_depth = 2MB # min 100kB #shared_memory_type = mmap # the default is the first option diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h index e8135f41a1c..8647154437b 100644 --- a/src/include/postmaster/autovacuum.h +++ b/src/include/postmaster/autovacuum.h @@ -15,6 +15,8 @@ #define 
AUTOVACUUM_H #include "storage/block.h" +#include "storage/dsm_impl.h" +#include "storage/lock.h" /* * Other processes can request specific work from autovacuum, identified by @@ -25,12 +27,25 @@ typedef enum AVW_BRINSummarizeRange, } AutoVacuumWorkItemType; +/* + * Magic number for parallel context TOC. Used for parallel index processing + * during autovacuum. + */ +#define AV_PARALLEL_MAGIC 0xaaaaaaaa + +/* Magic numbers for per-context parallel index processing state sharing. */ +#define AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFF0000000000001) +#define AV_PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFF0000000000002) + /* GUC variables */ extern PGDLLIMPORT bool autovacuum_start_daemon; extern PGDLLIMPORT int autovacuum_worker_slots; extern PGDLLIMPORT int autovacuum_max_workers; extern PGDLLIMPORT int autovacuum_work_mem; +extern PGDLLIMPORT int max_parallel_index_autovac_workers; +extern PGDLLIMPORT int autovac_idx_parallel_min_rows; +extern PGDLLIMPORT int autovac_idx_parallel_min_indexes; extern PGDLLIMPORT int autovacuum_naptime; extern PGDLLIMPORT int autovacuum_vac_thresh; extern PGDLLIMPORT int autovacuum_vac_max_thresh; @@ -60,10 +75,18 @@ extern void AutoVacWorkerFailed(void); pg_noreturn extern void AutoVacLauncherMain(const void *startup_data, size_t startup_data_len); pg_noreturn extern void AutoVacWorkerMain(const void *startup_data, size_t startup_data_len); +/* called from autovac worker when it needs participants in parallel index cleanup */ +extern bool AutoVacParallelWorkRequest(void); extern bool AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId, BlockNumber blkno); +extern void AutoVacuumReleaseParallelWork(bool keep_lock); +extern int AutoVacuumParallelWorkWaitForStart(void); +extern void ParallelAutovacuumEndSyncPoint( bool keep_lock); +extern int GetAutoVacuumParallelWorkerNumber(void); +extern int LaunchParallelAutovacuumWorkers(Oid rel_id, int nworkers_to_launch, + dsm_handle handle); /* shared memory stuff */ extern Size AutoVacuumShmemSize(void); extern void AutoVacuumShmemInit(void); diff --git a/src/test/modules/autovacuum/.gitignore b/src/test/modules/autovacuum/.gitignore new file mode 100644 index 00000000000..0b54641bceb --- /dev/null +++ b/src/test/modules/autovacuum/.gitignore @@ -0,0 +1 @@ +/tmp_check/ \ No newline at end of file diff --git a/src/test/modules/autovacuum/Makefile b/src/test/modules/autovacuum/Makefile new file mode 100644 index 00000000000..90c00ff350b --- /dev/null +++ b/src/test/modules/autovacuum/Makefile @@ -0,0 +1,14 @@ +# src/test/modules/autovacuum/Makefile + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/autovacuum +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif \ No newline at end of file diff --git a/src/test/modules/autovacuum/t/001_autovac_parallel.pl b/src/test/modules/autovacuum/t/001_autovac_parallel.pl new file mode 100644 index 00000000000..ff07c33d867 --- /dev/null +++ b/src/test/modules/autovacuum/t/001_autovac_parallel.pl @@ -0,0 +1,137 @@ +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $psql_out; + +my $node = PostgreSQL::Test::Cluster->new('node1'); +$node->init; +$node->append_conf('postgresql.conf', qq{ + autovacuum = off + max_wal_size = 4096 +}); +$node->start; + +my $indexes_num = 80; +my $initial_rows_num = 1_000_000; + +# Create big table and create specified number of b-tree indexes on it +$node->safe_psql('postgres', qq{ + CREATE TABLE test_autovac ( + id SERIAL PRIMARY KEY, + col_1 INTEGER, col_2 INTEGER, col_3 INTEGER, col_4 INTEGER, col_5 INTEGER, + col_6 INTEGER, col_7 INTEGER, col_8 INTEGER, col_9 INTEGER, col_10 INTEGER, + col_11 INTEGER, col_12 INTEGER, col_13 INTEGER, col_14 INTEGER, col_15 INTEGER, + col_16 INTEGER, col_17 INTEGER, col_18 INTEGER, col_19 INTEGER, col_20 INTEGER, + col_21 INTEGER, col_22 INTEGER, col_23 INTEGER, col_24 INTEGER, col_25 INTEGER, + col_26 INTEGER, col_27 INTEGER, col_28 INTEGER, col_29 INTEGER, col_30 INTEGER, + col_31 INTEGER, col_32 INTEGER, col_33 INTEGER, col_34 INTEGER, col_35 INTEGER, + col_36 INTEGER, col_37 INTEGER, col_38 INTEGER, col_39 INTEGER, col_40 INTEGER, + col_41 INTEGER, col_42 INTEGER, col_43 INTEGER, col_44 INTEGER, col_45 INTEGER, + col_46 INTEGER, col_47 INTEGER, col_48 INTEGER, col_49 INTEGER, col_50 INTEGER, + col_51 INTEGER, col_52 INTEGER, col_53 INTEGER, col_54 INTEGER, col_55 INTEGER, + col_56 INTEGER, col_57 INTEGER, col_58 INTEGER, col_59 INTEGER, col_60 INTEGER, + col_61 INTEGER, col_62 INTEGER, col_63 INTEGER, col_64 INTEGER, col_65 INTEGER, + col_66 INTEGER, col_67 INTEGER, col_68 INTEGER, col_69 INTEGER, col_70 INTEGER, + col_71 INTEGER, col_72 INTEGER, col_73 INTEGER, col_74 INTEGER, col_75 INTEGER, + col_76 INTEGER, col_77 INTEGER, col_78 INTEGER, col_79 INTEGER, col_80 INTEGER, + col_81 INTEGER, col_82 INTEGER, col_83 INTEGER, col_84 INTEGER, col_85 INTEGER, + col_86 INTEGER, col_87 INTEGER, col_88 INTEGER, col_89 INTEGER, col_90 INTEGER, + col_91 INTEGER, col_92 INTEGER, col_93 INTEGER, col_94 INTEGER, col_95 INTEGER, + col_96 INTEGER, col_97 INTEGER, col_98 INTEGER, col_99 INTEGER, col_100 INTEGER + ); + + DO \$\$ + DECLARE + i INTEGER; + BEGIN + FOR i IN 1..$indexes_num LOOP + EXECUTE format('CREATE INDEX idx_col_\%s ON test_autovac (col_\%s);', i, i); + END LOOP; + END \$\$; +}); + +$node->psql('postgres', + "SELECT COUNT(*) FROM pg_index i + JOIN pg_class c ON c.oid = i.indrelid + WHERE c.relname = 'test_autovac';", + stdout => \$psql_out +); +is($psql_out, $indexes_num + 1, "All indexes created successfully"); + +$node->safe_psql('postgres', qq{ + DO \$\$ + DECLARE + i INTEGER; + BEGIN + FOR i IN 1..$initial_rows_num LOOP + INSERT INTO test_autovac ( + col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, + col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, + col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, + col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, + col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49, col_50, + col_51, col_52, col_53, col_54, 
col_55, col_56, col_57, col_58, col_59, col_60, + col_61, col_62, col_63, col_64, col_65, col_66, col_67, col_68, col_69, col_70, + col_71, col_72, col_73, col_74, col_75, col_76, col_77, col_78, col_79, col_80, + col_81, col_82, col_83, col_84, col_85, col_86, col_87, col_88, col_89, col_90, + col_91, col_92, col_93, col_94, col_95, col_96, col_97, col_98, col_99, col_100 + ) VALUES ( + i, i + 1, i + 2, i + 3, i + 4, i + 5, i + 6, i + 7, i + 8, i + 9, + i + 10, i + 11, i + 12, i + 13, i + 14, i + 15, i + 16, i + 17, i + 18, i + 19, + i + 20, i + 21, i + 22, i + 23, i + 24, i + 25, i + 26, i + 27, i + 28, i + 29, + i + 30, i + 31, i + 32, i + 33, i + 34, i + 35, i + 36, i + 37, i + 38, i + 39, + i + 40, i + 41, i + 42, i + 43, i + 44, i + 45, i + 46, i + 47, i + 48, i + 49, + i + 50, i + 51, i + 52, i + 53, i + 54, i + 55, i + 56, i + 57, i + 58, i + 59, + i + 60, i + 61, i + 62, i + 63, i + 64, i + 65, i + 66, i + 67, i + 68, i + 69, + i + 70, i + 71, i + 72, i + 73, i + 74, i + 75, i + 76, i + 77, i + 78, i + 79, + i + 80, i + 81, i + 82, i + 83, i + 84, i + 85, i + 86, i + 87, i + 88, i + 89, + i + 90, i + 91, i + 92, i + 93, i + 94, i + 95, i + 96, i + 97, i + 98, i + 99 + ); + END LOOP; + END \$\$; +}); + +$node->psql('postgres', + "SELECT COUNT(*) FROM test_autovac;", + stdout => \$psql_out +); +is($psql_out, $initial_rows_num, "All data inserted into table successfully"); + +$node->safe_psql('postgres', qq{ + UPDATE test_autovac SET col_1 = 0 WHERE (col_1 % 3) = 0; + ANALYZE test_autovac; +}); + +my $dead_tuples_thresh = $initial_rows_num / 4; +my $indexes_num_thresh = $indexes_num / 2; +my $num_workers = 3; + +# Reduce autovacuum_work_mem, so leader process will perform parallel indexi +# vacuum phase several times +$node->append_conf('postgresql.conf', qq{ + autovacuum_naptime = '1s' + autovacuum_work_mem = 2048 + autovacuum_vacuum_threshold = 1 + autovacuum_analyze_threshold = 1 + autovacuum_vacuum_scale_factor = 0.1 + autovacuum_analyze_scale_factor = 0.1 + autovacuum_max_workers = 10 + autovacuum = on + autovac_idx_parallel_min_rows = $dead_tuples_thresh + autovac_idx_parallel_min_indexes = $indexes_num_thresh + max_parallel_index_autovac_workers = $num_workers +}); + +$node->restart; + +# wait for autovacuum to reset datfrozenxid age to 0 +$node->poll_query_until('postgres', q{ + SELECT count(*) = 0 FROM pg_database WHERE mxid_age(datfrozenxid) > 0 +}) or die "Timed out while waiting for autovacuum"; + +ok(1, "There are no segfaults"); + +$node->stop; +done_testing(); -- 2.43.0