And the background worker patch is attached.
On 05.09.2018 15:25, Andrey Lepikhov wrote:
Hi,
I prepared the next version of the background worker (cleaner), based on the
retail indextuple deletion patch.
This version shows stable behavior on the regression tests and pgbench
workloads.
In this version:
1. Only AccessShareLock is acquired when cleaning up heap and index
relations.
2. An 'aggressive' cleanup strategy is introduced: conditional cleanup
locks are not used.
3. Only in-memory blocks are cleaned up.
4. The cleaner calls heap_page_prune() before cleaning up a block.
Benchmarks
----------
Two factors were evaluated: performance (tps) and relation bloat.
Before each test, some sparsity in pgbench_accounts was modeled by
deleting 10% of the tuples in each block.
I also tested both the normal (default pgbench) and Gaussian distributions
of queries on the pgbench_accounts relation.
Autovacuum uses default settings.
Script:
pgbench -i -s 10
psql -c "DELETE FROM pgbench_accounts WHERE (random() < 0.1);"
psql -c "VACUUM;"
psql -c "CREATE INDEX pgbench_accounts_ext ON public.pgbench_accounts USING btree (abalance);" &&
pgbench -T 3600 -c 32 -j 8 -M prepared -P 600
NORMAL distribution:
average tps: 1045 (cleaner); 1077 (autovacuum)
Relation sizes at the end of the test, MB:
pgbench_accounts: 128 (cleaner); 128 (autovacuum)
pgbench_branches: 0.1 (cleaner); 2.1 (autovacuum)
pgbench_tellers: 0.4 (cleaner); 2.8 (autovacuum)
pgbench_accounts_pkey: 21 (cleaner); 43 (autovacuum)
pgbench_accounts_ext: 48 (cleaner); 56 (autovacuum)
Gaussian distribution:
average tps: 213 (cleaner); 213 (autovacuum)
Relation sizes at the end of the test, MB:
pgbench_accounts: 128 (cleaner); 128 (autovacuum)
pgbench_accounts_ext: 22 (cleaner); 29 (autovacuum)
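To make the figures above easier to compare, here is a minimal helper (a hypothetical function, not part of the patch) that expresses the cleaner result relative to the autovacuum result in percent.

```c
#include <assert.h>

/*
 * Relative difference of the cleaner result against the autovacuum result,
 * in percent. A negative value means the cleaner value is lower.
 */
static double
relative_diff_percent(double cleaner, double autovacuum)
{
	assert(autovacuum != 0.0);
	return (cleaner - autovacuum) / autovacuum * 100.0;
}
```

Applied to the numbers reported above, this gives roughly -3% tps for the normal run and 0% for the Gaussian run, and roughly -51% for the size of pgbench_accounts_pkey, -14% for pgbench_accounts_ext (normal) and -24% for pgbench_accounts_ext (Gaussian).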
Conclusions
-----------
1. For retail indextuple deletion purposes I replaced ItemIdSetDead() with
ItemIdMarkDead() in the heap_page_prune_execute() operation. As a result,
when every relation block is 100% full, the heap and indexes still bloat to
some degree. When the blocks already have free space, the
cleaner can delay bloat of the heap and indexes without a vacuum.
2. The cleaner works well when access frequency is skewed across
relation blocks.
3. The cleaner does not cause a noticeable decrease in performance (tps is
about 3% lower with the normal distribution and equal with the Gaussian one).
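The difference between the two macros in conclusion 1 can be shown with a simplified model. The Toy* names below are hypothetical and the struct is a plain stand-in, not the real ItemIdData bitfield; the behavior mirrors the macros in storage/itemid.h, where ItemIdSetDead() also zeroes the offset and length while ItemIdMarkDead() only sets the flag. Presumably the storage is kept so that the cleaner can still read the tuple and build index keys from it.

```c
#include <assert.h>
#include <stddef.h>

/* Line pointer states, numbered as in PostgreSQL's itemid.h. */
enum { TOY_LP_UNUSED = 0, TOY_LP_NORMAL = 1, TOY_LP_REDIRECT = 2, TOY_LP_DEAD = 3 };

/* Simplified stand-in for a heap line pointer. */
typedef struct ToyItemId
{
	unsigned	off;	/* offset of tuple storage within the page */
	unsigned	flags;	/* one of the TOY_LP_* states */
	unsigned	len;	/* length of tuple storage, 0 if none */
} ToyItemId;

/* Like ItemIdSetDead(): dead, and the tuple storage is given up. */
static void
toy_set_dead(ToyItemId *lp)
{
	lp->flags = TOY_LP_DEAD;
	lp->off = 0;
	lp->len = 0;
}

/* Like ItemIdMarkDead(): dead, but the tuple storage is kept. */
static void
toy_mark_dead(ToyItemId *lp)
{
	lp->flags = TOY_LP_DEAD;
}

/* Bytes of tuple storage still referenced by the line pointers. */
static size_t
toy_storage_in_use(const ToyItemId *items, size_t n)
{
	size_t		total = 0;

	assert(items != NULL);
	for (size_t i = 0; i < n; i++)
		total += items[i].len;
	return total;
}
```

With mark-dead, a pruned tuple keeps occupying its bytes until the cleaner has removed the index entries and set the line pointer unused, which is why a completely full block gains no free space from pruning alone.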
--
Andrey Lepikhov
Postgres Professional
https://postgrespro.com
The Russian Postgres Company
From b092dfd95b8673d692730ac27a1d6fdb76b66601 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru>
Date: Thu, 6 Sep 2018 11:05:42 +0300
Subject: [PATCH 5/5] Heap-and-Index-cleaner
---
.../postgres_fdw/expected/postgres_fdw.out | 30 +-
contrib/postgres_fdw/sql/postgres_fdw.sql | 4 +-
src/backend/access/heap/pruneheap.c | 6 +-
src/backend/access/nbtree/nbtree.c | 15 +-
src/backend/access/transam/xact.c | 4 +
src/backend/catalog/system_views.sql | 11 +
src/backend/commands/vacuumlazy.c | 44 +-
src/backend/postmaster/Makefile | 2 +-
src/backend/postmaster/bgheap.c | 1990 +++++++++++++++++
src/backend/postmaster/pgstat.c | 36 +
src/backend/postmaster/postmaster.c | 160 +-
src/backend/storage/buffer/bufmgr.c | 60 +-
src/backend/storage/buffer/localbuf.c | 24 +
src/backend/storage/ipc/ipci.c | 3 +
src/backend/storage/lmgr/lwlocknames.txt | 1 +
src/backend/storage/lmgr/proc.c | 5 +-
src/backend/tcop/postgres.c | 12 +
src/backend/utils/adt/pgstatfuncs.c | 2 +
src/backend/utils/hash/Makefile | 2 +-
src/backend/utils/hash/shash.c | 265 +++
src/backend/utils/init/miscinit.c | 3 +-
src/backend/utils/init/postinit.c | 11 +-
src/include/commands/progress.h | 12 +
src/include/commands/vacuum.h | 3 +
src/include/pgstat.h | 14 +-
src/include/postmaster/bgheap.h | 36 +
src/include/storage/buf_internals.h | 1 +
src/include/storage/bufmgr.h | 5 +-
src/include/storage/pmsignal.h | 2 +
src/include/utils/shash.h | 54 +
src/test/regress/expected/rules.out | 18 +-
src/test/regress/expected/triggers.out | 2 +-
src/test/regress/input/constraints.source | 12 +-
src/test/regress/output/constraints.source | 34 +-
src/test/regress/sql/rules.sql | 2 +-
src/test/regress/sql/triggers.sql | 2 +-
36 files changed, 2803 insertions(+), 84 deletions(-)
create mode 100644 src/backend/postmaster/bgheap.c
create mode 100644 src/backend/utils/hash/shash.c
create mode 100644 src/include/postmaster/bgheap.h
create mode 100644 src/include/utils/shash.h
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index f5498c62bd..622bea2a7d 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -5496,19 +5496,25 @@ ALTER SERVER loopback OPTIONS (DROP extensions);
INSERT INTO ft2 (c1,c2,c3)
SELECT id, id % 10, to_char(id, 'FM00000') FROM generate_series(2001, 2010) id;
EXPLAIN (verbose, costs off)
-UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *; -- can't be pushed down
- QUERY PLAN
-----------------------------------------------------------------------------------------------------------
- Update on public.ft2
- Output: c1, c2, c3, c4, c5, c6, c7, c8
- Remote SQL: UPDATE "S 1"."T 1" SET c3 = $2 WHERE ctid = $1 RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8
- -> Foreign Scan on public.ft2
- Output: c1, c2, NULL::integer, 'bar'::text, c4, c5, c6, c7, c8, ctid
- Filter: (postgres_fdw_abs(ft2.c1) > 2000)
- Remote SQL: SELECT "C 1", c2, c4, c5, c6, c7, c8, ctid FROM "S 1"."T 1" FOR UPDATE
-(7 rows)
+with updated AS ( UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING * ) SELECT * FROM updated ORDER BY c1; -- can't be pushed down
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------
+ Sort
+ Output: updated.c1, updated.c2, updated.c3, updated.c4, updated.c5, updated.c6, updated.c7, updated.c8
+ Sort Key: updated.c1
+ CTE updated
+ -> Update on public.ft2
+ Output: ft2.c1, ft2.c2, ft2.c3, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8
+ Remote SQL: UPDATE "S 1"."T 1" SET c3 = $2 WHERE ctid = $1 RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8
+ -> Foreign Scan on public.ft2
+ Output: ft2.c1, ft2.c2, NULL::integer, 'bar'::text, ft2.c4, ft2.c5, ft2.c6, ft2.c7, ft2.c8, ft2.ctid
+ Filter: (postgres_fdw_abs(ft2.c1) > 2000)
+ Remote SQL: SELECT "C 1", c2, c4, c5, c6, c7, c8, ctid FROM "S 1"."T 1" FOR UPDATE
+ -> CTE Scan on updated
+ Output: updated.c1, updated.c2, updated.c3, updated.c4, updated.c5, updated.c6, updated.c7, updated.c8
+(13 rows)
-UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;
+with updated AS ( UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING * ) SELECT * FROM updated ORDER BY c1;
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
------+----+-----+----+----+----+------------+----
2001 | 1 | bar | | | | ft2 |
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e1b955f3f0..6034e83108 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1137,8 +1137,8 @@ ALTER SERVER loopback OPTIONS (DROP extensions);
INSERT INTO ft2 (c1,c2,c3)
SELECT id, id % 10, to_char(id, 'FM00000') FROM generate_series(2001, 2010) id;
EXPLAIN (verbose, costs off)
-UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *; -- can't be pushed down
-UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING *;
+with updated AS ( UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING * ) SELECT * FROM updated ORDER BY c1; -- can't be pushed down
+with updated AS ( UPDATE ft2 SET c3 = 'bar' WHERE postgres_fdw_abs(c1) > 2000 RETURNING * ) SELECT * FROM updated ORDER BY c1;
EXPLAIN (verbose, costs off)
UPDATE ft2 SET c3 = 'baz'
FROM ft4 INNER JOIN ft5 ON (ft4.c1 = ft5.c1)
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index ebbafe3275..e7c2d897f3 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -22,6 +22,7 @@
#include "catalog/catalog.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "postmaster/bgheap.h"
#include "storage/bufmgr.h"
#include "utils/snapmgr.h"
#include "utils/rel.h"
@@ -156,6 +157,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
/* OK to prune */
(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
+ if (IsBufferDirty(buffer))
+ HeapCleanerSend(relation, BufferGetBlockNumber(buffer));
}
/* And release buffer lock */
@@ -411,7 +414,6 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
heap_prune_record_dead(prstate, rootoffnum);
HeapTupleHeaderAdvanceLatestRemovedXid(htup,
&prstate->latestRemovedXid);
- ndeleted++;
}
/* Nothing more to do */
@@ -583,9 +585,9 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
*/
for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
{
- ndeleted++;
if (chainitems[i] == latestdead)
continue;
+ ndeleted++;
heap_prune_record_unused(prstate, chainitems[i]);
}
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 50378e0f51..28650008ef 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -930,7 +930,14 @@ bttargetdelete(IndexTargetDeleteInfo *info,
skey = _bt_mkscankey(irel, itup);
/* Descend the tree and position ourselves on the target leaf page. */
- stack = _bt_search(irel, keysCount, skey, &info->dead_tuples[pos], false, &buf, BT_READ, NULL);
+ stack = _bt_search(irel, keysCount, skey, &info->dead_tuples[pos], false, &buf, BT_WRITE, NULL);
+
+ /* trade in our read lock for a write lock */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(buf, BT_WRITE);
+
+ buf = _bt_moveright(irel, buf, keysCount, skey, &info->dead_tuples[pos],
+ false, true, stack, BT_WRITE, NULL);
/* To prepare tuple entries search across index pages */
Assert(BufferIsValid(buf));
@@ -954,6 +961,7 @@ bttargetdelete(IndexTargetDeleteInfo *info,
*/
if (ndeletable > 0)
{
+ /* trade in our read lock for a write lock */
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBufferForCleanup(buf);
@@ -974,7 +982,7 @@ bttargetdelete(IndexTargetDeleteInfo *info,
* Traverse to a next reliable index page
*/
buf = _bt_moveright(irel, buf, keysCount, skey, &info->dead_tuples[pos],
- false, true, stack, BT_READ, NULL);
+ false, true, stack, BT_WRITE, NULL);
page = BufferGetPage(buf);
_bt_checkpage(irel, buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -1019,6 +1027,7 @@ bttargetdelete(IndexTargetDeleteInfo *info,
}
else if (res > 0)
break;
+
pos--;
}
@@ -1033,11 +1042,11 @@ bttargetdelete(IndexTargetDeleteInfo *info,
*/
if (ndeletable > 0)
{
+ /* trade in our read lock for a write lock */
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBufferForCleanup(buf);
_bt_delitems_delete(irel, buf, deletable, ndeletable, hrel);
-
stats->tuples_removed += ndeletable;
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9aa63c8792..f5e70ae16d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,6 +40,7 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "postmaster/bgheap.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
@@ -2126,6 +2127,7 @@ CommitTransaction(void)
AtEOXact_Files(true);
AtEOXact_ComboCid();
AtEOXact_HashTables(true);
+ AtEOXact_BGHeap_tables(true);
AtEOXact_PgStat(true);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
@@ -2404,6 +2406,7 @@ PrepareTransaction(void)
AtEOXact_Files(true);
AtEOXact_ComboCid();
AtEOXact_HashTables(true);
+ AtEOXact_BGHeap_tables(true);
/* don't call AtEOXact_PgStat here; we fixed pgstat state above */
AtEOXact_Snapshot(true, true);
pgstat_report_xact_timestamp(0);
@@ -2606,6 +2609,7 @@ AbortTransaction(void)
AtEOXact_Files(false);
AtEOXact_ComboCid();
AtEOXact_HashTables(false);
+ AtEOXact_BGHeap_tables(true);
AtEOXact_PgStat(false);
AtEOXact_ApplyLauncher(false);
pgstat_report_xact_timestamp(0);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7251552419..3b6e9aa41c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -903,6 +903,17 @@ CREATE VIEW pg_stat_progress_vacuum AS
FROM pg_stat_get_progress_info('VACUUM') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;
+CREATE VIEW pg_stat_progress_cleaner AS
+ SELECT
+ S.pid AS pid, S.datid AS datid, D.datname AS datname,
+ S.relid AS relid,
+ S.param1 AS relations, S.param2 AS queue_len, S.param3 AS ninmem,
+ S.param4 AS cleaned_tuples, S.param5 AS vain_tuples,
+ S.param6 AS missed_blocks, S.param7 AS cleaned_blocks,
+ S.param8 AS vain_blocks, S.param9 AS nlocks, S.param10 AS timeout
+ FROM pg_stat_get_progress_info('CLEANER') AS S
+ LEFT JOIN pg_database D ON S.datid = D.oid;
+
CREATE VIEW pg_user_mappings AS
SELECT
U.oid AS umid,
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 1b952fedd9..9b101e79fb 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -158,8 +158,6 @@ static void lazy_scan_heap(Relation onerel, int options,
bool aggressive);
static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
-static void quick_vacuum_index(Relation irel, Relation hrel,
- LVRelStats *vacrelstats);
static void lazy_vacuum_index(Relation indrel,
IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats);
@@ -742,7 +740,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
bool use_quick_strategy = (vacrelstats->num_dead_tuples/vacrelstats->old_live_tuples < target_index_deletion_factor);
if (use_quick_strategy && (Irel[i]->rd_amroutine->amtargetdelete != NULL))
- quick_vacuum_index(Irel[i], onerel, vacrelstats);
+ quick_vacuum_index(Irel[i], onerel, vacrelstats->dead_tuples, vacrelstats->num_dead_tuples);
else
lazy_vacuum_index(Irel[i],
&indstats[i],
@@ -1394,7 +1392,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
bool use_quick_strategy = (vacrelstats->num_dead_tuples/vacrelstats->old_live_tuples < target_index_deletion_factor);
if (use_quick_strategy && (Irel[i]->rd_amroutine->amtargetdelete != NULL))
- quick_vacuum_index(Irel[i], onerel, vacrelstats);
+ quick_vacuum_index(Irel[i], onerel, vacrelstats->dead_tuples, vacrelstats->num_dead_tuples);
else
lazy_vacuum_index(Irel[i],
&indstats[i],
@@ -1730,7 +1728,12 @@ get_tuple_by_tid(Relation rel, ItemPointer tid)
{
offnum = ItemIdGetRedirect(lp);
lp = PageGetItemId(page, offnum);
- Assert(ItemIdIsUsed(lp));
+
+ if (!ItemIdIsUsed(lp))
+ {
+ UnlockReleaseBuffer(buffer);
+ return NULL;
+ }
}
/* Form a tuple */
@@ -1747,30 +1750,34 @@ get_tuple_by_tid(Relation rel, ItemPointer tid)
* quick_vacuum_index() -- quick vacuum one index relation.
*
* Delete all the index entries pointing to tuples listed in
- * vacrelstats->dead_tuples.
+ * dead_tuples.
*/
-static void
+void
quick_vacuum_index(Relation irel, Relation hrel,
- LVRelStats *vacrelstats)
+ ItemPointer dead_tuples,
+ int num_dead_tuples)
{
int tnum;
- bool* found = palloc0(vacrelstats->num_dead_tuples*sizeof(bool));
- IndexInfo* indexInfo = BuildIndexInfo(irel);
- EState* estate = CreateExecutorState();
- ExprContext* econtext = GetPerTupleExprContext(estate);
- ExprState* predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
- TupleTableSlot* slot = MakeSingleTupleTableSlot(RelationGetDescr(hrel));
+ bool *found = palloc0(num_dead_tuples*sizeof(bool));
+ IndexInfo *indexInfo = BuildIndexInfo(irel);
+ EState *estate = CreateExecutorState();
+ ExprContext *econtext = GetPerTupleExprContext(estate);
+ ExprState *predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+ TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(hrel));
IndexTargetDeleteResult stats;
IndexTargetDeleteInfo ivinfo;
+ Assert(found != NULL);
+
+ stats.tuples_removed = 0;
ivinfo.indexRelation = irel;
ivinfo.heapRelation = hrel;
econtext->ecxt_scantuple = slot;
/* Get tuple from heap */
- for (tnum = vacrelstats->num_dead_tuples-1; tnum >= 0; tnum--)
+ for (tnum = num_dead_tuples-1; tnum >= 0; tnum--)
{
HeapTuple tuple;
Datum values[INDEX_MAX_KEYS];
@@ -1781,7 +1788,7 @@ quick_vacuum_index(Relation irel, Relation hrel,
continue;
/* Get a tuple from heap */
- if ((tuple = get_tuple_by_tid(hrel, &(vacrelstats->dead_tuples[tnum]))) == NULL)
+ if ((tuple = get_tuple_by_tid(hrel, &(dead_tuples[tnum]))) == NULL)
{
/*
* Tuple has 'not used' status.
@@ -1814,15 +1821,16 @@ quick_vacuum_index(Relation irel, Relation hrel,
* Make attempt to delete some index entries by one tree descent.
* We use only a part of TID list, which contains not found TID's.
*/
- ivinfo.dead_tuples = vacrelstats->dead_tuples;
+ ivinfo.dead_tuples = dead_tuples;
ivinfo.last_dead_tuple = tnum;
ivinfo.found_dead_tuples = found;
+
index_target_delete(&ivinfo, &stats, values, isnull);
}
- pfree(found);
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
+ pfree(found);
}
/*
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c23211b2..11796375e8 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/postmaster
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
+OBJS = autovacuum.o bgheap.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/bgheap.c b/src/backend/postmaster/bgheap.c
new file mode 100644
index 0000000000..c445b303b3
--- /dev/null
+++ b/src/backend/postmaster/bgheap.c
@@ -0,0 +1,1990 @@
+/*
+ * bgheap.c
+ *
+ * PostgreSQL integrated cleaner of HEAP and INDEX relations
+ *
+ * Modeled on autovacuum. Uses a 'target' strategy to clean relations,
+ * without a full scan.
+ * The cleaner consists of one launcher and a set of workers.
+ * One worker corresponds to one database.
+ * The launcher receives a message {dbOid; relOid; blkno} from a backend over
+ * a socket and passes it to a worker through a shared memory buffer. The worker
+ * receives the message and cleans up the heap block and its index entries.
+ *
+ * Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/bgheap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/htup_details.h"
+#include "access/nbtree.h"
+#include "access/visibilitymap.h"
+#include "access/xact.h"
+#include "catalog/catalog.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/index.h"
+#include "catalog/namespace.h"
+#include "commands/progress.h"
+#include "commands/vacuum.h"
+#include "common/ip.h"
+#include "executor/executor.h"
+#include "libpq/pqsignal.h"
+#include "postmaster/bgheap.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/postmaster.h"
+#include "storage/condition_variable.h"
+#include "storage/fd.h"
+#include "storage/freespace.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/pmsignal.h"
+#include "storage/procarray.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/ps_status.h"
+#include "utils/lsyscache.h"
+#include "utils/resowner.h"
+#include "utils/shash.h"
+#include "utils/syscache.h"
+#include "utils/timeout.h"
+
+/*
+ * Maximum number of task items in storage at a backend side before shipping to a
+ * background heap cleaner
+ */
+#define BACKEND_DIRTY_ITEMS_MAX (10)
+
+/*
+ * Maximum number of task items in waiting list at a Launcher side.
+ * It is necessary to make it sufficiently large. If the number of arrived
+ * messages exceeds this, we ignore the rest and increment the counter of
+ * missed items.
+ */
+#define WAITING_MESSAGES_MAX_NUM (100000)
+
+/*
+ * Maximum number of dirty blocks that a worker can keep for each relation
+ */
+#define WORKER_DIRTYBLOCKS_MAX_NUM (100000)
+
+/* Maximum number of slots for dirty relations */
+#define WORKER_RELATIONS_MAX_NUM (100)
+
+/* Maximum number of task items in a launcher/worker shared buffer */
+#define WORKER_TASK_ITEMS_MAX (1000)
+
+/*
+ * Maximum time interval which worker can idle without a task (ms)
+ */
+#define WORKER_IDLE_TIME_DURATION_MAX (5000)
+
+/* Minimal info needed to clean up a block of a heap relation and its index relations */
+typedef struct CleanerMessage
+{
+ Oid dbNode; /* Database ID */
+ Oid relid; /* Relation ID */
+ int blkno; /* Block number */
+ TransactionId xid;
+ uint16 hits;
+} CleanerMessage;
+
+typedef struct WorkerTask
+{
+ BlockNumber blkno;
+ TransactionId lastXid;
+ uint16 hits;
+} WorkerTask;
+
+/*
+ * Shared memory data for control of, and two-way communication with, a worker
+ */
+typedef struct WorkerInfoData
+{
+ dlist_node links;
+ Oid dbOid; /* Database ID of the worker */
+ TimestampTz launchtime; /* To define a time of last worker activity */
+ int pid; /* Used for system signals passing */
+ CleanerMessage buffer[WORKER_TASK_ITEMS_MAX]; /* Array of work items */
+ int nitems; /* Number of work items in buffer */
+ LWLock WorkItemLock; /* Locker for safe buffer access */
+ int id; /* Used for Internal launcher buffers management */
+} WorkerInfoData;
+
+typedef struct WorkerInfoData *WorkerInfo;
+
+/*
+ * Shared memory info about the worker pool.
+ * The launcher takes an element from the freeWorkers list and sets startingWorker.
+ * The worker resets startingWorker to NULL after startup and adds itself
+ * to the runningWorkers list.
+ */
+typedef struct HeapCleanerShmemStruct
+{
+ dlist_head freeWorkers;
+ dlist_head runningWorkers;
+ WorkerInfo startingWorker;
+} HeapCleanerShmemStruct;
+
+typedef struct DirtyRelation
+{
+ Oid relid;
+ SHTAB *items;
+} DirtyRelation;
+
+static int stat_vainly_cleaned_tuples = 0;
+static int stat_vainly_cleaned_blocks = 0;
+static uint64 stat_total_deletions = 0;
+static uint64 stat_total_cleaned_blocks = 0;
+static uint64 stat_not_acquired_locks = 0;
+static int stat_buf_ninmem = 0;
+static uint32 stat_missed_blocks = 0;
+
+static MemoryContext BGHeapMemCxt = NULL;
+
+/*
+ * Table of database relations.
+ * For each relation we support a waiting list of dirty blocks.
+ */
+static PSHTAB PrivateRelationsTable = NULL;
+
+static bool am_heapcleaner_launcher = false;
+static bool am_heapcleaner_worker = false;
+
+int heapcleaner_max_workers = 10;
+static WorkerInfo MyWorkerInfo = NULL;
+static HeapCleanerShmemStruct *HeapCleanerShmem;
+
+/*
+ * Hash table for collection of dirty blocks after heap_page_prune() action
+ * at a Backend side.
+ */
+PSHTAB dblocks = NULL;
+
+/* Signal handling */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGUSR2 = false;
+
+NON_EXEC_STATIC pgsocket HeapCleanerSock = PGINVALID_SOCKET;
+static struct sockaddr_storage HeapCleanerSockAddr;
+
+static int TrancheId;
+
+/*
+ * Hash tables for waiting data: one for each active worker and one for all
+ * messages intended for workers that are not running. Reasons a worker may not be running:
+ * 1. The worker is still starting up.
+ * 2. Another worker is starting up, so we can't launch a new one.
+ * 3. There are no free slots for new workers.
+ */
+PSHTAB *wTab;
+
+/*
+ * Parameters of each hash table
+ */
+SHTABCTL wTabCtl;
+
+#define CLEANUP_GENTLY (0)
+#define CLEANUP_AGGRESSIVE (1)
+
+/*
+ * Cleanup strategy
+ */
+static int strategy = CLEANUP_AGGRESSIVE;
+
+#ifdef EXEC_BACKEND
+static pid_t hclauncher_forkexec(void);
+static pid_t hcworker_forkexec(void);
+#endif
+
+static uint64 CleanerMessageHashFunc(void *key, uint64 size, uint64 base);
+static PSHTAB cleanup_relations(DirtyRelation *res, SHTAB *AuxiliaryList, bool got_SIGTERM);
+static bool DefaultCompareFunc(void* bucket1, void* bucket2);
+NON_EXEC_STATIC void HeapCleanerLauncherMain(int argc, char *argv[]) pg_attribute_noreturn();
+NON_EXEC_STATIC void HeapCleanerWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
+static bool isEqualMsgs(void *arg1, void *arg2);
+
+static void launch_worker(Oid dbNode);
+static WorkerInfo look_for_worker(Oid dbNode);
+static void main_launcher_loop(void);
+static void main_worker_loop(void);
+
+static void SIGHUP_Handler(SIGNAL_ARGS);
+static void SIGTERM_Handler(SIGNAL_ARGS);
+static void SIGUSR2_Handler(SIGNAL_ARGS);
+static void backend_send_dirty_blocks(void);
+
+#ifdef EXEC_BACKEND
+/*
+ * forkexec routine for the heap cleaner launcher process.
+ *
+ * Format up the arglist, then fork and exec.
+ */
+static pid_t
+hclauncher_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkhclauncher";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+
+/*
+ * We need this set from the outside, before InitProcess is called
+ */
+void
+HeapCleanerLauncherIAm(void)
+{
+ am_heapcleaner_launcher = true;
+}
+
+static pid_t
+hcworker_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkhcworker";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+
+/*
+ * We need this set from the outside, before InitProcess is called
+ */
+void
+HeapCleanerWorkerIAm(void)
+{
+ am_heapcleaner_worker = true;
+}
+#endif
+
+/*
+ * Send dirty blocks from a collector's table to launcher at the end of transaction
+ */
+void
+AtEOXact_BGHeap_tables(bool isCommit)
+{
+ MemoryContext oldMemCxt;
+
+ if (BGHeapMemCxt == NULL)
+ BGHeapMemCxt = AllocSetContextCreate(BGHeapMemCxt, "bgheap",
+ ALLOCSET_DEFAULT_SIZES);
+ oldMemCxt = MemoryContextSwitchTo(BGHeapMemCxt);
+
+ backend_send_dirty_blocks();
+
+ MemoryContextSwitchTo(oldMemCxt);
+}
+
+static void
+save_to_list(PSHTAB AuxiliaryList, WorkerTask *item)
+{
+ bool found;
+
+ WorkerTask *new_item = (WorkerTask *)
+ SHASH_Search(AuxiliaryList,
+ (void *) &(item->blkno),
+ HASH_ENTER, &found);
+
+ Assert((new_item != NULL) && (!found));
+ new_item->hits = item->hits;
+ new_item->lastXid = item->lastXid;
+}
+
+/*
+ * Main logic of HEAP and index relations cleaning
+ */
+static PSHTAB
+cleanup_relations(DirtyRelation *res, PSHTAB AuxiliaryList, bool got_SIGTERM)
+{
+ Relation heapRelation;
+ Relation *IndexRelations;
+ int nindexes;
+ LOCKMODE lockmode = AccessShareLock;
+ WorkerTask *item;
+ TransactionId OldestXmin;
+
+ Assert(res != NULL);
+ Assert(res->items != NULL);
+ Assert(AuxiliaryList != NULL);
+ Assert(SHASH_Entries(AuxiliaryList) == 0);
+
+ if (SHASH_Entries(res->items) == 0)
+ return AuxiliaryList;
+
+ if (RecoveryInProgress())
+ {
+ SHASH_Clean(res->items);
+ return AuxiliaryList;
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ StartTransactionCommand();
+
+ /* Setting a snapshot ensures that RecentGlobalXmin is kept truly recent. */
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /*
+ * At this point relation availability is not guaranteed.
+ * Make safe test to check this.
+ */
+ heapRelation = try_relation_open(res->relid, lockmode);
+
+ if (!heapRelation)
+ {
+ elog(LOG, "[%d] Cleanup: UnSuccessful opening. Relation deleted? dbOid=%d", res->relid, MyWorkerInfo->dbOid);
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ SHASH_Clean(res->items);
+ return AuxiliaryList;
+ }
+
+ /*
+ * Check ownership permissions the same way vacuum does
+ */
+ if (!(pg_class_ownercheck(RelationGetRelid(heapRelation), GetUserId()) ||
+ (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !heapRelation->rd_rel->relisshared)))
+ {
+ if (heapRelation->rd_rel->relisshared)
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- only superuser can vacuum it",
+ RelationGetRelationName(heapRelation))));
+ else if (heapRelation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
+ RelationGetRelationName(heapRelation))));
+ else
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
+ RelationGetRelationName(heapRelation))));
+ relation_close(heapRelation, lockmode);
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ SHASH_Clean(res->items);
+ return AuxiliaryList;
+ }
+
+ /*
+ * Get the xid for the following prune process. This function is rather
+ * expensive, so call it only once per round.
+ */
+ OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+
+ Assert(TransactionIdIsValid(OldestXmin));
+ /* Main cleanup cycle */
+ for (SHASH_SeqReset(res->items);
+ (item = (WorkerTask *) SHASH_SeqNext(res->items)) != NULL; )
+ {
+ BlockNumber nblocks;
+ Buffer buffer;
+ ItemPointerData dead_tuples[MaxOffsetNumber];
+ int num_dead_tuples = 0;
+ OffsetNumber offnum;
+ int irnum;
+ Page page;
+ ItemId lp;
+ int tnum;
+ bool found_non_nbtree = false;
+ TransactionId latestRemovedXid;
+
+ Assert(item->hits > 0);
+
+ nblocks = RelationGetNumberOfBlocks(heapRelation);
+
+ if (item->blkno >= nblocks)
+ /*
+ * Block was deleted earlier.
+ * Skip cleaning and drop block from waiting list.
+ */
+ continue;
+
+ /*
+ * Get and pin the buffer.
+ * Unless Postgres is terminating, take the buffer only if it is already in memory
+ */
+ if (!got_SIGTERM)
+ buffer = ReadBufferExtended(heapRelation, MAIN_FORKNUM, item->blkno, RBM_NORMAL_NO_READ, NULL);
+ else
+ buffer = ReadBuffer(heapRelation, item->blkno);
+
+ if (BufferIsInvalid(buffer))
+ {
+ /*
+ * Buffer was evicted from shared buffers already.
+ */
+ Assert(!got_SIGTERM);
+ stat_buf_ninmem++;
+ save_to_list(AuxiliaryList, item);
+ continue;
+ }
+
+ /*
+ * Skip the page if it was changed by a transaction after OldestXmin. In
+ * this case there is a high probability that we can't do anything useful
+ * with it. We will return to clean it later.
+ */
+ if (!got_SIGTERM && !IsBufferDirty(buffer) && (!TransactionIdPrecedesOrEquals(item->lastXid, OldestXmin)))
+ {
+ ReleaseBuffer(buffer);
+ save_to_list(AuxiliaryList, item);
+ continue;
+ }
+
+ /*
+ * Lock the buffer for pruning
+ */
+ if (strategy == CLEANUP_GENTLY)
+ {
+ if (!ConditionalLockBufferForCleanup(buffer))
+ {
+ stat_not_acquired_locks++;
+ pgstat_progress_update_param(PROGRESS_CLEANER_NACQUIRED_LOCKS, stat_not_acquired_locks);
+
+ /* Can't lock buffer. */
+ ReleaseBuffer(buffer);
+ save_to_list(AuxiliaryList, item);
+ continue;
+ }
+ }
+ else if (strategy == CLEANUP_AGGRESSIVE)
+ LockBufferForCleanup(buffer);
+
+ /*
+ * Increase our chances for cleaning more tuples.
+ */
+ (void) heap_page_prune(heapRelation, buffer, OldestXmin, false, &latestRemovedXid);
+
+ page = BufferGetPage(buffer);
+
+ /* Collect dead tuples TID's */
+ for (offnum = FirstOffsetNumber;
+ offnum <= PageGetMaxOffsetNumber(page);
+ offnum = OffsetNumberNext(offnum))
+ {
+ lp = PageGetItemId(page, offnum);
+ if (ItemIdIsDead(lp))
+ {
+ ItemPointerSet(&(dead_tuples[num_dead_tuples]), item->blkno, offnum);
+ num_dead_tuples++;
+ }
+ }
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+ if (num_dead_tuples == 0)
+ {
+ /*
+ * Block is clear. Accumulate some stats and continue.
+ */
+ stat_vainly_cleaned_tuples += item->hits;
+ pgstat_progress_update_param(PROGRESS_CLEANER_VAINLY_CLEANED_TUPLES, stat_vainly_cleaned_tuples);
+ stat_vainly_cleaned_blocks++;
+ pgstat_progress_update_param(PROGRESS_CLEANER_VAINLY_CLEANED_BLOCKS, stat_vainly_cleaned_blocks);
+ ReleaseBuffer(buffer);
+ continue;
+ }
+
+ /* Open and lock the index relations corresponding to the heap relation */
+ vac_open_indexes(heapRelation, ShareLock, &nindexes, &IndexRelations);
+
+ /* Iterate across all index relations */
+ for (irnum = 0; irnum < nindexes; irnum++)
+ {
+ if (IndexRelations[irnum]->rd_amroutine->amtargetdelete == NULL)
+ {
+ /*
+ * Can clean only btree indexes now.
+ */
+ found_non_nbtree = true;
+ continue;
+ }
+
+ quick_vacuum_index(IndexRelations[irnum],
+ heapRelation,
+ dead_tuples,
+ num_dead_tuples);
+ }
+
+ vac_close_indexes(nindexes, IndexRelations, ShareLock);
+
+ /*
+ * If the heap relation has any non-b-tree index, we can't clean the heap block.
+ */
+ if (!found_non_nbtree)
+ {
+ OffsetNumber unusable[MaxOffsetNumber];
+ int nunusable = 0;
+ Size freespace;
+
+ if (strategy == CLEANUP_GENTLY)
+ {
+ if (!ConditionalLockBufferForCleanup(buffer))
+ {
+ ReleaseBuffer(buffer);
+ save_to_list(AuxiliaryList, item);
+ stat_not_acquired_locks++;
+ pgstat_progress_update_param(PROGRESS_CLEANER_NACQUIRED_LOCKS, stat_not_acquired_locks);
+ continue;
+ }
+ }
+ else if (strategy == CLEANUP_AGGRESSIVE)
+ LockBufferForCleanup(buffer);
+
+ START_CRIT_SECTION();
+
+ /* Release DEAD heap tuples storage */
+ for (tnum = 0; tnum < num_dead_tuples; tnum++)
+ {
+ OffsetNumber offnum = ItemPointerGetOffsetNumber(&dead_tuples[tnum]);
+ ItemId lp = PageGetItemId(page, offnum);
+
+ Assert(ItemIdIsDead(lp));
+ ItemIdSetUnused(lp);
+ unusable[nunusable++] = offnum;
+ }
+
+ if (nunusable > 0)
+ {
+ PageRepairFragmentation(page);
+ PageClearFull(page);
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(heapRelation))
+ {
+ XLogRecPtr recptr = log_heap_clean(heapRelation, buffer,
+ NULL, 0,
+ NULL, 0,
+ unusable, nunusable,
+ InvalidTransactionId);
+
+ PageSetLSN(BufferGetPage(buffer), recptr);
+ }
+ stat_total_deletions += nunusable;
+ stat_total_cleaned_blocks++;
+ pgstat_progress_update_param(PROGRESS_CLEANER_TOTAL_DELETIONS, stat_total_deletions);
+ pgstat_progress_update_param(PROGRESS_CLEANER_CLEANED_BLOCKS, stat_total_cleaned_blocks);
+ }
+
+ END_CRIT_SECTION();
+ freespace = PageGetHeapFreeSpace(page);
+
+ UnlockReleaseBuffer(buffer);
+ RecordPageWithFreeSpace(heapRelation, item->blkno, freespace);
+ pgstat_update_heap_dead_tuples(heapRelation, nunusable);
+ }
+ /*
+ * ToDo: what will we do with the TOAST relation?
+ */
+ }
+
+ relation_close(heapRelation, lockmode);
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+
+ {
+ SHTAB *tmp = res->items;
+ SHASH_Clean(tmp);
+ res->items = AuxiliaryList;
+ return tmp;
+ }
+}
+
+static bool
+DefaultCompareFunc(void* bucket1, void* bucket2)
+{
+ WorkerTask *item1 = (WorkerTask *) bucket1;
+ WorkerTask *item2 = (WorkerTask *) bucket2;
+
+ if (item1->blkno == item2->blkno)
+ return true;
+ else
+ return false;
+}
+
+/*
+ * Free any launcher resources before the process exits
+ */
+static void
+FreeLauncherInfo(int code, Datum arg)
+{
+ int counter;
+
+ for (counter = 0; counter < heapcleaner_max_workers+1; counter++)
+ SHASH_Destroy(wTab[counter]);
+ pfree(wTab);
+}
+
+/*
+ * Return a WorkerInfo to the free list
+ */
+static void
+FreeWorkerInfo(int code, Datum arg)
+{
+ if (MyWorkerInfo != NULL)
+ {
+ LWLockAcquire(HeapCleanerLock, LW_EXCLUSIVE);
+
+ dlist_delete(&MyWorkerInfo->links);
+ MyWorkerInfo->dbOid = InvalidOid;
+ MyWorkerInfo->launchtime = 0;
+ dlist_push_head(&HeapCleanerShmem->freeWorkers,
+ &MyWorkerInfo->links);
+ /* not mine anymore */
+ MyWorkerInfo = NULL;
+
+ LWLockRelease(HeapCleanerLock);
+ }
+ else
+ elog(ERROR, "MyWorkerInfo is NULL");
+}
+
+/*
+ * Initialize communication with backends
+ */
+void
+HeapCleanerInit(void)
+{
+ struct addrinfo *addrs = NULL,
+ *addr, hints;
+ ACCEPT_TYPE_ARG3 alen;
+ int ret;
+ fd_set rset;
+ struct timeval tv;
+ char test_byte;
+ int sel_res;
+ int tries = 0;
+
+#define TESTBYTEVAL ((char) 199)
+
+ /*
+ * Create the UDP socket through which backends send dirty-block messages to the launcher
+ */
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_protocol = 0;
+ hints.ai_addrlen = 0;
+ hints.ai_addr = NULL;
+ hints.ai_canonname = NULL;
+ hints.ai_next = NULL;
+ ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
+ Assert(ret == 0);
+
+ for (addr = addrs; addr; addr = addr->ai_next)
+ {
+#ifdef HAVE_UNIX_SOCKETS
+ /* Ignore AF_UNIX sockets, if any are returned. */
+ if (addr->ai_family == AF_UNIX)
+ continue;
+#endif
+
+ if (++tries > 1)
+ ereport(LOG,
+ (errmsg("trying another address for the relation cleaner")));
+
+ if ((HeapCleanerSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
+ {
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not create socket for relation cleaner: %m")));
+ continue;
+ }
+
+ if (bind(HeapCleanerSock, addr->ai_addr, addr->ai_addrlen) < 0)
+ {
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not bind socket for relation cleaner: %m")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+
+ alen = sizeof(HeapCleanerSockAddr);
+ if (getsockname(HeapCleanerSock, (struct sockaddr *) &HeapCleanerSockAddr, &alen) < 0)
+ {
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not get address of socket for relation cleaner: %m")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+
+ if (connect(HeapCleanerSock, (struct sockaddr *) &HeapCleanerSockAddr, alen) < 0)
+ {
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not connect socket for relation cleaner: %m")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+ /*
+ * Try to send and receive a one-byte test message on the socket. This
+ * is to catch situations where the socket can be created but will not
+ * actually pass data (for instance, because kernel packet filtering
+ * rules prevent it).
+ */
+ test_byte = TESTBYTEVAL;
+
+retry1:
+ if (send(HeapCleanerSock, &test_byte, 1, 0) != 1)
+ {
+ if (errno == EINTR)
+ goto retry1; /* if interrupted, just retry */
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not send test message on socket for relation cleaner: %m")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+
+ /*
+ * There could possibly be a little delay before the message can be
+ * received. We arbitrarily allow up to half a second before deciding
+ * it's broken.
+ */
+ for (;;) /* need a loop to handle EINTR */
+ {
+ FD_ZERO(&rset);
+ FD_SET(HeapCleanerSock, &rset);
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 500000;
+ sel_res = select(HeapCleanerSock + 1, &rset, NULL, NULL, &tv);
+ if (sel_res >= 0 || errno != EINTR)
+ break;
+ }
+ if (sel_res < 0)
+ {
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("select() failed in relation cleaner: %m")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+ if (sel_res == 0 || !FD_ISSET(HeapCleanerSock, &rset))
+ {
+ /*
+ * This is the case we actually think is likely, so take pains to
+ * give a specific message for it.
+ *
+ * errno will not be set meaningfully here, so don't use it.
+ */
+ ereport(LOG,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("test message did not get through on socket for relation cleaner")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+
+ test_byte++; /* just make sure variable is changed */
+
+retry2:
+ if (recv(HeapCleanerSock, &test_byte, 1, 0) != 1)
+ {
+ if (errno == EINTR)
+ goto retry2; /* if interrupted, just retry */
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not receive test message on socket for relation cleaner: %m")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+
+ if (test_byte != TESTBYTEVAL) /* strictly paranoia ... */
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("incorrect test message transmission on socket for relation cleaner")));
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+ continue;
+ }
+
+ /* If we get here, we have a working socket */
+
+ break;
+ }
+
+ /* Did we find a working address? */
+ if (!addr || HeapCleanerSock == PGINVALID_SOCKET)
+ goto startup_failed;
+
+ /*
+ * Set the socket to non-blocking IO. This ensures that if the launcher
+ * falls behind, dirty-block messages will be discarded; backends won't
+ * block waiting to send messages to the launcher.
+ */
+ if (!pg_set_noblock(HeapCleanerSock))
+ {
+ ereport(LOG,
+ (errcode_for_socket_access(),
+ errmsg("could not set relation cleaner socket to nonblocking mode: %m")));
+ goto startup_failed;
+ }
+ pg_freeaddrinfo_all(hints.ai_family, addrs);
+
+ return;
+
+startup_failed:
+ ereport(LOG,
+ (errmsg("disabling heap cleaner for lack of working socket")));
+
+ if (addrs)
+ pg_freeaddrinfo_all(hints.ai_family, addrs);
+
+ if (HeapCleanerSock != PGINVALID_SOCKET)
+ closesocket(HeapCleanerSock);
+ HeapCleanerSock = PGINVALID_SOCKET;
+}
+
+/*
+ * Startup and initialization logic of the launcher
+ */
+NON_EXEC_STATIC void
+HeapCleanerLauncherMain(int argc, char *argv[])
+{
+ sigjmp_buf local_sigjmp_buf;
+ MemoryContext bgheap_context;
+ int counter;
+
+ am_heapcleaner_launcher = true;
+
+ /*
+ * Identify myself via ps
+ */
+ init_ps_display(pgstat_get_backend_desc(B_BG_HEAPCLNR_LAUNCHER), "", "", "");
+
+ SetProcessingMode(InitProcessing);
+
+ pqsignal(SIGHUP, SIGHUP_Handler); /* set flag to read config file */
+ pqsignal(SIGINT, StatementCancelHandler);
+ pqsignal(SIGTERM, SIGTERM_Handler);
+ pqsignal(SIGQUIT, quickdie); /* hard crash time */
+ InitializeTimeouts();
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ BaseInit();
+
+#ifndef EXEC_BACKEND
+ InitProcess();
+#endif
+
+ InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false);
+
+ SetProcessingMode(NormalProcessing);
+
+ /*
+ * Create a resource owner to keep track of our resources (currently only
+ * buffer pins).
+ */
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Background cleaner");
+
+ bgheap_context = AllocSetContextCreate(TopMemoryContext,
+ "Background cleaner",
+ ALLOCSET_DEFAULT_SIZES);
+ MemoryContextSwitchTo(bgheap_context);
+
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Forget any pending QueryCancel or timeout request */
+ disable_all_timeouts(false);
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ LWLockReleaseAll();
+ pgstat_report_wait_end();
+ AbortBufferIO();
+ UnlockBuffers();
+ if (CurrentResourceOwner)
+ /* buffer pins are released here: */
+ ResourceOwnerRelease(CurrentResourceOwner,
+ RESOURCE_RELEASE_BEFORE_LOCKS,
+ false, true);
+ /* we needn't bother with the other ResourceOwnerRelease phases */
+ AtEOXact_Buffers(false);
+ AtEOXact_SMgr();
+ AtEOXact_Files(false);
+ AtEOXact_HashTables(false);
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(bgheap_context);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextResetAndDeleteChildren(bgheap_context);
+
+ /*
+ * Make sure pgstat also considers our stat data as gone. Note: we
+ * mustn't use autovac_refresh_stats here.
+ */
+ pgstat_clear_snapshot();
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ if (got_SIGTERM)
+ proc_exit(0);
+ /*
+ * Sleep at least 1 second after any error. A write error is likely
+ * to be repeated, and we don't want to be filling the error logs as
+ * fast as we can.
+ */
+ pg_usleep(1000000L);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ PG_SETMASK(&UnBlockSig);
+
+ wTabCtl.FillFactor = 0.75;
+ wTabCtl.ElementsMaxNum = WAITING_MESSAGES_MAX_NUM;
+ wTabCtl.ElementSize = sizeof(CleanerMessage);
+ wTabCtl.KeySize = 2 * sizeof(Oid) + sizeof(BlockNumber);
+ wTabCtl.HashFunc = CleanerMessageHashFunc;
+ wTabCtl.CompFunc = isEqualMsgs;
+
+ Assert(heapcleaner_max_workers == 10);
+ wTab = palloc0((heapcleaner_max_workers+1)*sizeof(PSHTAB));
+ for (counter = 0; counter < heapcleaner_max_workers+1; counter++)
+ wTab[counter] = SHASH_Create(wTabCtl);
+ on_shmem_exit(FreeLauncherInfo, 0);
+ elog(LOG, "heap cleaner launcher started");
+ main_launcher_loop();
+
+ proc_exit(0);
+}
+
+static bool
+backend_store_dirty_block(CleanerMessage *msg)
+{
+ bool found;
+ CleanerMessage *rec;
+
+ /* Initialize collector's hash table */
+ if (dblocks == NULL)
+ {
+ SHTABCTL ctl;
+
+ ctl.ElementsMaxNum = BACKEND_DIRTY_ITEMS_MAX;
+ ctl.FillFactor = 0.75;
+ ctl.ElementSize = sizeof(CleanerMessage);
+ ctl.KeySize = 2 * sizeof(Oid) + sizeof(BlockNumber);
+ ctl.HashFunc = CleanerMessageHashFunc;
+ ctl.CompFunc = isEqualMsgs;
+ dblocks = SHASH_Create(ctl);
+ }
+
+ /* Search for new position or for duplicates */
+ rec = (CleanerMessage *) SHASH_Search(dblocks, (void *) msg, SHASH_ENTER, &found);
+
+ if (!rec)
+ return true;
+
+ if (!found)
+ rec->hits = 0;
+
+ rec->xid = msg->xid;
+ rec->hits++;
+ return false;
+}
+
+/*
+ * Send a package of dirty blocks from a backend to the Cleaner.
+ * The package is formed by a pass across the hash table; the table is then
+ * cleaned. The sign of a clean row is the (dbNode == 0) condition.
+ */
+static void
+backend_send_dirty_blocks(void)
+{
+ int rc;
+ CleanerMessage *rec;
+ CleanerMessage data[BACKEND_DIRTY_ITEMS_MAX];
+ int nitems = 0;
+
+ if (dblocks == NULL)
+ return;
+
+ if (SHASH_Entries(dblocks) == 0)
+ return;
+
+ Assert(SHASH_Entries(dblocks) <= BACKEND_DIRTY_ITEMS_MAX);
+
+ for (SHASH_SeqReset(dblocks);
+ (rec = SHASH_SeqNext(dblocks)) != NULL; )
+ {
+ Assert(rec->hits > 0);
+ Assert(nitems < BACKEND_DIRTY_ITEMS_MAX);
+ data[nitems++] = *rec;
+ }
+ SHASH_Clean(dblocks);
+
+ /* Send data to launcher.
+ * We'll retry after EINTR, but ignore all other failures
+ */
+ do
+ {
+ rc = send(HeapCleanerSock, data, nitems*sizeof(CleanerMessage), 0);
+ } while (rc < 0 && errno == EINTR);
+}
+
+/*
+ * Send a dirty block of a relation to the Cleaner.
+ * Before shipping to the Cleaner, dirty blocks are collected in a simple
+ * hash table. Duplicates are discarded.
+ */
+void
+HeapCleanerSend(Relation relation, BlockNumber blkno)
+{
+ CleanerMessage msg;
+ MemoryContext oldMemCxt;
+
+ if (BGHeapMemCxt == NULL)
+ BGHeapMemCxt = AllocSetContextCreate(TopMemoryContext, "bgheap",
+ ALLOCSET_DEFAULT_SIZES);
+
+ if (RecoveryInProgress())
+ return;
+
+ if (HeapCleanerSock == PGINVALID_SOCKET)
+ return;
+
+ /*
+ * ToDo: cleaning of system relations makes the output order of the
+ * regression tests unstable
+ */
+ if (IsSystemRelation(relation))
+ return;
+
+ /*
+ * Check the relation type.
+ * Not all relations can be cleaned now (similarly to vacuum).
+ */
+ if (relation->rd_rel->relkind != RELKIND_RELATION &&
+ relation->rd_rel->relkind != RELKIND_MATVIEW &&
+ relation->rd_rel->relkind != RELKIND_TOASTVALUE &&
+ relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ {
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- cannot clean non-tables or special system tables",
+ RelationGetRelationName(relation))));
+ return;
+ }
+
+ if (RELATION_IS_OTHER_TEMP(relation))
+ return;
+
+ if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ return;
+
+ oldMemCxt = MemoryContextSwitchTo(BGHeapMemCxt);
+
+ msg.dbNode = relation->rd_node.dbNode;
+ msg.relid = RelationGetRelid(relation);
+ msg.blkno = blkno;
+ msg.xid = GetCurrentTransactionIdIfAny();
+
+ if (backend_store_dirty_block(&msg))
+ backend_send_dirty_blocks();
+
+ MemoryContextSwitchTo(oldMemCxt);
+}
+
+/*
+ * Startup and initialization logic of a worker
+ */
+NON_EXEC_STATIC void
+HeapCleanerWorkerMain(int argc, char *argv[])
+{
+ sigjmp_buf local_sigjmp_buf;
+
+ am_heapcleaner_worker = true;
+
+ /* Identify myself via ps */
+ init_ps_display(pgstat_get_backend_desc(B_BG_HEAPCLNR_WORKER), "", "", "");
+
+ SetProcessingMode(InitProcessing);
+
+ pqsignal(SIGHUP, SIGHUP_Handler);
+ pqsignal(SIGINT, StatementCancelHandler);
+ pqsignal(SIGTERM, SIGTERM_Handler);
+ pqsignal(SIGQUIT, quickdie);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
+
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIGUSR2_Handler); /* Signal: Buffer contains a message */
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /* Early initialization */
+ BaseInit();
+
+#ifndef EXEC_BACKEND
+ InitProcess();
+#endif
+
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ proc_exit(0);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ PG_SETMASK(&UnBlockSig);
+
+ /* Initialization steps */
+ LWLockAcquire(HeapCleanerLock, LW_EXCLUSIVE);
+ if (HeapCleanerShmem->startingWorker != NULL)
+ {
+ MyWorkerInfo = HeapCleanerShmem->startingWorker;
+ MyWorkerInfo->pid = getpid();
+ HeapCleanerShmem->startingWorker = NULL;
+
+ /* insert into the running list */
+ dlist_push_head(&HeapCleanerShmem->runningWorkers, &MyWorkerInfo->links);
+
+ on_shmem_exit(FreeWorkerInfo, 0);
+ }
+ else
+ elog(ERROR, "No Starting worker!");
+ LWLockRelease(HeapCleanerLock);
+
+ if (OidIsValid(MyWorkerInfo->dbOid))
+ {
+ char dbname[NAMEDATALEN];
+ SHTABCTL pr_ctl;
+
+ pgstat_report_heapcleaner(MyWorkerInfo->dbOid);
+ InitPostgres(NULL, MyWorkerInfo->dbOid, NULL, InvalidOid, dbname, false);
+ SetProcessingMode(NormalProcessing);
+ set_ps_display(dbname, false);
+
+ pr_ctl.ElementsMaxNum = 1000;
+ pr_ctl.FillFactor = 0.5;
+ pr_ctl.HashFunc = DefaultHashValueFunc;
+ pr_ctl.CompFunc = DefaultCompareFunc;
+ pr_ctl.ElementSize = sizeof(DirtyRelation);
+ pr_ctl.KeySize = sizeof(int32);
+
+ PrivateRelationsTable = SHASH_Create(pr_ctl);
+
+ /* Add tracking info to pgstat */
+ pgstat_progress_start_command(PROGRESS_COMMAND_CLEANER, MyWorkerInfo->dbOid);
+
+ main_worker_loop();
+ }
+ else
+ elog(ERROR, "dbid %u not valid!", MyWorkerInfo->dbOid);
+
+ proc_exit(0);
+}
+
+/*
+ * Compare messages
+ */
+static bool
+isEqualMsgs(void *arg1, void *arg2)
+{
+ CleanerMessage *msg1 = arg1;
+ CleanerMessage *msg2 = arg2;
+
+ Assert(arg1 != NULL);
+ Assert(arg2 != NULL);
+
+ if (msg1->blkno != msg2->blkno)
+ return false;
+ if (msg1->relid != msg2->relid)
+ return false;
+ if (msg1->dbNode != msg2->dbNode)
+ return false;
+
+ return true;
+}
+
+/*
+ * IsHeapCleaner functions
+ * Return whether this is either a launcher heap cleaner process or
+ * a worker process.
+ */
+bool
+IsHeapCleanerLauncherProcess(void)
+{
+ return am_heapcleaner_launcher;
+}
+
+bool
+IsHeapCleanerWorkerProcess(void)
+{
+ return am_heapcleaner_worker;
+}
+
+/*
+ * Send a signal to the postmaster to launch a new worker instance.
+ * Remove one node from the freeWorkers list and assign it to startingWorker.
+ * HeapCleanerLock must be exclusive-locked.
+ */
+static void
+launch_worker(Oid dbNode)
+{
+ WorkerInfo worker;
+ dlist_node *wptr;
+
+ if (dlist_is_empty(&HeapCleanerShmem->freeWorkers))
+ elog(ERROR, "no free slot for background cleaner worker");
+
+ wptr = dlist_pop_head_node(&HeapCleanerShmem->freeWorkers);
+ worker = dlist_container(WorkerInfoData, links, wptr);
+ worker->dbOid = dbNode;
+ worker->launchtime = GetCurrentTimestamp();
+ worker->nitems = 0;
+ HeapCleanerShmem->startingWorker = worker;
+
+ SendPostmasterSignal(PMSIGNAL_START_HEAPCLNR_WORKER);
+}
+
+/*
+ * Return the worker that was initialized for the dbNode database, or NULL.
+ * The caller needs to hold at least a share lock on HeapCleanerLock.
+ */
+static WorkerInfo
+look_for_worker(Oid dbNode)
+{
+ dlist_node *node;
+ WorkerInfo worker = NULL;
+
+ if (!dlist_is_empty(&HeapCleanerShmem->runningWorkers))
+ for (node = dlist_head_node(&HeapCleanerShmem->runningWorkers);
+ ;
+ node = dlist_next_node(&HeapCleanerShmem->runningWorkers, node))
+ {
+ if (dlist_container(WorkerInfoData, links, node)->dbOid == dbNode)
+ {
+ worker = dlist_container(WorkerInfoData, links, node);
+ break;
+ }
+ if (!dlist_has_next(&HeapCleanerShmem->runningWorkers, node))
+ break;
+ }
+
+ return worker;
+}
+
+/*
+ * Main loop of the launcher
+ */
+static void
+main_launcher_loop(void)
+{
+ bool found;
+ long timeout = -1L;
+
+ Assert(HeapCleanerSock != PGINVALID_SOCKET);
+
+ /*
+ * Keep working until a SIGTERM is caught and all blocks have been
+ * shipped to workers.
+ * timeout <= 0 tells us that the launcher does not have any work right now,
+ * and it will sleep until a socket message or a system signal arrives.
+ */
+ while (!got_SIGTERM || (timeout > 0))
+ {
+ int rc = 0;
+ int len;
+ CleanerMessage *msg;
+ CleanerMessage table[BACKEND_DIRTY_ITEMS_MAX];
+ WorkerInfo startingWorker;
+ WorkerInfo worker;
+ dlist_node *node;
+
+ timeout = -1L;
+
+ /* Process system signals */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ ResetLatch(MyLatch);
+
+ /* First, receive a message from a backend */
+ len = recv(HeapCleanerSock, table, BACKEND_DIRTY_ITEMS_MAX*sizeof(CleanerMessage), 0);
+
+ LWLockAcquire(HeapCleanerLock, LW_EXCLUSIVE);
+ startingWorker = HeapCleanerShmem->startingWorker;
+
+ /*
+ * Message was received from socket.
+ */
+ if (len > 0)
+ {
+ CleanerMessage *mptr;
+
+ if (len%sizeof(CleanerMessage) != 0)
+ elog(ERROR, "incorrect message size");
+
+ /*
+ * Push all messages received from a backend to general
+ * hash table with waiting messages.
+ */
+ for (mptr = table; ((char *)mptr-(char *)table) < len; mptr++)
+ {
+ worker = look_for_worker(mptr->dbNode);
+ if (worker)
+ msg = (CleanerMessage *) SHASH_Search(wTab[worker->id], mptr, SHASH_ENTER, &found);
+ else
+ msg = (CleanerMessage *) SHASH_Search(wTab[heapcleaner_max_workers], mptr, SHASH_ENTER, &found);
+
+ if (msg == NULL)
+ /*
+ * Hash table is FULL and we can't insert a message
+ */
+ stat_missed_blocks++;
+ else if (found)
+ {
+ msg->hits++;
+ msg->xid = (msg->xid > mptr->xid) ? msg->xid : mptr->xid;
+ }
+ else
+ memcpy(msg, mptr, sizeof(CleanerMessage));
+ }
+ }
+
+ /*
+ * Main waiting list parsing.
+ */
+ if (SHASH_Entries(wTab[heapcleaner_max_workers]) > 0)
+ {
+ bool startWorker = (startingWorker != NULL);
+
+ /*
+ * Pass across general waiting list.
+ * Try to send tasks to active workers.
+ */
+ for (SHASH_SeqReset(wTab[heapcleaner_max_workers]);
+ (msg = (CleanerMessage *) SHASH_SeqNext(wTab[heapcleaner_max_workers])) != NULL; )
+ {
+ worker = look_for_worker(msg->dbNode);
+
+ if (worker)
+ {
+ CleanerMessage *temp_msg = (CleanerMessage *) SHASH_Search(wTab[worker->id], (void *) msg, SHASH_ENTER, &found);
+ if (temp_msg == NULL)
+ /* Hash table is full and we can't insert a message */
+ stat_missed_blocks++;
+ else if (found)
+ {
+ temp_msg->hits += msg->hits;
+ temp_msg->xid = (msg->xid > temp_msg->xid) ? msg->xid : temp_msg->xid;
+ }
+ else
+ memcpy(temp_msg, msg, sizeof(CleanerMessage));
+
+ /*
+ * The message has gone to the worker; delete it from the main waiting list.
+ */
+ temp_msg = (CleanerMessage *) SHASH_Search(wTab[heapcleaner_max_workers], (void *) msg, SHASH_REMOVE, NULL);
+ Assert(temp_msg != NULL);
+
+ continue;
+ }
+ else if (!startWorker)
+ {
+ /* Start new worker */
+ launch_worker(msg->dbNode);
+ startWorker = true;
+ }
+ }
+ }
+
+ /*
+ * Check: if we couldn't process all incoming messages, use a very small
+ * timeout, check latches and go to the next iteration.
+ */
+ if (SHASH_Entries(wTab[heapcleaner_max_workers]) > 0)
+ timeout = 1L;
+
+ /*
+ * Look through the waiting lists of active workers and try to send messages.
+ */
+ if (!dlist_is_empty(&HeapCleanerShmem->runningWorkers))
+ {
+ for (node = dlist_head_node(&HeapCleanerShmem->runningWorkers);
+ ;
+ node = dlist_next_node(&HeapCleanerShmem->runningWorkers, node))
+ {
+ worker = dlist_container(WorkerInfoData, links, node);
+
+ if (SHASH_Entries(wTab[worker->id]) == 0)
+ {
+ /* Check worker idle time */
+ if (TimestampDifferenceExceeds(worker->launchtime, GetCurrentTimestamp(), WORKER_IDLE_TIME_DURATION_MAX))
+ {
+ LWLockAcquire(&worker->WorkItemLock, LW_EXCLUSIVE);
+
+ /*
+ * Check: maybe the worker has tasks but has not picked them up yet
+ */
+ if (worker->nitems == 0)
+ {
+ /* Shutdown the idle worker */
+ kill(worker->pid, SIGTERM);
+ dlist_delete(&worker->links);
+ dlist_push_head(&HeapCleanerShmem->freeWorkers, &worker->links);
+
+ if (!dlist_is_empty(&HeapCleanerShmem->runningWorkers))
+ node = dlist_head_node(&HeapCleanerShmem->runningWorkers);
+ else
+ {
+ LWLockRelease(&worker->WorkItemLock);
+ break;
+ }
+ }
+ LWLockRelease(&worker->WorkItemLock);
+ }
+ else if (!dlist_has_next(&HeapCleanerShmem->runningWorkers, node))
+ break;
+ else
+ continue;
+ }
+
+ /* Put list of potentially dirty blocks to the worker shared buffer */
+ LWLockAcquire(&worker->WorkItemLock, LW_EXCLUSIVE);
+
+ SHASH_SeqReset(wTab[worker->id]);
+ while (((msg = (CleanerMessage *) SHASH_SeqNext(wTab[worker->id])) != NULL) &&
+ (worker->nitems < WORKER_TASK_ITEMS_MAX))
+ {
+ void *temp_msg;
+
+ memcpy(&worker->buffer[worker->nitems++], msg, sizeof(CleanerMessage));
+ temp_msg = SHASH_Search(wTab[worker->id], (void *) msg, SHASH_REMOVE, NULL);
+ Assert(temp_msg != NULL);
+ Assert(worker->buffer[worker->nitems-1].hits > 0);
+ }
+
+ worker->launchtime = GetCurrentTimestamp();
+ LWLockRelease(&worker->WorkItemLock);
+
+ /* Worker, you have a task! */
+ kill(worker->pid, SIGUSR2);
+
+ /*
+ * If the launcher still has tasks for this worker, check signals
+ * quickly and carry on working.
+ */
+ if (SHASH_Entries(wTab[worker->id]) > 0)
+ timeout = 1L;
+
+ if (!dlist_has_next(&HeapCleanerShmem->runningWorkers, node))
+ break;
+ }
+
+ /*
+ * The launcher has no immediate tasks and
+ * only needs to manage idle workers
+ */
+ if (timeout < 0)
+ /* We only need to wait for idle workers */
+ timeout = 100L;
+ }
+ LWLockRelease(HeapCleanerLock);
+
+ if (!got_SIGTERM)
+ {
+ int wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE;
+
+ if (timeout > 0)
+ wakeEvents |= WL_TIMEOUT;
+ /* Wait for data or signals */
+ rc = WaitLatchOrSocket(MyLatch,
+ wakeEvents,
+ HeapCleanerSock, timeout,
+ WAIT_EVENT_BGHEAP_MAIN);
+ }
+
+ /* Emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ {
+ elog(LOG, "Heap Launcher exit with 1");
+ proc_exit(1);
+ }
+ }
+
+ elog(LOG, "Heap Launcher exit with 0");
+ proc_exit(0);
+}
+
+/*
+ * Main loop of a worker
+ */
+static void
+main_worker_loop(void)
+{
+ CleanerMessage task_item[WORKER_TASK_ITEMS_MAX];
+ DirtyRelation *dirty_relation[WORKER_RELATIONS_MAX_NUM];
+ PSHTAB FreeDirtyBlocksList;
+ long timeout = -1L;
+ int dirty_relations_num = 0;
+ SHTABCTL shctl;
+
+ shctl.FillFactor = 0.75;
+ shctl.ElementsMaxNum = WORKER_DIRTYBLOCKS_MAX_NUM;
+ shctl.ElementSize = sizeof(WorkerTask);
+ shctl.KeySize = sizeof(BlockNumber);
+ shctl.HashFunc = DefaultHashValueFunc;
+ shctl.CompFunc = DefaultCompareFunc;
+
+ FreeDirtyBlocksList = SHASH_Create(shctl);
+
+ while (!got_SIGTERM || (timeout > 0))
+ {
+ int rc = 0;
+ int incoming_items_num = 0;
+
+ timeout = -1L;
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ if (got_SIGUSR2)
+ /* Needed only to wake up the worker */
+ got_SIGUSR2 = false;
+
+ /*
+ * Move task items from shared buffer to local and release it for
+ * new data.
+ * It is introduced in accordance with the idea that shared memory
+ * buffer is smaller than internal process buffer.
+ */
+ LWLockAcquire(&MyWorkerInfo->WorkItemLock, LW_EXCLUSIVE);
+ if (MyWorkerInfo->nitems > 0)
+ {
+ incoming_items_num = MyWorkerInfo->nitems;
+ memcpy(task_item, MyWorkerInfo->buffer, incoming_items_num*sizeof(CleanerMessage));
+ MyWorkerInfo->nitems = 0;
+ }
+ LWLockRelease(&MyWorkerInfo->WorkItemLock);
+
+ /*
+ * If the worker received some task items, it needs to distribute them
+ * between the waiting lists of the dirty relations.
+ */
+ if (incoming_items_num > 0)
+ {
+ DirtyRelation *hashent;
+ bool found;
+ int i;
+
+ /* New items arrived: recheck soon */
+ timeout = 20L;
+
+ pgstat_progress_update_param(PROGRESS_CLEANER_MISSED_BLOCKS, stat_missed_blocks);
+
+ /* Pass across items and sort by relation */
+ for (i = 0; i < incoming_items_num; i++)
+ {
+ WorkerTask *item;
+
+ hashent = SHASH_Search(PrivateRelationsTable,
+ (void *) &(task_item[i].relid), HASH_FIND, NULL);
+
+ if (hashent == NULL)
+ {
+ elog(LOG, "Add new relation: %u", task_item[i].relid);
+ if (dirty_relations_num == WORKER_RELATIONS_MAX_NUM)
+ {
+ int j;
+ int min = SHASH_Entries(dirty_relation[0]->items);
+ int pos = 0;
+
+ /*
+ * All slots for dirty relations are busy.
+ * Search for the slot with the smallest number of dirty blocks
+ * and replace it.
+ */
+ elog(LOG, "All dirty relation slots are busy at worker process. Start min=%d", min);
+ for (j = 1; j < dirty_relations_num; j++)
+ {
+ int nitems = SHASH_Entries(dirty_relation[j]->items);
+ if (nitems < min)
+ {
+ min = nitems;
+ pos = j;
+ }
+ }
+
+ /* Save stats about not cleaned blocks */
+ stat_missed_blocks += SHASH_Entries(dirty_relation[pos]->items);
+
+ elog(LOG, "Remove slot %d with id=%u nitems=%lu. Set new relation id=%u", pos, dirty_relation[pos]->relid, SHASH_Entries(dirty_relation[pos]->items), task_item[i].relid);
+ hashent = SHASH_Search(PrivateRelationsTable,
+ (void *) &(dirty_relation[pos]->relid), HASH_REMOVE, &found);
+
+ Assert(found);
+ Assert(hashent != NULL);
+ SHASH_Destroy(hashent->items);
+
+ /*
+ * Fill empty position by data from last position
+ */
+ dirty_relation[pos] = dirty_relation[--dirty_relations_num];
+ }
+
+ /* Insert new relid to hash table */
+ hashent = SHASH_Search(PrivateRelationsTable, (void *) &(task_item[i].relid), HASH_ENTER, &found);
+ Assert(!found);
+ Assert(hashent != NULL);
+
+ /*
+ * At a new entry we create and init list of 'dirty' blocks
+ */
+ hashent->items = SHASH_Create(shctl);
+ dirty_relation[dirty_relations_num++] = hashent;
+ pgstat_progress_update_param(PROGRESS_CLEANER_RELATIONS, SHASH_Entries(PrivateRelationsTable));
+ }
+
+ /*
+ * Relation entry found or created.
+ * Now, add an item to a waiting list.
+ */
+ item = (WorkerTask *) SHASH_Search(hashent->items, (void *) &(task_item[i].blkno), HASH_ENTER, &found);
+ if (item == NULL)
+ stat_missed_blocks++;
+ else
+ {
+ if (!found)
+ {
+ item->hits = 0;
+ item->lastXid = InvalidTransactionId;
+ }
+
+ item->hits += task_item[i].hits;
+ Assert(item->hits > 0);
+ item->lastXid = (item->lastXid > task_item[i].xid) ? item->lastXid : task_item[i].xid;
+ }
+ }
+ incoming_items_num = 0;
+ }
+
+ PG_TRY();
+ {
+ int relcounter;
+ int64 stat_tot_wait_queue_len = 0;
+
+ stat_buf_ninmem = 0;
+
+ /* Walk the dirty relations and try to clean them */
+ for (relcounter = 0; relcounter < dirty_relations_num; relcounter++)
+ {
+ if (SHASH_Entries(FreeDirtyBlocksList) > 0)
+ SHASH_Clean(FreeDirtyBlocksList);
+
+ FreeDirtyBlocksList = cleanup_relations(dirty_relation[relcounter], FreeDirtyBlocksList, got_SIGTERM);
+
+ /*
+ * Some blocks from the list may be locked by a backend.
+ * They are deferred until the next cleanup attempt.
+ */
+ if ((stat_tot_wait_queue_len += SHASH_Entries(dirty_relation[relcounter]->items)) > 0)
+ timeout = 5L;
+ }
+
+ pgstat_progress_update_param(PROGRESS_CLEANER_TIMEOUT, timeout);
+ pgstat_progress_update_param(PROGRESS_CLEANER_BUF_NINMEM, stat_buf_ninmem);
+ pgstat_progress_update_param(PROGRESS_CLEANER_TOTAL_QUEUE_LENGTH, stat_tot_wait_queue_len);
+ QueryCancelPending = false;
+ }
+ PG_CATCH();
+ {
+ HOLD_INTERRUPTS();
+ EmitErrorReport();
+ AbortOutOfAnyTransaction();
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+ }
+ PG_END_TRY();
+
+ if (!got_SIGTERM)
+ {
+ int wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+ if (timeout > 0)
+ wakeEvents |= WL_TIMEOUT;
+ rc = WaitLatch(MyLatch, wakeEvents, timeout, WAIT_EVENT_BGHEAP_MAIN);
+ }
+
+ ResetLatch(MyLatch);
+
+ /* Emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+ }
+
+ pgstat_progress_end_command();
+ proc_exit(0);
+}
+
+/*
+ * SIGHUP_Handler
+ */
+static void
+SIGHUP_Handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * SIGTERM_Handler
+ */
+static void
+SIGTERM_Handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGTERM = true;
+
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * SIGUSR2_Handler
+ * Wake up a worker to read some messages from launcher
+ */
+static void
+SIGUSR2_Handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGUSR2 = true;
+
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * Main entry point for background heap cleaner (launcher) process, to be called from the
+ * postmaster.
+ */
+int
+StartHeapCleanerLauncher(void)
+{
+ pid_t HeapCleanerLauncherPID;
+
+#ifdef EXEC_BACKEND
+ switch ((HeapCleanerLauncherPID = bghclauncher_forkexec()))
+#else
+ switch ((HeapCleanerLauncherPID = fork_process()))
+#endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork background heap cleaner (launcher) process: %m")));
+ return 0;
+
+#ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ HeapCleanerLauncherMain(0, NULL);
+ break;
+#endif
+ default:
+ return (int) HeapCleanerLauncherPID;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+/*
+ * Main entry point for background heap cleaner (worker) process, to be called from the
+ * postmaster.
+ */
+int
+StartHeapCleanerWorker(void)
+{
+ pid_t worker_pid;
+
+#ifdef EXEC_BACKEND
+ switch ((worker_pid = hcworker_forkexec()))
+#else
+ switch ((worker_pid = fork_process()))
+#endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork background heap cleaner (worker) process: %m")));
+ return 0;
+
+#ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ HeapCleanerWorkerMain(0, NULL);
+ break;
+#endif
+ default:
+ return (int) worker_pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+/*
+ * HeapCleanerShmemInit
+ * Allocate and initialize heapcleaner-related shared memory
+ */
+void
+HeapCleanerShmemInit(void)
+{
+ bool found;
+
+ HeapCleanerShmem = (HeapCleanerShmemStruct *)
+ ShmemInitStruct("HeapCleaner Data",
+ HeapCleanerShmemSize(),
+ &found);
+
+ TrancheId = LWLockNewTrancheId();
+ LWLockRegisterTranche(TrancheId, "heapcleaner");
+
+ if (!IsUnderPostmaster)
+ {
+ WorkerInfo worker;
+ int i;
+
+ Assert(!found);
+
+ dlist_init(&HeapCleanerShmem->freeWorkers);
+ dlist_init(&HeapCleanerShmem->runningWorkers);
+
+ worker = (WorkerInfo) ((char *) HeapCleanerShmem +
+ MAXALIGN(sizeof(HeapCleanerShmemStruct)));
+
+ /* initialize the WorkerInfo free list */
+ for (i = 0; i < heapcleaner_max_workers; i++)
+ {
+ worker[i].id = i;
+ LWLockInitialize(&worker[i].WorkItemLock, TrancheId);
+ dlist_push_head(&HeapCleanerShmem->freeWorkers,
+ &worker[i].links);
+ }
+ }
+ else
+ Assert(found);
+ HeapCleanerShmem->startingWorker = NULL;
+}
+
+/*
+ * HeapCleanerShmemSize
+ * Compute space needed for heap cleaner-related shared memory
+ */
+Size
+HeapCleanerShmemSize(void)
+{
+ Size size;
+
+ /*
+ * Need the fixed struct and the array of WorkerInfoData.
+ */
+ size = sizeof(HeapCleanerShmemStruct);
+ size = MAXALIGN(size);
+ size = add_size(size, mul_size(heapcleaner_max_workers,
+ sizeof(WorkerInfoData)));
+ return size;
+}
+
+static uint64
+CleanerMessageHashFunc(void *key, uint64 size, uint64 base)
+{
+ uint64 sum = 0;
+ CleanerMessage *msg = (CleanerMessage *) key;
+
+ Assert(key != NULL);
+ Assert(size == 2 * sizeof(Oid) + sizeof(BlockNumber));
+ Assert(base > 0);
+
+ sum += DefaultHashValueFunc(&msg->dbNode, sizeof(Oid), base);
+ sum += DefaultHashValueFunc(&msg->relid, sizeof(Oid), base);
+ sum += DefaultHashValueFunc(&msg->blkno, sizeof(BlockNumber), base);
+
+ return sum%base;
+}
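
Reviewer's sketch (not part of the patch): CleanerMessageHashFunc hashes the key field by field and reduces the sum modulo the table size. The stand-alone program below mirrors that shape under assumptions: demo_oid, demo_blkno, demo_key, demo_bytes_hash and demo_key_hash are hypothetical names, and the byte hash reads bytes as unsigned char, which may differ from what the patch does. It also shows one property of a byte-wise sum: keys whose bytes are a permutation of each other collide.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for Oid and BlockNumber (32-bit in PostgreSQL). */
typedef uint32_t demo_oid;
typedef uint32_t demo_blkno;

typedef struct
{
    demo_oid   db;
    demo_oid   rel;
    demo_blkno blk;
} demo_key;

/*
 * Byte-wise polynomial hash with the same shape as DefaultHashValueFunc.
 * Assumption: bytes are read as unsigned char.
 */
static uint64_t
demo_bytes_hash(const void *data, size_t size, uint64_t base)
{
    const unsigned char *p = data;
    uint64_t    sum = 0;
    size_t      i;

    assert(base > 0);

    for (i = 0; i < size; i++)
    {
        uint64_t    x = p[i];

        sum += x * x * x * x + x * x * x + x * x + x + 1;
    }
    return sum % base;
}

/*
 * Hash each field separately, then reduce the sum modulo base.
 * Hashing field by field never reads struct padding bytes.
 */
static uint64_t
demo_key_hash(const demo_key *k, uint64_t base)
{
    uint64_t    sum = 0;

    sum += demo_bytes_hash(&k->db, sizeof(k->db), base);
    sum += demo_bytes_hash(&k->rel, sizeof(k->rel), base);
    sum += demo_bytes_hash(&k->blk, sizeof(k->blk), base);

    return sum % base;
}
```

Because the per-byte terms are simply added, {1, 2, 3} and {3, 2, 1} land in the same bucket. Whether that matters for the cleaner's workload is a question for the author.
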
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index bbe73618c7..f992307e37 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -48,6 +48,7 @@
#include "miscadmin.h"
#include "pg_trace.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
@@ -1394,6 +1395,20 @@ pgstat_report_autovac(Oid dboid)
pgstat_send(&msg, sizeof(msg));
}
+void
+pgstat_report_heapcleaner(Oid dboid)
+{
+ PgStat_MsgAutovacStart msg;
+
+ if (pgStatSock == PGINVALID_SOCKET)
+ return;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BGHEAP_START);
+ msg.m_databaseid = dboid;
+ msg.m_start_time = GetCurrentTimestamp();
+
+ pgstat_send(&msg, sizeof(msg));
+}
/* ---------
* pgstat_report_vacuum() -
@@ -2833,6 +2848,14 @@ pgstat_bestart(void)
/* Autovacuum Worker */
beentry->st_backendType = B_AUTOVAC_WORKER;
}
+ else if (IsHeapCleanerLauncherProcess())
+ {
+ beentry->st_backendType = B_BG_HEAPCLNR_LAUNCHER;
+ }
+ else if (IsHeapCleanerWorkerProcess())
+ {
+ beentry->st_backendType = B_BG_HEAPCLNR_WORKER;
+ }
else if (am_walsender)
{
/* Wal sender */
@@ -3483,6 +3506,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
case WAIT_EVENT_AUTOVACUUM_MAIN:
event_name = "AutoVacuumMain";
break;
+ case WAIT_EVENT_BGHEAP_MAIN:
+ event_name = "BgHeapMain";
+ break;
case WAIT_EVENT_BGWRITER_HIBERNATE:
event_name = "BgWriterHibernate";
break;
@@ -4111,6 +4137,12 @@ pgstat_get_backend_desc(BackendType backendType)
case B_AUTOVAC_WORKER:
backendDesc = "autovacuum worker";
break;
+ case B_BG_HEAPCLNR_LAUNCHER:
+ backendDesc = "heap cleaner launcher";
+ break;
+ case B_BG_HEAPCLNR_WORKER:
+ backendDesc = "heap cleaner worker";
+ break;
case B_BACKEND:
backendDesc = "client backend";
break;
@@ -4422,6 +4454,10 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
break;
+ case PGSTAT_MTYPE_BGHEAP_START:
+ /* pgstat_recv_autovac((PgStat_MsgHeapCleanerStart *) &msg, len); */
+ break;
+
case PGSTAT_MTYPE_VACUUM:
pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
break;
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b33cd..d14c1b557d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -109,6 +109,7 @@
#include "pgstat.h"
#include "port/pg_bswap.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/fork_process.h"
#include "postmaster/pgarch.h"
@@ -145,9 +146,10 @@
#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */
#define BACKEND_TYPE_WALSND 0x0004 /* walsender process */
#define BACKEND_TYPE_BGWORKER 0x0008 /* bgworker process */
-#define BACKEND_TYPE_ALL 0x000F /* OR of all the above */
+#define BACKEND_TYPE_BGHEAP 0x0010 /* heap cleaner worker process */
+#define BACKEND_TYPE_ALL 0x001F /* OR of all the above */
-#define BACKEND_TYPE_WORKER (BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER)
+#define BACKEND_TYPE_WORKER (BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER | BACKEND_TYPE_BGHEAP)
/*
* List of active backends (or child processes anyway; we don't actually
@@ -252,6 +254,7 @@ static pid_t StartupPID = 0,
WalWriterPID = 0,
WalReceiverPID = 0,
AutoVacPID = 0,
+ HeapClnrPID = 0,
PgArchPID = 0,
PgStatPID = 0,
SysLoggerPID = 0;
@@ -355,6 +358,7 @@ bool redirection_done = false; /* stderr redirected for syslogger? */
/* received START_AUTOVAC_LAUNCHER signal */
static volatile sig_atomic_t start_autovac_launcher = false;
+static volatile sig_atomic_t start_heapclnr_launcher = false;
/* the launcher needs to be signalled to communicate some condition */
static volatile bool avlauncher_needs_signal = false;
@@ -432,6 +436,7 @@ static void maybe_start_bgworkers(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static pid_t StartChildProcess(AuxProcType type);
static void StartAutovacuumWorker(void);
+static void StartBgHeapWorker(void);
static void MaybeStartWalReceiver(void);
static void InitPostmasterDeathWatchHandle(void);
@@ -1310,6 +1315,7 @@ PostmasterMain(int argc, char *argv[])
*/
autovac_init();
+ HeapCleanerInit();
/*
* Load configuration files for client authentication.
*/
@@ -1757,6 +1763,14 @@ ServerLoop(void)
start_autovac_launcher = false; /* signal processed */
}
+ if (!IsBinaryUpgrade && HeapClnrPID == 0 && start_heapclnr_launcher &&
+ pmState == PM_RUN)
+ {
+ HeapClnrPID = StartHeapCleanerLauncher();
+ if (HeapClnrPID != 0)
+ start_heapclnr_launcher = false; /* signal processed */
+ }
+
/* If we have lost the stats collector, try to start a new one */
if (PgStatPID == 0 &&
(pmState == PM_RUN || pmState == PM_HOT_STANDBY))
@@ -1774,6 +1788,12 @@ ServerLoop(void)
kill(AutoVacPID, SIGUSR2);
}
+ /* If we have lost the background heap cleaner, try to start a new one. */
+ if (!IsBinaryUpgrade && HeapClnrPID == 0 && pmState == PM_RUN)
+ {
+ HeapClnrPID = StartHeapCleanerLauncher();
+ }
+
/* If we need to start a WAL receiver, try to do that now */
if (WalReceiverRequested)
MaybeStartWalReceiver();
@@ -2541,6 +2561,8 @@ SIGHUP_handler(SIGNAL_ARGS)
signal_child(WalReceiverPID, SIGHUP);
if (AutoVacPID != 0)
signal_child(AutoVacPID, SIGHUP);
+ if (HeapClnrPID != 0)
+ signal_child(HeapClnrPID, SIGHUP);
if (PgArchPID != 0)
signal_child(PgArchPID, SIGHUP);
if (SysLoggerPID != 0)
@@ -2627,10 +2649,12 @@ pmdie(SIGNAL_ARGS)
/* autovac workers are told to shut down immediately */
/* and bgworkers too; does this need tweaking? */
SignalSomeChildren(SIGTERM,
- BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER);
+ BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER | BACKEND_TYPE_BGHEAP);
/* and the autovac launcher too */
if (AutoVacPID != 0)
signal_child(AutoVacPID, SIGTERM);
+ if (HeapClnrPID != 0)
+ signal_child(HeapClnrPID, SIGTERM);
/* and the bgwriter too */
if (BgWriterPID != 0)
signal_child(BgWriterPID, SIGTERM);
@@ -2687,7 +2711,7 @@ pmdie(SIGNAL_ARGS)
signal_child(WalReceiverPID, SIGTERM);
if (pmState == PM_RECOVERY)
{
- SignalSomeChildren(SIGTERM, BACKEND_TYPE_BGWORKER);
+ SignalSomeChildren(SIGTERM, BACKEND_TYPE_BGWORKER | BACKEND_TYPE_BGHEAP);
/*
* Only startup, bgwriter, walreceiver, possibly bgworkers,
@@ -2708,10 +2732,12 @@ pmdie(SIGNAL_ARGS)
/* shut down all backends and workers */
SignalSomeChildren(SIGTERM,
BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC |
- BACKEND_TYPE_BGWORKER);
+ BACKEND_TYPE_BGWORKER | BACKEND_TYPE_BGHEAP);
/* and the autovac launcher too */
if (AutoVacPID != 0)
signal_child(AutoVacPID, SIGTERM);
+ if (HeapClnrPID != 0)
+ signal_child(HeapClnrPID, SIGTERM);
/* and the walwriter too */
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
@@ -2875,6 +2901,8 @@ reaper(SIGNAL_ARGS)
*/
if (!IsBinaryUpgrade && AutoVacuumingActive() && AutoVacPID == 0)
AutoVacPID = StartAutoVacLauncher();
+ if (!IsBinaryUpgrade && (HeapClnrPID == 0))
+ HeapClnrPID = StartHeapCleanerLauncher();
if (PgArchStartupAllowed() && PgArchPID == 0)
PgArchPID = pgarch_start();
if (PgStatPID == 0)
@@ -3009,6 +3037,14 @@ reaper(SIGNAL_ARGS)
_("autovacuum launcher process"));
continue;
}
+ if (pid == HeapClnrPID)
+ {
+ HeapClnrPID = 0;
+ if (!EXIT_STATUS_0(exitstatus))
+ HandleChildCrash(pid, exitstatus,
+ _("heap cleaner launcher process"));
+ continue;
+ }
/*
* Was it the archiver? If so, just try to start a new one; no need
@@ -3475,6 +3511,17 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
signal_child(AutoVacPID, (SendStop ? SIGSTOP : SIGQUIT));
}
+ if (pid == HeapClnrPID)
+ HeapClnrPID = 0;
+ else if (HeapClnrPID != 0 && take_action)
+ {
+ ereport(DEBUG2,
+ (errmsg_internal("sending %s to process %d",
+ "SIGQUIT",
+ (int) HeapClnrPID)));
+ signal_child(HeapClnrPID, SIGQUIT);
+ }
+
/*
* Force a power-cycle of the pgarch process too. (This isn't absolutely
* necessary, but it seems like a good idea for robustness, and it
@@ -3661,6 +3708,7 @@ PostmasterStateMachine(void)
(CheckpointerPID == 0 ||
(!FatalError && Shutdown < ImmediateShutdown)) &&
WalWriterPID == 0 &&
+ HeapClnrPID == 0 &&
AutoVacPID == 0)
{
if (Shutdown >= ImmediateShutdown || FatalError)
@@ -3759,6 +3807,7 @@ PostmasterStateMachine(void)
Assert(CheckpointerPID == 0);
Assert(WalWriterPID == 0);
Assert(AutoVacPID == 0);
+ Assert(HeapClnrPID == 0);
/* syslogger is not considered here */
pmState = PM_NO_CHILDREN;
}
@@ -3948,6 +3997,8 @@ TerminateChildren(int signal)
signal_child(WalReceiverPID, signal);
if (AutoVacPID != 0)
signal_child(AutoVacPID, signal);
+ if (HeapClnrPID != 0)
+ signal_child(HeapClnrPID, signal);
if (PgArchPID != 0)
signal_child(PgArchPID, signal);
if (PgStatPID != 0)
@@ -4781,6 +4832,7 @@ SubPostmasterMain(int argc, char *argv[])
if (strcmp(argv[1], "--forkbackend") == 0 ||
strcmp(argv[1], "--forkavlauncher") == 0 ||
strcmp(argv[1], "--forkavworker") == 0 ||
+ strcmp(argv[1], "--forkbgheap") == 0 ||
strcmp(argv[1], "--forkboot") == 0 ||
strncmp(argv[1], "--forkbgworker=", 15) == 0)
PGSharedMemoryReAttach();
@@ -4792,6 +4844,8 @@ SubPostmasterMain(int argc, char *argv[])
AutovacuumLauncherIAm();
if (strcmp(argv[1], "--forkavworker") == 0)
AutovacuumWorkerIAm();
+ if (strcmp(argv[1], "--forkbgheap") == 0)
+ BgHeapCleanerIAm();
/*
* Start our win32 signal implementation. This has to be done after we
@@ -4921,6 +4975,19 @@ SubPostmasterMain(int argc, char *argv[])
AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */
}
+ if (strcmp(argv[1], "--forkbgheap") == 0)
+ {
+ /* Restore basic shared memory pointers */
+ InitShmemAccess(UsedShmemSegAddr);
+
+ /* Need a PGPROC to run CreateSharedMemoryAndSemaphores */
+ InitProcess();
+
+ /* Attach process to shared data structures */
+ CreateSharedMemoryAndSemaphores(false, 0);
+
+ BgHeapMain(argc, argv); /* does not return */
+ }
if (strncmp(argv[1], "--forkbgworker=", 15) == 0)
{
int shmem_slot;
@@ -5122,6 +5189,12 @@ sigusr1_handler(SIGNAL_ARGS)
start_autovac_launcher = true;
}
+ if (CheckPostmasterSignal(PMSIGNAL_START_HEAPCLNR_LAUNCHER) &&
+ Shutdown == NoShutdown)
+ {
+ start_heapclnr_launcher = true;
+ }
+
if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER) &&
Shutdown == NoShutdown)
{
@@ -5129,6 +5202,13 @@ sigusr1_handler(SIGNAL_ARGS)
StartAutovacuumWorker();
}
+ if (CheckPostmasterSignal(PMSIGNAL_START_HEAPCLNR_WORKER) &&
+ Shutdown == NoShutdown)
+ {
+ /* The heap cleaner launcher wants us to start a worker process. */
+ StartBgHeapWorker();
+ }
+
if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER))
{
/* Startup Process wants us to start the walreceiver process. */
@@ -5470,6 +5550,73 @@ StartAutovacuumWorker(void)
}
}
+static void
+StartBgHeapWorker(void)
+{
+ Backend *bn;
+
+ /*
+ * If not in condition to run a process, don't try, but handle it like a
+ * fork failure. This does not normally happen, since the signal is only
+ * supposed to be sent by the heap cleaner launcher when it's OK to do it, but
+ * we have to check to avoid race-condition problems during DB state
+ * changes.
+ */
+ if (canAcceptConnections() == CAC_OK)
+ {
+ /*
+ * Compute the cancel key that will be assigned to this session. We
+ * probably don't need cancel keys for heap cleaner workers, but we'd
+ * better have something random in the field to prevent unfriendly
+ * people from sending cancels to them.
+ */
+ if (!RandomCancelKey(&MyCancelKey))
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("could not generate random cancel key")));
+ return;
+ }
+
+ bn = (Backend *) malloc(sizeof(Backend));
+ if (bn)
+ {
+ bn->cancel_key = MyCancelKey;
+
+ /* Heap cleaner workers are not dead_end and need a child slot */
+ bn->dead_end = false;
+ bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
+ bn->bgworker_notify = false;
+
+ bn->pid = StartHeapCleanerWorker();
+
+ if (bn->pid > 0)
+ {
+ bn->bkend_type = BACKEND_TYPE_BGHEAP;
+ dlist_push_head(&BackendList, &bn->elem);
+#ifdef EXEC_BACKEND
+ ShmemBackendArrayAdd(bn);
+#endif
+ /* all OK */
+ return;
+ }
+
+ /*
+ * fork failed, fall through to report -- actual error message was
+ * logged by StartHeapCleanerWorker
+ */
+ (void) ReleasePostmasterChildSlot(bn->child_slot);
+ free(bn);
+ }
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ elog(LOG, "could not start background heap cleaner worker");
+}
+
/*
* MaybeStartWalReceiver
* Start the WAL receiver process, if not running and our state allows.
@@ -5534,7 +5681,8 @@ CreateOptsFile(int argc, char *argv[], char *fullprogname)
int
MaxLivePostmasterChildren(void)
{
- return 2 * (MaxConnections + autovacuum_max_workers + 1 +
+ return 2 * (MaxConnections + autovacuum_max_workers +
+ heapcleaner_max_workers + 1 +
max_worker_processes);
}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 01eabe5706..175902e0d3 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -449,6 +449,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
ForkNumber forkNum,
BlockNumber blockNum,
BufferAccessStrategy strategy,
+ bool onlyInMemory,
bool *foundPtr);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
static void AtProcExit_Buffers(int code, Datum arg);
@@ -717,6 +718,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
isExtend = (blockNum == P_NEW);
+ Assert(!(isExtend && mode == RBM_NORMAL_NO_READ));
+
TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
smgr->smgr_rnode.node.spcNode,
smgr->smgr_rnode.node.dbNode,
@@ -743,9 +746,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* not currently in memory.
*/
bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
- strategy, &found);
+ strategy, (mode == RBM_NORMAL_NO_READ), &found);
if (found)
pgBufferUsage.shared_blks_hit++;
+ else if (mode == RBM_NORMAL_NO_READ)
+ return InvalidBuffer;
else
pgBufferUsage.shared_blks_read++;
}
@@ -788,6 +793,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
return BufferDescriptorGetBuffer(bufHdr);
}
+ Assert(mode != RBM_NORMAL_NO_READ);
+
/*
* We get here only in the corner case where we are trying to extend
* the relation but we found a pre-existing buffer marked BM_VALID.
@@ -987,6 +994,7 @@ static BufferDesc *
BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
BlockNumber blockNum,
BufferAccessStrategy strategy,
+ bool onlyInMemory,
bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
@@ -1036,7 +1044,17 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* own read attempt if the page is still not BM_VALID.
* StartBufferIO does it all.
*/
- if (StartBufferIO(buf, true))
+ if (onlyInMemory == true)
+ {
+ /*
+ * The buffer is not valid yet. The caller only wants pages that
+ * are already in memory, so unpin the buffer and give up.
+ */
+ UnpinBuffer(buf, true);
+ *foundPtr = false;
+ return NULL;
+ }
+ else if (StartBufferIO(buf, true))
{
/*
* If we get here, previous attempts to read the buffer must
@@ -1055,6 +1073,13 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
LWLockRelease(newPartitionLock);
+ /* will not read a page from disk */
+ if (onlyInMemory == true)
+ {
+ *foundPtr = false;
+ return NULL;
+ }
+
/* Loop here in case we have to try another victim buffer */
for (;;)
{
@@ -1496,6 +1521,37 @@ MarkBufferDirty(Buffer buffer)
}
}
+/*
+ * IsBufferDirty - is buffer marked dirty?
+ *
+ * The buffer must be pinned. If the caller needs a consistent result,
+ * the buffer should also be exclusively locked.
+ */
+bool
+IsBufferDirty(Buffer buffer)
+{
+ BufferDesc *bufHdr;
+ uint32 buf_state;
+
+ if (!BufferIsValid(buffer))
+ elog(ERROR, "bad buffer ID: %d", buffer);
+
+ Assert(BufferIsPinned(buffer));
+
+ if (BufferIsLocal(buffer))
+ return IsLocalBufferDirty(buffer);
+
+ bufHdr = GetBufferDescriptor(buffer - 1);
+
+ buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+ /*
+ * If the buffer is not exclusively locked, buf_state could already be
+ * out of date.
+ */
+ return (buf_state & BM_DIRTY) != 0;
+}
+
/*
* ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
*
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index e4146a260a..856955635e 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -305,6 +305,30 @@ MarkLocalBufferDirty(Buffer buffer)
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
+/*
+ * IsLocalBufferDirty
+ * returns true if the buffer is dirty
+ */
+bool
+IsLocalBufferDirty(Buffer buffer)
+{
+ int bufid;
+ BufferDesc *bufHdr;
+ uint32 buf_state;
+
+ Assert(BufferIsLocal(buffer));
+
+ bufid = -(buffer + 1);
+
+ Assert(LocalRefCount[bufid] > 0);
+
+ bufHdr = GetLocalBufferDescriptor(bufid);
+
+ buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+ return (buf_state & BM_DIRTY) != 0;
+}
+
/*
* DropRelFileNodeLocalBuffers
* This function removes from the buffer pool all the pages of the
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 0c86a581c0..fba74752a9 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
@@ -140,6 +141,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
+ size = add_size(size, HeapCleanerShmemSize());
size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize());
@@ -256,6 +258,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
ProcSignalShmemInit();
CheckpointerShmemInit();
AutoVacuumShmemInit();
+ HeapCleanerShmemInit();
ReplicationSlotsShmemInit();
ReplicationOriginShmemInit();
WalSndShmemInit();
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ecedb..73a9d141b0 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42
BackendRandomLock 43
LogicalRepWorkerLock 44
CLogTruncationLock 45
+HeapCleanerLock 46
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f30e082b2..ecf14647b4 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -41,6 +41,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
#include "storage/condition_variable.h"
@@ -352,7 +353,7 @@ InitProcess(void)
* cleaning up. (XXX autovac launcher currently doesn't participate in
* this; it probably should.)
*/
- if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
+ if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && !IsHeapCleanerLauncherProcess())
MarkPostmasterChildActive();
/*
@@ -896,7 +897,7 @@ ProcKill(int code, Datum arg)
* way, so tell the postmaster we've cleaned up acceptably well. (XXX
* autovac launcher should be included here someday)
*/
- if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess())
+ if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && !IsHeapCleanerLauncherProcess())
MarkPostmasterChildInactive();
/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f4133953be..87779bd178 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -56,6 +56,7 @@
#include "parser/parser.h"
#include "pg_getopt.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "postmaster/postmaster.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
@@ -2889,6 +2890,10 @@ ProcessInterrupts(void)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command")));
+ else if (IsHeapCleanerWorkerProcess())
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating heap cleaner worker process due to administrator command")));
else if (IsLogicalWorker())
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
@@ -3017,6 +3022,13 @@ ProcessInterrupts(void)
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling autovacuum task")));
}
+ if (IsHeapCleanerWorkerProcess())
+ {
+ LockErrorCleanup();
+ ereport(ERROR,
+ (errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("canceling heap cleaner worker task")));
+ }
if (RecoveryConflictPending)
{
RecoveryConflictPending = false;
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index e95e347184..8831d98460 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -468,6 +468,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
/* Translate command name into command type code. */
if (pg_strcasecmp(cmd, "VACUUM") == 0)
cmdtype = PROGRESS_COMMAND_VACUUM;
+ else if (pg_strcasecmp(cmd, "CLEANER") == 0)
+ cmdtype = PROGRESS_COMMAND_CLEANER;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/utils/hash/Makefile b/src/backend/utils/hash/Makefile
index 64eebd1d99..9a3392fe37 100644
--- a/src/backend/utils/hash/Makefile
+++ b/src/backend/utils/hash/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/utils/hash
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = dynahash.o hashfn.o pg_crc.o
+OBJS = dynahash.o hashfn.o pg_crc.o shash.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/utils/hash/shash.c b/src/backend/utils/hash/shash.c
new file mode 100644
index 0000000000..9710485c85
--- /dev/null
+++ b/src/backend/utils/hash/shash.c
@@ -0,0 +1,265 @@
+/*
+ * shash.c
+ *
+ * Lightweight hash tables without dynamic expansion
+ *
+ *
+ * Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/utils/hash/shash.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "limits.h"
+#include "utils/shash.h"
+
+#define ELEM(index) (&shtab->Elements[(index) * shtab->Header.ElementSize])
+
+static void check(SHTAB* shtab);
+
+/*
+ * Check data allocation of hash table variables
+ */
+static void
+check(SHTAB* shtab)
+{
+ Assert(shtab != NULL);
+ Assert(shtab->Elements != NULL);
+ Assert(shtab->state != NULL);
+}
+
+/*
+ * Create and initialize hash table with shctl options
+ */
+SHTAB*
+SHASH_Create(SHTABCTL shctl)
+{
+ SHTAB* shtab = (SHTAB *) palloc(sizeof(SHTAB));
+
+ Assert(shctl.ElementSize != 0);
+ Assert(shctl.ElementSize >= shctl.KeySize);
+
+ shtab->Header = shctl;
+ shtab->HTableSize = shtab->Header.ElementsMaxNum * (2. - shtab->Header.FillFactor) + 1;
+ Assert(shtab->HTableSize > shtab->Header.ElementsMaxNum);
+
+ /* The extra slot keeps the table strictly larger than ElementsMaxNum */
+ shtab->Elements = (char *) palloc(shtab->HTableSize * shctl.ElementSize);
+ shtab->state = (HESTATE *) palloc0(shtab->HTableSize * sizeof(HESTATE));
+ shtab->nElements = 0;
+ shtab->SeqScanCurElem = 0;
+
+ return shtab;
+}
+
+/*
+ * Fast cleaning of hash table.
+ * After call of this function hash table contains 0 elements
+ * and can be used again.
+ */
+void
+SHASH_Clean(SHTAB* shtab)
+{
+ int i;
+
+ check(shtab);
+
+ /*
+ * We do not free memory: only reset the state flags and set the element count to 0
+ */
+ for (i = 0; i < shtab->HTableSize; i++)
+ shtab->state[i] = SHASH_NUSED;
+ shtab->nElements = 0;
+ shtab->SeqScanCurElem = 0;
+}
+
+/*
+ * Free all hash table memory resources.
+ */
+void
+SHASH_Destroy(SHTAB* shtab)
+{
+ check(shtab);
+
+ pfree(shtab->Elements);
+ pfree(shtab->state);
+ pfree(shtab);
+}
+
+/*
+ * Get number of elements in the hash table.
+ */
+uint64
+SHASH_Entries(SHTAB* shtab)
+{
+ check(shtab);
+
+ Assert((shtab->nElements >= 0) && (shtab->nElements <= shtab->Header.ElementsMaxNum));
+ return shtab->nElements;
+}
+
+/*
+ * Find, enter or remove an element.
+ * Returns NULL if the element is not found or cannot be entered.
+ */
+void*
+SHASH_Search(SHTAB* shtab, void *keyPtr, SHASHACTION action, bool *foundPtr)
+{
+ uint64 index,
+ first,
+ first_removed_index = ULONG_MAX;
+ void *result = NULL;
+
+ check(shtab);
+
+ Assert(shtab->nElements <= shtab->Header.ElementsMaxNum);
+
+ first = index = shtab->Header.HashFunc(keyPtr, shtab->Header.KeySize, shtab->HTableSize);
+ Assert(index < shtab->HTableSize);
+
+ if (foundPtr != NULL)
+ *foundPtr = false;
+
+ /*
+ * Main hash table search cycle
+ */
+ for(;;)
+ {
+ if (shtab->state[index] == SHASH_NUSED)
+ {
+ /* Empty position found */
+ switch (action)
+ {
+ case SHASH_ENTER:
+ if (shtab->nElements < shtab->Header.ElementsMaxNum)
+ {
+ if (first_removed_index != ULONG_MAX)
+ index = first_removed_index;
+
+ memcpy(ELEM(index), keyPtr, shtab->Header.KeySize);
+ shtab->state[index] = SHASH_USED;
+ shtab->nElements++;
+ result = ELEM(index);
+ }
+ break;
+ case SHASH_FIND:
+ /* The hash table does not contain a record with this key */
+ case SHASH_REMOVE:
+ /* Key to remove was not found; may indicate a problem in the caller */
+ break;
+ default:
+ Assert(0);
+ }
+
+ return result;
+ }
+
+ if ((shtab->state[index] == SHASH_USED) && (shtab->Header.CompFunc(keyPtr, ELEM(index))))
+ {
+ /*
+ * Found an element with a matching key
+ */
+ if (foundPtr != NULL)
+ *foundPtr = true;
+
+ if (action == SHASH_REMOVE)
+ {
+ Assert(shtab->nElements > 0);
+ /* Data will be cleaned by ENTER operation */
+ shtab->state[index] = SHASH_REMOVED;
+ shtab->nElements--;
+ }
+
+ return ELEM(index);
+ }
+
+ if ((shtab->state[index] == SHASH_REMOVED) && (action == SHASH_ENTER))
+ /* We can use this element for SHASH_ENTER, potentially */
+ if (first_removed_index == ULONG_MAX)
+ first_removed_index = index;
+
+ /* Go to next element */
+ index = (index+1)%(shtab->HTableSize);
+
+ if (index == first)
+ {
+ /*
+ * We have scanned the whole table without finding an empty slot.
+ */
+ if ((action == SHASH_ENTER) && (first_removed_index != ULONG_MAX) && (shtab->nElements < shtab->Header.ElementsMaxNum))
+ {
+ index = first_removed_index;
+ memcpy(ELEM(index), keyPtr, shtab->Header.KeySize);
+ shtab->state[index] = SHASH_USED;
+ shtab->nElements++;
+ result = ELEM(index);
+ }
+ else
+ result = NULL;
+ return result;
+ }
+ }
+
+ Assert(0);
+ return NULL;
+}
+
+/*
+ * Begin a new sequential scan of the hash table.
+ */
+void
+SHASH_SeqReset(SHTAB* shtab)
+{
+ check(shtab);
+
+ shtab->SeqScanCurElem = 0;
+}
+
+/*
+ * Get the next hash table value during a sequential scan.
+ * Returns NULL once all elements have been returned.
+ */
+void*
+SHASH_SeqNext(SHTAB* shtab)
+{
+ check(shtab);
+
+ if (shtab->SeqScanCurElem == shtab->HTableSize)
+ return NULL;
+
+ Assert(shtab->SeqScanCurElem < shtab->HTableSize);
+
+ do
+ {
+ if (shtab->state[shtab->SeqScanCurElem] == SHASH_USED)
+ return ELEM(shtab->SeqScanCurElem++);
+ else
+ shtab->SeqScanCurElem++;
+ }
+ while (shtab->SeqScanCurElem < shtab->HTableSize);
+
+ return NULL;
+}
+
+/*
+ * Simple hash function.
+ */
+uint64
+DefaultHashValueFunc(void *key, uint64 size, uint64 base)
+{
+ uint64 sum = 0;
+ int i;
+
+ for (i = 0; i < size; i++)
+ {
+ uint64 x = ((unsigned char *) key)[i];
+ sum += x*x*x*x + x*x*x + x*x + x + 1;
+ }
+
+ return sum%base;
+}
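
Reviewer's sketch (not part of the patch): SHASH_Search is a fixed-size open-addressing table with linear probing, where removed entries become tombstones (SHASH_REMOVED) that a later SHASH_ENTER may reuse. The stand-alone code below illustrates that scheme under simplifying assumptions: integer keys, a trivial modulo hash, and hypothetical names (probe_table, probe_find, probe_insert, probe_remove) that do not exist in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical names throughout; this is not the patch's SHASH API. */
enum slot_state { SLOT_EMPTY = 0, SLOT_USED, SLOT_TOMBSTONE };

#define TABLE_SIZE 8            /* number of slots */
#define MAX_ITEMS  5            /* capacity, kept below TABLE_SIZE */

typedef struct
{
    uint32_t        keys[TABLE_SIZE];
    enum slot_state state[TABLE_SIZE];
    size_t          nitems;
} probe_table;

static void
probe_init(probe_table *t)
{
    assert(t != NULL);
    memset(t, 0, sizeof(*t));   /* all slots start as SLOT_EMPTY */
}

/* Return the slot index holding key, or -1 if the key is absent. */
static int
probe_find(const probe_table *t, uint32_t key)
{
    size_t      start = key % TABLE_SIZE;
    size_t      n;

    for (n = 0; n < TABLE_SIZE; n++)
    {
        size_t      i = (start + n) % TABLE_SIZE;

        if (t->state[i] == SLOT_EMPTY)
            return -1;          /* an empty slot ends the probe chain */
        if (t->state[i] == SLOT_USED && t->keys[i] == key)
            return (int) i;
        /* tombstones and other keys: keep probing */
    }
    return -1;                  /* scanned the whole table */
}

/* Insert key (or find it); return its slot, or -1 if the table is full. */
static int
probe_insert(probe_table *t, uint32_t key)
{
    size_t      start = key % TABLE_SIZE;
    int         first_tombstone = -1;
    int         target = -1;
    size_t      n;

    for (n = 0; n < TABLE_SIZE; n++)
    {
        size_t      i = (start + n) % TABLE_SIZE;

        if (t->state[i] == SLOT_EMPTY)
        {
            /* prefer the earliest tombstone seen on the way here */
            target = (first_tombstone >= 0) ? first_tombstone : (int) i;
            break;
        }
        if (t->state[i] == SLOT_USED && t->keys[i] == key)
            return (int) i;     /* already present */
        if (t->state[i] == SLOT_TOMBSTONE && first_tombstone < 0)
            first_tombstone = (int) i;
    }

    if (target < 0)
        target = first_tombstone;   /* full cycle without an empty slot */
    if (target < 0 || t->nitems >= MAX_ITEMS)
        return -1;

    t->keys[target] = key;
    t->state[target] = SLOT_USED;
    t->nitems++;
    return target;
}

/* Mark key's slot as a tombstone; return 1 if removed, 0 if absent. */
static int
probe_remove(probe_table *t, uint32_t key)
{
    int         i = probe_find(t, key);

    if (i < 0)
        return 0;
    t->state[i] = SLOT_TOMBSTONE;
    t->nitems--;
    return 1;
}
```

The tombstone is what keeps lookups correct after a removal: a key that was placed further along the probe chain stays reachable because the search stops only at a never-used slot. The cost is that a table with many tombstones degrades toward a full scan, which is why the patch's full-cycle check matters.
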
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 865119d272..9ebdd1c7de 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -38,6 +38,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -689,7 +690,7 @@ InitializeSessionUserIdStandalone(void)
* This function should only be called in single-user mode, in autovacuum
* workers, and in background workers.
*/
- AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker);
+ AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker || IsHeapCleanerWorkerProcess());
/* call only once */
AssertState(!OidIsValid(AuthenticatedUserId));
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 5ef6315d20..4345b8bbbc 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -38,6 +38,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgheap.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
@@ -321,7 +322,7 @@ CheckMyDatabase(const char *name, bool am_superuser, bool override_allow_connect
*
* We do not enforce them for autovacuum worker processes either.
*/
- if (IsUnderPostmaster && !IsAutoVacuumWorkerProcess())
+ if (IsUnderPostmaster && !IsAutoVacuumWorkerProcess() && !IsHeapCleanerWorkerProcess())
{
/*
* Check that the database is currently allowing connections.
@@ -504,8 +505,8 @@ InitializeMaxBackends(void)
Assert(MaxBackends == 0);
/* the extra unit accounts for the autovacuum launcher */
- MaxBackends = MaxConnections + autovacuum_max_workers + 1 +
- max_worker_processes;
+ MaxBackends = MaxConnections + autovacuum_max_workers +
+ heapcleaner_max_workers + 1 + max_worker_processes;
/* internal error because the values were all checked previously */
if (MaxBackends > MAX_BACKENDS)
@@ -682,7 +683,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
before_shmem_exit(ShutdownPostgres, 0);
/* The autovacuum launcher is done here */
- if (IsAutoVacuumLauncherProcess())
+ if (IsAutoVacuumLauncherProcess() || IsHeapCleanerLauncherProcess())
{
/* report this backend in the PgBackendStatus array */
pgstat_bestart();
@@ -722,7 +723,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
* In standalone mode and in autovacuum worker processes, we use a fixed
* ID, otherwise we figure it out from the authenticated user name.
*/
- if (bootstrap || IsAutoVacuumWorkerProcess())
+ if (bootstrap || IsAutoVacuumWorkerProcess() || IsHeapCleanerWorkerProcess())
{
InitializeSessionUserIdStandalone();
am_superuser = true;
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 6a6b467fee..698b007d65 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -34,4 +34,16 @@
#define PROGRESS_VACUUM_PHASE_TRUNCATE 5
#define PROGRESS_VACUUM_PHASE_FINAL_CLEANUP 6
+/* Progress parameters for cleaner */
+#define PROGRESS_CLEANER_RELATIONS 0
+#define PROGRESS_CLEANER_TOTAL_QUEUE_LENGTH 1
+#define PROGRESS_CLEANER_BUF_NINMEM 2
+#define PROGRESS_CLEANER_TOTAL_DELETIONS 3
+#define PROGRESS_CLEANER_VAINLY_CLEANED_TUPLES 4
+#define PROGRESS_CLEANER_MISSED_BLOCKS 5
+#define PROGRESS_CLEANER_CLEANED_BLOCKS 6
+#define PROGRESS_CLEANER_VAINLY_CLEANED_BLOCKS 7
+#define PROGRESS_CLEANER_NACQUIRED_LOCKS 8
+#define PROGRESS_CLEANER_TIMEOUT 9
+
#endif
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 85d472f0a5..705120eddc 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -186,6 +186,9 @@ extern void vacuum_set_xid_limits(Relation rel,
extern void vac_update_datfrozenxid(void);
extern void vacuum_delay_point(void);
+extern void quick_vacuum_index(Relation irel, Relation hrel,
+ ItemPointer dead_tuples,
+ int num_dead_tuples);
/* in commands/vacuumlazy.c */
extern void lazy_vacuum_rel(Relation onerel, int options,
VacuumParams *params, BufferAccessStrategy bstrategy);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index d59c24ae23..dbf09328ac 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -56,6 +56,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RESETSHAREDCOUNTER,
PGSTAT_MTYPE_RESETSINGLECOUNTER,
PGSTAT_MTYPE_AUTOVAC_START,
+ PGSTAT_MTYPE_BGHEAP_START,
PGSTAT_MTYPE_VACUUM,
PGSTAT_MTYPE_ANALYZE,
PGSTAT_MTYPE_ARCHIVER,
@@ -354,6 +355,12 @@ typedef struct PgStat_MsgAutovacStart
TimestampTz m_start_time;
} PgStat_MsgAutovacStart;
+typedef struct PgStat_MsgHeapCleanerStart
+{
+ PgStat_MsgHdr m_hdr;
+ Oid m_databaseid;
+ TimestampTz m_start_time;
+} PgStat_MsgHeapCleanerStart;
/* ----------
* PgStat_MsgVacuum Sent by the backend or autovacuum daemon
@@ -704,6 +711,8 @@ typedef enum BackendType
B_AUTOVAC_LAUNCHER,
B_AUTOVAC_WORKER,
B_BACKEND,
+ B_BG_HEAPCLNR_LAUNCHER,
+ B_BG_HEAPCLNR_WORKER,
B_BG_WORKER,
B_BG_WRITER,
B_CHECKPOINTER,
@@ -756,6 +765,7 @@ typedef enum
{
WAIT_EVENT_ARCHIVER_MAIN = PG_WAIT_ACTIVITY,
WAIT_EVENT_AUTOVACUUM_MAIN,
+ WAIT_EVENT_BGHEAP_MAIN,
WAIT_EVENT_BGWRITER_HIBERNATE,
WAIT_EVENT_BGWRITER_MAIN,
WAIT_EVENT_CHECKPOINTER_MAIN,
@@ -933,7 +943,8 @@ typedef enum
typedef enum ProgressCommandType
{
PROGRESS_COMMAND_INVALID,
- PROGRESS_COMMAND_VACUUM
+ PROGRESS_COMMAND_VACUUM,
+ PROGRESS_COMMAND_CLEANER
} ProgressCommandType;
#define PGSTAT_NUM_PROGRESS_PARAM 10
@@ -1181,6 +1192,7 @@ extern void pgstat_reset_shared_counters(const char *);
extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
extern void pgstat_report_autovac(Oid dboid);
+extern void pgstat_report_heapcleaner(Oid dboid);
extern void pgstat_report_vacuum(Oid tableoid, bool shared,
PgStat_Counter livetuples, PgStat_Counter deadtuples);
extern void pgstat_report_analyze(Relation rel,
diff --git a/src/include/postmaster/bgheap.h b/src/include/postmaster/bgheap.h
new file mode 100644
index 0000000000..e11b310ffd
--- /dev/null
+++ b/src/include/postmaster/bgheap.h
@@ -0,0 +1,36 @@
+/*
+ * bgheap.h
+ *
+ * Created on: 27.06.2018
+ * Author: andrey
+ */
+
+#ifndef SRC_INCLUDE_ACCESS_BGHEAP_H_
+#define SRC_INCLUDE_ACCESS_BGHEAP_H_
+
+#include "storage/block.h"
+
+#ifdef EXEC_BACKEND
+extern void HeapCleanerLauncherIAm(void);
+extern void HeapCleanerWorkerIAm(void);
+extern void HeapCleanerLauncherMain(int argc, char *argv[]) pg_attribute_noreturn();
+extern void HeapCleanerWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
+#endif
+
+extern int heapcleaner_max_workers;
+
+
+extern void AtEOXact_BGHeap_tables(bool isCommit);
+extern bool IsHeapCleanerLauncherProcess(void);
+extern bool IsHeapCleanerWorkerProcess(void);
+
+extern void HeapCleanerInit(void);
+extern int StartHeapCleanerLauncher(void);
+extern int StartHeapCleanerWorker(void);
+
+extern void HeapCleanerSend(Relation relation, BlockNumber blkno);
+
+extern void HeapCleanerShmemInit(void);
+extern Size HeapCleanerShmemSize(void);
+
+#endif /* SRC_INCLUDE_ACCESS_BGHEAP_H_ */
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 5370035f0c..58be48198b 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -333,6 +333,7 @@ extern void LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum,
extern BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum,
BlockNumber blockNum, bool *foundPtr);
extern void MarkLocalBufferDirty(Buffer buffer);
+extern bool IsLocalBufferDirty(Buffer buffer);
extern void DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
BlockNumber firstDelBlock);
extern void DropRelFileNodeAllLocalBuffers(RelFileNode rnode);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 3cce3906a0..07536fc7e7 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -43,8 +43,10 @@ typedef enum
RBM_ZERO_AND_CLEANUP_LOCK, /* Like RBM_ZERO_AND_LOCK, but locks the page
* in "cleanup" mode */
RBM_ZERO_ON_ERROR, /* Read, but return an all-zeros page on error */
- RBM_NORMAL_NO_LOG /* Don't log page as invalid during WAL
+ RBM_NORMAL_NO_LOG, /* Don't log page as invalid during WAL
* replay; otherwise same as RBM_NORMAL */
+ RBM_NORMAL_NO_READ /* Don't actually read the page; return it only if
+ * it is already present in shared memory */
} ReadBufferMode;
/* forward declared, to avoid having to expose buf_internals.h here */
@@ -175,6 +177,7 @@ extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern void MarkBufferDirty(Buffer buffer);
+extern bool IsBufferDirty(Buffer buffer);
extern void IncrBufferRefCount(Buffer buffer);
extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
BlockNumber blockNum);
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 5ecc1b757c..4c73acd62c 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -36,6 +36,8 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_START_HEAPCLNR_LAUNCHER, /* start a heap cleaner launcher */
+ PMSIGNAL_START_HEAPCLNR_WORKER, /* start a heap cleaner worker */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/include/utils/shash.h b/src/include/utils/shash.h
new file mode 100644
index 0000000000..7a209cf5dc
--- /dev/null
+++ b/src/include/utils/shash.h
@@ -0,0 +1,54 @@
+#ifndef SHASH_H_
+#define SHASH_H_
+
+/* SHASH_Search operations */
+typedef enum
+{
+ SHASH_FIND,
+ SHASH_ENTER,
+ SHASH_REMOVE
+} SHASHACTION;
+
+/* States of a hash table element */
+typedef enum
+{
+ SHASH_NUSED = 0,
+ SHASH_USED,
+ SHASH_REMOVED
+} HESTATE;
+
+typedef uint64 (*SHashValueFunc) (void *key, uint64 size, uint64 base);
+typedef bool (*CompareFunc) (void* bucket1, void* bucket2);
+
+typedef struct SHTABCTL
+{
+ uint64 ElementSize;
+ uint64 KeySize;
+ uint64 ElementsMaxNum;
+ float FillFactor;
+ SHashValueFunc HashFunc;
+ CompareFunc CompFunc;
+} SHTABCTL;
+
+typedef struct SHTAB
+{
+ SHTABCTL Header;
+ char *Elements;
+ uint64 nElements;
+ HESTATE *state; /* State of each hash table element */
+ uint64 SeqScanCurElem;
+ uint64 HTableSize;
+} SHTAB;
+
+typedef struct SHTAB* PSHTAB;
+
+extern PSHTAB SHASH_Create(SHTABCTL shctl);
+extern void SHASH_Clean(PSHTAB shtab);
+extern void SHASH_Destroy(PSHTAB shtab);
+extern uint64 SHASH_Entries(PSHTAB shtab);
+extern void SHASH_SeqReset(PSHTAB shtab);
+extern void* SHASH_SeqNext(PSHTAB shtab);
+extern void* SHASH_Search(PSHTAB shtab, void *keyPtr, SHASHACTION action, bool *foundPtr);
+extern uint64 DefaultHashValueFunc(void *key, uint64 size, uint64 base);
+
+#endif /* SHASH_H_ */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 744d501e31..478e8e698c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -167,7 +167,7 @@ select * from rtest_v1;
insert into rtest_v1 values (2, 12);
insert into rtest_v1 values (2, 13);
-select * from rtest_v1;
+select * from rtest_v1 ORDER BY a, b;
a | b
---+----
1 | 11
@@ -1821,6 +1821,22 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
FROM pg_database d;
+pg_stat_progress_cleaner| SELECT s.pid,
+ s.datid,
+ d.datname,
+ s.relid,
+ s.param1 AS relations,
+ s.param2 AS queue_len,
+ s.param3 AS ninmem,
+ s.param4 AS cleaned_tuples,
+ s.param5 AS vain_tuples,
+ s.param6 AS missed_blocks,
+ s.param7 AS cleaned_blocks,
+ s.param8 AS vain_blocks,
+ s.param9 AS nlocks,
+ s.param10 AS timeout
+ FROM (pg_stat_get_progress_info('CLEANER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10)
+ LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_vacuum| SELECT s.pid,
s.datid,
d.datname,
diff --git a/src/test/regress/expected/triggers.out b/src/test/regress/expected/triggers.out
index e57c6e1c42..b9c0af7bce 100644
--- a/src/test/regress/expected/triggers.out
+++ b/src/test/regress/expected/triggers.out
@@ -637,7 +637,7 @@ select * from trigtest2;
-- ensure we still insert, even when all triggers are disabled
insert into trigtest default values;
-select * from trigtest;
+select * from trigtest ORDER BY i;
i
---
3
diff --git a/src/test/regress/input/constraints.source b/src/test/regress/input/constraints.source
index 98dd4210e9..2968a1230b 100644
--- a/src/test/regress/input/constraints.source
+++ b/src/test/regress/input/constraints.source
@@ -337,7 +337,7 @@ ROLLBACK;
-- check is done at end of statement, so this should succeed
UPDATE unique_tbl SET i = i+1;
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
-- explicitly defer the constraint
BEGIN;
@@ -349,7 +349,7 @@ DELETE FROM unique_tbl WHERE t = 'tree'; -- makes constraint valid again
COMMIT; -- should succeed
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
-- try adding an initially deferred constraint
ALTER TABLE unique_tbl DROP CONSTRAINT unique_tbl_i_key;
@@ -367,7 +367,7 @@ DELETE FROM unique_tbl WHERE i = 5 AND t = 'five';
COMMIT;
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
-- should fail at commit-time
BEGIN;
@@ -420,7 +420,7 @@ UPDATE unique_tbl SET t = 'THREE' WHERE i = 3 AND t = 'Three';
COMMIT; -- should fail
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
-- test a HOT update that modifies the newly inserted tuple,
-- but should succeed because we then remove the other conflicting tuple.
@@ -431,11 +431,11 @@ INSERT INTO unique_tbl VALUES(3, 'tree'); -- should succeed for now
UPDATE unique_tbl SET t = 'threex' WHERE t = 'tree';
DELETE FROM unique_tbl WHERE t = 'three';
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
COMMIT;
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
DROP TABLE unique_tbl;
diff --git a/src/test/regress/output/constraints.source b/src/test/regress/output/constraints.source
index a6a1df18e7..a2ae6df587 100644
--- a/src/test/regress/output/constraints.source
+++ b/src/test/regress/output/constraints.source
@@ -478,7 +478,7 @@ DETAIL: Key (i)=(1) already exists.
ROLLBACK;
-- check is done at end of statement, so this should succeed
UPDATE unique_tbl SET i = i+1;
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
i | t
---+------
1 | one
@@ -494,14 +494,14 @@ SET CONSTRAINTS unique_tbl_i_key DEFERRED;
INSERT INTO unique_tbl VALUES (3, 'three');
DELETE FROM unique_tbl WHERE t = 'tree'; -- makes constraint valid again
COMMIT; -- should succeed
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
i | t
---+-------
1 | one
2 | two
+ 3 | three
4 | four
5 | five
- 3 | three
(5 rows)
-- try adding an initially deferred constraint
@@ -516,14 +516,14 @@ UPDATE unique_tbl SET i = 2 WHERE i = 4 AND t = 'four';
DELETE FROM unique_tbl WHERE i = 1 AND t = 'one';
DELETE FROM unique_tbl WHERE i = 5 AND t = 'five';
COMMIT;
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
i | t
---+-------
- 3 | three
1 | five
- 5 | one
- 4 | two
2 | four
+ 3 | three
+ 4 | two
+ 5 | one
(5 rows)
-- should fail at commit-time
@@ -581,14 +581,14 @@ UPDATE unique_tbl SET t = 'THREE' WHERE i = 3 AND t = 'Three';
COMMIT; -- should fail
ERROR: duplicate key value violates unique constraint "unique_tbl_i_key"
DETAIL: Key (i)=(3) already exists.
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
i | t
---+-------
- 3 | three
1 | five
- 5 | one
- 4 | two
2 | four
+ 3 | three
+ 4 | two
+ 5 | one
(5 rows)
-- test a HOT update that modifies the newly inserted tuple,
@@ -597,25 +597,25 @@ BEGIN;
INSERT INTO unique_tbl VALUES(3, 'tree'); -- should succeed for now
UPDATE unique_tbl SET t = 'threex' WHERE t = 'tree';
DELETE FROM unique_tbl WHERE t = 'three';
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
i | t
---+--------
1 | five
- 5 | one
- 4 | two
2 | four
3 | threex
+ 4 | two
+ 5 | one
(5 rows)
COMMIT;
-SELECT * FROM unique_tbl;
+SELECT * FROM unique_tbl ORDER BY i;
i | t
---+--------
1 | five
- 5 | one
- 4 | two
2 | four
3 | threex
+ 4 | two
+ 5 | one
(5 rows)
DROP TABLE unique_tbl;
diff --git a/src/test/regress/sql/rules.sql b/src/test/regress/sql/rules.sql
index 3ca4c07356..c540f12a29 100644
--- a/src/test/regress/sql/rules.sql
+++ b/src/test/regress/sql/rules.sql
@@ -181,7 +181,7 @@ delete from rtest_v1 where b = 12;
select * from rtest_v1;
insert into rtest_v1 values (2, 12);
insert into rtest_v1 values (2, 13);
-select * from rtest_v1;
+select * from rtest_v1 ORDER BY a, b;
** Remember the delete rule on rtest_v1: It says
** DO INSTEAD DELETE FROM rtest_t1 WHERE a = old.a
** So this time both rows with a = 2 must get deleted
diff --git a/src/test/regress/sql/triggers.sql b/src/test/regress/sql/triggers.sql
index 1250cd63e0..5a45fbad41 100644
--- a/src/test/regress/sql/triggers.sql
+++ b/src/test/regress/sql/triggers.sql
@@ -419,7 +419,7 @@ delete from trigtest where i=1;
select * from trigtest2;
-- ensure we still insert, even when all triggers are disabled
insert into trigtest default values;
-select * from trigtest;
+select * from trigtest ORDER BY i;
drop table trigtest2;
drop table trigtest;
--
2.17.1
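
A note for reviewers on the SHASH_SeqReset()/SHASH_SeqNext() pair above: the scan keeps a cursor (SeqScanCurElem) into the slot array and returns only slots whose state is SHASH_USED, skipping unused and removed ones. The standalone sketch below illustrates that pattern outside PostgreSQL. It is not part of the patch; DemoTable, SlotState, demo_seq_reset() and demo_seq_next() are hypothetical names invented for this illustration, and the table holds plain ints rather than opaque elements.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_TABLE_SIZE 8

/* Hypothetical slot states, mirroring SHASH_NUSED / SHASH_USED / SHASH_REMOVED */
typedef enum
{
	SLOT_UNUSED = 0,
	SLOT_USED,
	SLOT_REMOVED
} SlotState;

/* Hypothetical fixed-size table; cursor plays the role of SeqScanCurElem */
typedef struct DemoTable
{
	int			values[DEMO_TABLE_SIZE];
	SlotState	state[DEMO_TABLE_SIZE];
	uint64_t	cursor;
} DemoTable;

/* Begin a new sequential scan from the first slot. */
void
demo_seq_reset(DemoTable *t)
{
	assert(t != NULL);
	t->cursor = 0;
}

/*
 * Return a pointer to the next used value, or NULL when the scan is
 * exhausted. Calling it again after exhaustion keeps returning NULL.
 */
int *
demo_seq_next(DemoTable *t)
{
	assert(t != NULL);

	while (t->cursor < DEMO_TABLE_SIZE)
	{
		uint64_t	cur = t->cursor++;

		/* Unused and removed slots are skipped silently. */
		if (t->state[cur] == SLOT_USED)
			return &t->values[cur];
	}

	return NULL;
}
```

A caller would invoke demo_seq_reset() once and then call demo_seq_next() in a loop until it returns NULL. The cursor is advanced before the state check, so the loop has a single exit condition and a removed slot can never be returned.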