Hi All,

As per discussion here
http://archives.postgresql.org/pgsql-hackers/2011-05/msg01119.php

PFA a patch which implements the idea with some variation.

At the start of the first pass, we remember the current LSN. Every page that
needs some work is HOT-pruned so that dead tuples are truncated to dead line
pointers. We collect those dead line pointers and mark them as
dead-vacuumed. Since we don't have any LP flag bits available, we instead
use the LP_DEAD flag along with an offset value of 1 to mark the line
pointer as dead-vacuumed. The page is defragmented and we store the LSN
remembered at the start of the pass in the page special area as the vacuum
LSN. We also record the page's free space at that point, because we are not
going to make a second pass over the page.
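
To make the encoding concrete, here is the test/set pair from the itemid.h
changes in the attached patch; heap pruning sets lp_off to 0 for ordinary
dead line pointers, so lp_off == 1 is otherwise unused on heap pages:

    #define ItemIdIsDeadVacuumed(itemId) \
        (((itemId)->lp_flags == LP_DEAD) && ((itemId)->lp_off == 1))

    #define ItemIdSetDeadVacuumed(itemId) \
    ( \
        (itemId)->lp_flags = LP_DEAD, \
        (itemId)->lp_off = 1, \
        (itemId)->lp_len = 0 \
    )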

Once we have collected all the dead line pointers and marked them as
dead-vacuumed, we clean up the indexes and remove all index pointers
pointing to those dead-vacuumed line pointers. If the index vacuum finishes
successfully, we store the LSN in the pg_class row of the table (this needs
catalog changes). At that point, we are certain that there are no index
pointers pointing to dead-vacuumed line pointers, and they can be reclaimed
at the next opportunity.
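
Condensed into pseudo-code, the vacuum side of the protocol looks like this
(the function names are from the attached patch; the scan details and error
paths are elided):

    start_lsn = GetInsertRecPtr();       /* remembered before the heap scan */

    for each page that needs work:
        /* truncate dead tuples to line pointers, mark them dead-vacuumed,
         * defragment the page and stamp it with start_lsn */
        heap_page_prune(rel, buf, OldestXmin, false,
                        &latestRemovedXid, start_lsn);
        RecordPageWithFreeSpace(rel, blkno, freespace);  /* no second pass */

    for each index:
        lazy_vacuum_index(...);          /* drop the matching index pointers */

    /* only after every index is done; this is what licenses the reclaim */
    vac_update_indexvaclsn(rel, start_lsn);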

During normal operations or a subsequent vacuum, if the page is chosen for
HOT-pruning, we check whether it has any dead-vacuumed line pointers and
whether the vacuum LSN stored in the page special area is equal to the one
stored in the pg_class row; if so, we reclaim those dead-vacuumed line
pointers (the index pointers to them have already been taken care of). If
the pg_class LSN is not the same, the last vacuum probably did not finish
completely, so we collect the dead-vacuumed line pointers just like other
dead line pointers and try to clean up the index pointers as usual.
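
The check that licenses the reclaim is just an LSN comparison at the start
of heap_page_prune(); condensed from the pruneheap.c changes below:

    if (PageHasVacuumLSN(page))
    {
        XLogRecPtr pagevaclsn = PageGetVacuumLSN(page);
        XLogRecPtr last_indexvaclsn;

        last_indexvaclsn.xlogid = relation->rd_rel->relindxvacxlogid;
        last_indexvaclsn.xrecoff = relation->rd_rel->relindxvacxlogoff;

        /*
         * Equal LSNs prove that the index vacuum which followed the
         * marking completed, so the dead-vacuumed slots can go straight
         * to unused when the page is defragmented.
         */
        if (!XLogRecPtrIsInvalid(last_indexvaclsn) &&
            !XLogRecPtrIsInvalid(pagevaclsn) &&
            XLByteEQ(pagevaclsn, last_indexvaclsn))
            prstate.need_vaclsn = false;
    }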

I ran a few pgbench tests with the patch. I don't see much difference in
the overall tps, but the vacuum time for the accounts table reduces by
nearly 50%. I don't see much difference in the overall bloat either, but
then pgbench uses HOT very nicely and the accounts table got only a couple
of vacuum cycles in my 7-8 hour run.

There are a couple of things that probably need more attention. I am not
sure if we need to teach ANALYZE to treat dead line pointers differently.
Since they take up much less space than a dead tuple, they should
definitely have a lower weight, but at the same time we need to take into
account the number of indexes on the table; see the illustrative sketch
below.

The start-of-first-pass LSN that we remember is in fact the start of the
WAL page, and I think there could be some issues with that, especially for
very tiny tables. For example, a first vacuum may run to completion. If
another vacuum is started on the same table and gets the same LSN (because
we did not write more than one page's worth of WAL in between), and that
second vacuum aborts after it has cleaned up a few pages, we might get into
trouble: pages stamped by the aborted vacuum would look as if their index
pointers had already been removed. The likelihood of this happening is very
small, but maybe it's worth taking care of. Maybe we can get the exact
current LSN, and not store it in pg_class if we don't do anything during
the cycle.
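
For the ANALYZE question, here is one purely illustrative shape the
weighting could take. Nothing like this exists in the patch; the helper
name, the formula, and the per-index factor are all made up for discussion:

    /*
     * Hypothetical, NOT in the patch: weight a dead line pointer relative
     * to a whole dead tuple. It pins only sizeof(ItemIdData) bytes of heap
     * space, but it still needs an index-vacuum pass over every index to
     * be reclaimed, so the weight should grow with the index count.
     */
    static double
    dead_lineptr_weight(Size avg_tuple_len, int nindexes)
    {
        double  heap_fraction = (double) sizeof(ItemIdData) / avg_tuple_len;
        double  index_factor = 0.1 * nindexes;      /* made-up constant */

        return heap_fraction + index_factor;
    }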

Comments?

Thanks,
Pavan
-- 
Pavan Deolasee
EnterpriseDB     http://www.enterprisedb.com
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 01a492e..12918d2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3912,7 +3912,8 @@ log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
 			   OffsetNumber *nowdead, int ndead,
 			   OffsetNumber *nowunused, int nunused,
-			   TransactionId latestRemovedXid)
+			   TransactionId latestRemovedXid,
+			   bool hasvaclsn, XLogRecPtr indexvaclsn)
 {
 	xl_heap_clean xlrec;
 	uint8		info;
@@ -3927,6 +3928,8 @@ log_heap_clean(Relation reln, Buffer buffer,
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
+	xlrec.hasvaclsn = hasvaclsn;
+	xlrec.indexvaclsn = indexvaclsn;
 
 	rdata[0].data = (char *) &xlrec;
 	rdata[0].len = SizeOfHeapClean;
@@ -4196,6 +4199,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	int			ndead;
 	int			nunused;
 	Size		freespace;
+	bool		hasvaclsn;
+	XLogRecPtr	indexvaclsn;	
 
 	/*
 	 * We're about to remove tuples. In Hot Standby mode, ensure that there's
@@ -4228,6 +4233,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 
 	nredirected = xlrec->nredirected;
 	ndead = xlrec->ndead;
+	hasvaclsn = xlrec->hasvaclsn;
+	indexvaclsn = xlrec->indexvaclsn;
 	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
 	redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
 	nowdead = redirected + (nredirected * 2);
@@ -4239,7 +4246,8 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	heap_page_prune_execute(buffer,
 							redirected, nredirected,
 							nowdead, ndead,
-							nowunused, nunused);
+							nowunused, nunused,
+							hasvaclsn, indexvaclsn);
 
 	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 0cfa866..e402f51 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -31,9 +31,13 @@ typedef struct
 	TransactionId new_prune_xid;	/* new prune hint value for page */
 	TransactionId latestRemovedXid;		/* latest xid to be removed by this
 										 * prune */
+	bool		need_vaclsn;		/* track if the page needs a vacuum lsn */
+	int			already_dead;		/* number of already dead line pointers */
+
 	int			nredirected;	/* numbers of entries in arrays below */
 	int			ndead;
 	int			nunused;
+
 	/* arrays that accumulate indexes of items to be changed */
 	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
 	OffsetNumber nowdead[MaxHeapTuplesPerPage];
@@ -74,6 +78,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
 {
 	Page		page = BufferGetPage(buffer);
 	Size		minfree;
+	XLogRecPtr  invalid_lsn = {0, 0};
 
 	/*
 	 * Let's see if we really need pruning.
@@ -126,7 +131,7 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
 																 * needed */
 
 			/* OK to prune */
-			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
+			(void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore, invalid_lsn);
 		}
 
 		/* And release buffer lock */
@@ -153,7 +158,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
  */
 int
 heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid)
+				bool report_stats, TransactionId *latestRemovedXid,
+				XLogRecPtr indexvaclsn)
 {
 	int			ndeleted = 0;
 	Page		page = BufferGetPage(buffer);
@@ -175,8 +181,37 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	prstate.new_prune_xid = InvalidTransactionId;
 	prstate.latestRemovedXid = InvalidTransactionId;
 	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+	prstate.need_vaclsn = false;
+	prstate.already_dead = 0;
 	memset(prstate.marked, 0, sizeof(prstate.marked));
 
+	/*
+	 * Check if the page has any dead-vacuumed line pointers. If the vacuum
+	 * that created these dead-vacuumed line pointers has successfully
+	 * completed, we can now remove those line pointers.
+	 */
+	if (PageHasVacuumLSN(page))
+	{
+		XLogRecPtr pagevaclsn = PageGetVacuumLSN(page);
+		XLogRecPtr last_indexvaclsn;
+		
+		prstate.need_vaclsn = true;
+
+		last_indexvaclsn.xlogid = relation->rd_rel->relindxvacxlogid;
+		last_indexvaclsn.xrecoff = relation->rd_rel->relindxvacxlogoff;
+
+		if (!XLogRecPtrIsInvalid(last_indexvaclsn) &&
+			!XLogRecPtrIsInvalid(pagevaclsn) &&
+			XLByteEQ(pagevaclsn, last_indexvaclsn))
+		{
+			/* 
+			 * Existing dead-vacuumed line pointers can be removed and the
+			 * page no longer needs a vacuum lsn
+			 */
+			prstate.need_vaclsn = false;
+		}
+	}
+
 	/* Scan the page */
 	maxoff = PageGetMaxOffsetNumber(page);
 	for (offnum = FirstOffsetNumber;
@@ -191,8 +226,26 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 
 		/* Nothing to do if slot is empty or already dead */
 		itemid = PageGetItemId(page, offnum);
-		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
+		if (!ItemIdIsUsed(itemid))
 			continue;
+		
+		/* 
+		 * If the slot is dead-vacuumed and we know that the index pointers
+		 * have already been removed by the last index vacuum, just mark it
+		 * unused so that it is removed when we defrag the page
+		 */
+		if (ItemIdIsDeadVacuumed(itemid))
+		{
+			if (!prstate.need_vaclsn)
+				heap_prune_record_unused(&prstate, offnum);
+			continue;
+		}
+		else if (ItemIdIsDead(itemid))
+		{
+			heap_prune_record_dead(&prstate, offnum);
+			prstate.already_dead++;
+			continue;
+		}
 
 		/* Process this item or chain of items */
 		ndeleted += heap_prune_chain(relation, buffer, offnum,
@@ -213,7 +266,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		heap_page_prune_execute(buffer,
 								prstate.redirected, prstate.nredirected,
 								prstate.nowdead, prstate.ndead,
-								prstate.nowunused, prstate.nunused);
+								prstate.nowunused, prstate.nunused,
+								prstate.need_vaclsn, indexvaclsn);
 
 		/*
 		 * Update the page's pd_prune_xid field to either zero, or the lowest
@@ -241,7 +295,8 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 									prstate.redirected, prstate.nredirected,
 									prstate.nowdead, prstate.ndead,
 									prstate.nowunused, prstate.nunused,
-									prstate.latestRemovedXid);
+									prstate.latestRemovedXid, prstate.need_vaclsn,
+									indexvaclsn);
 
 			PageSetLSN(BufferGetPage(buffer), recptr);
 			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
@@ -273,9 +328,12 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	 * If requested, report the number of tuples reclaimed to pgstats. This is
 	 * ndeleted minus ndead, because we don't want to count a now-DEAD root
 	 * item as a deletion for this purpose.
+	 *
+	 * Adjust for already_dead, since those are counted in ndead and we
+	 * really don't want to include them here.
 	 */
-	if (report_stats && ndeleted > prstate.ndead)
-		pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);
+	if (report_stats && ndeleted > (prstate.ndead - prstate.already_dead)) 
+		pgstat_update_heap_dead_tuples(relation, ndeleted - (prstate.ndead - prstate.already_dead));
 
 	*latestRemovedXid = prstate.latestRemovedXid;
 
@@ -645,7 +703,8 @@ void
 heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
-						OffsetNumber *nowunused, int nunused)
+						OffsetNumber *nowunused, int nunused,
+						bool need_vaclsn, XLogRecPtr indexvaclsn)
 {
 	Page		page = (Page) BufferGetPage(buffer);
 	OffsetNumber *offnum;
@@ -669,7 +728,14 @@ heap_page_prune_execute(Buffer buffer,
 		OffsetNumber off = *offnum++;
 		ItemId		lp = PageGetItemId(page, off);
 
-		ItemIdSetDead(lp);
+		/*
+		 * If indexvaclsn is not invalid, we are being called from a vacuum and
+		 * we can mark the dead line pointers as dead-vacuumed
+		 */
+		if (XLogRecPtrIsInvalid(indexvaclsn))
+			ItemIdSetDead(lp);
+		else
+			ItemIdSetDeadVacuumed(lp);
 	}
 
 	/* Update all now-unused line pointers */
@@ -684,9 +750,14 @@ heap_page_prune_execute(Buffer buffer,
 
 	/*
 	 * Finally, repair any fragmentation, and update the page's hint bit about
-	 * whether it has free pointers.
+	 * whether it has free pointers. Also, make room for storing vacuum lsn if
+	 * required
 	 */
-	PageRepairFragmentation(page);
+	PageRepairFragmentation(page,
+			(!XLogRecPtrIsInvalid(indexvaclsn) && ndead > 0) || need_vaclsn);
+
+	if (PageHasVacuumLSN(page) && !XLogRecPtrIsInvalid(indexvaclsn))
+		PageSetVacuumLSN(page, indexvaclsn);
 }
 
 
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 71c9931..e60583d 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -770,6 +770,8 @@ InsertPgClassTuple(Relation pg_class_desc,
 	values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
 	values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
 	values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
+	values[Anum_pg_class_relindxvacxlogid - 1] = Int32GetDatum(rd_rel->relindxvacxlogid);
+	values[Anum_pg_class_relindxvacxlogoff - 1] = Int32GetDatum(rd_rel->relindxvacxlogoff);
 	if (relacl != (Datum) 0)
 		values[Anum_pg_class_relacl - 1] = relacl;
 	else
@@ -864,6 +866,9 @@ AddNewRelationTuple(Relation pg_class_desc,
 		new_rel_reltup->relfrozenxid = InvalidTransactionId;
 	}
 
+	new_rel_reltup->relindxvacxlogid = 0;
+	new_rel_reltup->relindxvacxlogoff = 0;
+
 	new_rel_reltup->relowner = relowner;
 	new_rel_reltup->reltype = new_type_oid;
 	new_rel_reltup->reloftype = reloftype;
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index fa84989..4aa49f6 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1056,6 +1056,7 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 		Page		targpage;
 		OffsetNumber targoffset,
 					maxoffset;
+		bool		vacuum_dead = false;
 
 		vacuum_delay_point();
 
@@ -1074,6 +1075,32 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 		targpage = BufferGetPage(targbuffer);
 		maxoffset = PageGetMaxOffsetNumber(targpage);
 
+		/*
+		 * Single-pass vacuum can leave a large number of dead line pointers in
+		 * the heap and those would be cleaned up either during the next vacuum
+		 * cycle or HOT-cleanup. If the page vacuum LSN is equal to the index
+		 * vacuum LSN, the next HOT cleanup can remove the dead line pointers
+		 * and no vacuum is needed for them. OTOH, if the dead line pointers
+		 * were generated after we removed the index pointers last time, we
+		 * need a vacuum cycle to remove them.
+		 */
+        if (PageHasVacuumLSN(targpage))
+        {
+            XLogRecPtr pagevaclsn = PageGetVacuumLSN(targpage);
+            XLogRecPtr indexvaclsn;
+
+            indexvaclsn.xlogid = onerel->rd_rel->relindxvacxlogid;
+            indexvaclsn.xrecoff = onerel->rd_rel->relindxvacxlogoff;
+
+            if (!XLogRecPtrIsInvalid(indexvaclsn) &&
+                    !XLogRecPtrIsInvalid(pagevaclsn) &&
+                    XLByteEQ(pagevaclsn, indexvaclsn))
+            {
+                vacuum_dead = true;
+            }
+        }
+
+
 		/* Inner loop over all tuples on the selected page */
 		for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
 		{
@@ -1088,11 +1115,20 @@ acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
 			 * pointers should be counted as dead, because we need vacuum to
 			 * run to get rid of them.	Note that this rule agrees with the
 			 * way that heap_page_prune() counts things.
+			 *
+			 * XXX We don't count dead line pointers if we know that they can be
+			 * removed by a HOT cleanup.
 			 */
 			if (!ItemIdIsNormal(itemid))
 			{
-				if (ItemIdIsDead(itemid))
-					deadrows += 1;
+				if (ItemIdIsDeadVacuumed(itemid))
+				{
+					if (!vacuum_dead)
+						deadrows += 1;
+				}
+				else if (ItemIdIsDead(itemid))
+					deadrows++;
+
 				continue;
 			}
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 224c34f..7b393fb 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -634,6 +634,36 @@ vac_update_relstats(Relation relation,
 	heap_close(rd, RowExclusiveLock);
 }
 
+/*
+ * Update the lsn of the last successful index vacuum. We can fold this into
+ * the vac_update_relstats routine, but we want this to run irrespective of
+ * whether we scanned all the pages or not; that may be a small issue to
+ * handle. Nevertheless, keep this as a separate routine for now.
+ */
+void
+vac_update_indexvaclsn(Relation relation, XLogRecPtr indexvaclsn)
+{
+	Oid			relid = RelationGetRelid(relation);
+	Relation	rd;
+	HeapTuple	ctup;
+	Form_pg_class pgcform;
+
+	rd = heap_open(RelationRelationId, RowExclusiveLock);
+
+	/* Fetch a copy of the tuple to scribble on */
+	ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(ctup))
+		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+			 relid);
+	pgcform = (Form_pg_class) GETSTRUCT(ctup);
+
+	pgcform->relindxvacxlogid = indexvaclsn.xlogid;
+	pgcform->relindxvacxlogoff = indexvaclsn.xrecoff;
+
+	heap_inplace_update(rd, ctup);
+
+	heap_close(rd, RowExclusiveLock);
+}
 
 /*
  *	vac_update_datfrozenxid() -- update pg_database.datfrozenxid for our DB
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index ce5fa18..5fd90df 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -99,6 +99,7 @@ typedef struct LVRelStats
 	ItemPointer dead_tuples;	/* array of ItemPointerData */
 	int			num_index_scans;
 	TransactionId latestRemovedXid;
+	XLogRecPtr	indexvaclsn;
 } LVRelStats;
 
 
@@ -114,15 +115,12 @@ static BufferAccessStrategy vac_strategy;
 /* non-export function prototypes */
 static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			   Relation *Irel, int nindexes, bool scan_all);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
 				  LVRelStats *vacrelstats);
 static void lazy_cleanup_index(Relation indrel,
 				   IndexBulkDeleteResult *stats,
 				   LVRelStats *vacrelstats);
-static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats);
 static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
 static BlockNumber count_nondeletable_pages(Relation onerel,
 						 LVRelStats *vacrelstats);
@@ -215,6 +213,10 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
 						InvalidTransactionId :
 						FreezeLimit);
 
+	/* Since vacuum ran to completion, remember the vacuum lsn */
+	if (!XLogRecPtrIsInvalid(vacrelstats->indexvaclsn))
+		vac_update_indexvaclsn(onerel, vacrelstats->indexvaclsn);
+
 	/* report results to the stats collector, too */
 	pgstat_report_vacuum(RelationGetRelid(onerel),
 						 onerel->rd_rel->relisshared,
@@ -307,6 +309,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	Buffer		vmbuffer = InvalidBuffer;
 	BlockNumber next_not_all_visible_block;
 	bool		skipping_all_visible_blocks;
+	XLogRecPtr	start_lsn_index_vacuum = {0, 0};
 
 	pg_rusage_init(&ru0);
 
@@ -319,6 +322,21 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 	empty_pages = vacuumed_pages = 0;
 	num_tuples = tups_vacuumed = nkeep = nunused = 0;
 
+	/*
+	 * Grab the index vacuum LSN. We should do this before calculating the
+	 * number of blocks in the relation and starting the heap scan.
+	 * If we don't grab the index vacuum LSN now and instead defer it to just
+	 * before the index vacuum starts, we will miss the dead line pointers that
+	 * get generated after we scanned a page, but before calling index vacuum.
+	 * Similarly, we will miss the dead line pointers generated in the new
+	 * pages added to the end of the relation.
+	 *
+	 * Note: By grabbing the LSN here, we might be setting the index vacuum LSN
+	 * a bit conservatively. Similarly, GetInsertRecPtr() may return a slightly old
+	 * LSN, but that does not create any correctness issue either.
+	 */
+	start_lsn_index_vacuum = GetInsertRecPtr();
+
 	indstats = (IndexBulkDeleteResult **)
 		palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
 
@@ -432,8 +450,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 				lazy_vacuum_index(Irel[i],
 								  &indstats[i],
 								  vacrelstats);
-			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -528,7 +544,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		 * We count tuples removed by the pruning step as removed by VACUUM.
 		 */
 		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin, false,
-										 &vacrelstats->latestRemovedXid);
+										 &vacrelstats->latestRemovedXid,
+										 start_lsn_index_vacuum);
 
 		/*
 		 * Now scan the page to collect vacuumable items and check for tuples
@@ -712,24 +729,13 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			}
 		}
 
+		vacuumed_pages++;
+
 		/*
-		 * If there are no indexes then we can vacuum the page right now
-		 * instead of doing a second scan.
+		 * If there are no indexes, we don't need to remember the dead tuples
 		 */
-		if (nindexes == 0 &&
-			vacrelstats->num_dead_tuples > 0)
-		{
-			/* Remove tuples from heap */
-			lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
-
-			/*
-			 * Forget the now-vacuumed tuples, and press on, but be careful
-			 * not to reset latestRemovedXid since we want that value to be
-			 * valid.
-			 */
+		if (nindexes == 0)
 			vacrelstats->num_dead_tuples = 0;
-			vacuumed_pages++;
-		}
 
 		freespace = PageGetHeapFreeSpace(page);
 
@@ -792,9 +798,11 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		 * its post-compaction free space.	If not, then we're done with this
 		 * page, so remember its free space as-is.	(This path will always be
 		 * taken if there are no indexes.)
+		 *
+		 * For single pass vacuum, we must record the free space now since
+		 * there is no second pass.
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+		RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
 	/* save stats for use later */
@@ -819,8 +827,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			lazy_vacuum_index(Irel[i],
 							  &indstats[i],
 							  vacrelstats);
-		/* Remove tuples from heap */
-		lazy_vacuum_heap(onerel, vacrelstats);
 		vacrelstats->num_index_scans++;
 	}
 
@@ -831,6 +837,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		vmbuffer = InvalidBuffer;
 	}
 
+	if (!XLogRecPtrIsInvalid(start_lsn_index_vacuum))
+		vacrelstats->indexvaclsn = start_lsn_index_vacuum;
+
 	/* Do post-vacuum cleanup and statistics update for each index */
 	for (i = 0; i < nindexes; i++)
 		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
@@ -857,118 +866,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 					   pg_rusage_show(&ru0))));
 }
 
-
-/*
- *	lazy_vacuum_heap() -- second pass over the heap
- *
- *		This routine marks dead tuples as unused and compacts out free
- *		space on their pages.  Pages not having dead tuples recorded from
- *		lazy_scan_heap are not visited at all.
- *
- * Note: the reason for doing this as a second pass is we cannot remove
- * the tuples until we've removed their index entries, and we want to
- * process index entry removal in batches as large as possible.
- */
-static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
-{
-	int			tupindex;
-	int			npages;
-	PGRUsage	ru0;
-
-	pg_rusage_init(&ru0);
-	npages = 0;
-
-	tupindex = 0;
-	while (tupindex < vacrelstats->num_dead_tuples)
-	{
-		BlockNumber tblk;
-		Buffer		buf;
-		Page		page;
-		Size		freespace;
-
-		vacuum_delay_point();
-
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
-		buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
-								 vac_strategy);
-		LockBufferForCleanup(buf);
-		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
-
-		/* Now that we've compacted the page, record its available space */
-		page = BufferGetPage(buf);
-		freespace = PageGetHeapFreeSpace(page);
-
-		UnlockReleaseBuffer(buf);
-		RecordPageWithFreeSpace(onerel, tblk, freespace);
-		npages++;
-	}
-
-	ereport(elevel,
-			(errmsg("\"%s\": removed %d row versions in %d pages",
-					RelationGetRelationName(onerel),
-					tupindex, npages),
-			 errdetail("%s.",
-					   pg_rusage_show(&ru0))));
-}
-
-/*
- *	lazy_vacuum_page() -- free dead tuples on a page
- *					 and repair its fragmentation.
- *
- * Caller must hold pin and buffer cleanup lock on the buffer.
- *
- * tupindex is the index in vacrelstats->dead_tuples of the first dead
- * tuple for this page.  We assume the rest follow sequentially.
- * The return value is the first tupindex after the tuples of this page.
- */
-static int
-lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
-				 int tupindex, LVRelStats *vacrelstats)
-{
-	Page		page = BufferGetPage(buffer);
-	OffsetNumber unused[MaxOffsetNumber];
-	int			uncnt = 0;
-
-	START_CRIT_SECTION();
-
-	for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
-	{
-		BlockNumber tblk;
-		OffsetNumber toff;
-		ItemId		itemid;
-
-		tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
-		itemid = PageGetItemId(page, toff);
-		ItemIdSetUnused(itemid);
-		unused[uncnt++] = toff;
-	}
-
-	PageRepairFragmentation(page);
-
-	MarkBufferDirty(buffer);
-
-	/* XLOG stuff */
-	if (RelationNeedsWAL(onerel))
-	{
-		XLogRecPtr	recptr;
-
-		recptr = log_heap_clean(onerel, buffer,
-								NULL, 0, NULL, 0,
-								unused, uncnt,
-								vacrelstats->latestRemovedXid);
-		PageSetLSN(page, recptr);
-		PageSetTLI(page, ThisTimeLineID);
-	}
-
-	END_CRIT_SECTION();
-
-	return tupindex;
-}
-
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
  *
@@ -1195,9 +1092,10 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 			 * Note: any non-unused item should be taken as a reason to keep
 			 * this page.  We formerly thought that DEAD tuples could be
 			 * thrown away, but that's not so, because we'd not have cleaned
-			 * out their index entries.
+			 * out their index entries. We can throw away DEAD VACUUMED tuples
+			 * though since their index entries must have been removed by now
 			 */
-			if (ItemIdIsUsed(itemid))
+			if (ItemIdIsUsed(itemid) && !ItemIdIsDeadVacuumed(itemid))
 			{
 				hastup = true;
 				break;			/* can stop scanning */
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 6bd3812..b36c24d 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -350,7 +350,12 @@ itemoffcompare(const void *itemidp1, const void *itemidp2)
 /*
  * PageRepairFragmentation
  *
- * Frees fragmented space on a page.
+ * Frees fragmented space on a page, optionally allocating space for storing a
+ * vacuum LSN as part of the special area (and this applies only to heap pages).
+ * If there is not enough free space to store the vacuum LSN, just keep the
+ * existing special area unchanged. The caller should be aware of the
+ * possibility and make an appropriate choice.
+ *
  * It doesn't remove unused line pointers! Please don't change this.
  *
  * This routine is usable for heap pages only, but see PageIndexMultiDelete.
@@ -358,11 +363,13 @@ itemoffcompare(const void *itemidp1, const void *itemidp2)
  * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
  */
 void
-PageRepairFragmentation(Page page)
+PageRepairFragmentation(Page page, bool need_vaclsn)
 {
 	Offset		pd_lower = ((PageHeader) page)->pd_lower;
 	Offset		pd_upper = ((PageHeader) page)->pd_upper;
 	Offset		pd_special = ((PageHeader) page)->pd_special;
+	Offset		new_pd_special;
+	Size		specialSize = need_vaclsn ? MAXALIGN(sizeof (XLogRecPtr)) : 0;
 	itemIdSort	itemidbase,
 				itemidptr;
 	ItemId		lp;
@@ -390,6 +397,8 @@ PageRepairFragmentation(Page page)
 				 errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
 						pd_lower, pd_upper, pd_special)));
 
+	new_pd_special = BLCKSZ - specialSize;
+
 	nline = PageGetMaxOffsetNumber(page);
 	nunused = nstorage = 0;
 	for (i = FirstOffsetNumber; i <= nline; i++)
@@ -411,10 +420,12 @@ PageRepairFragmentation(Page page)
 	if (nstorage == 0)
 	{
 		/* Page is completely empty, so just reset it quickly */
-		((PageHeader) page)->pd_upper = pd_special;
+		((PageHeader) page)->pd_upper = ((PageHeader) page)->pd_special = new_pd_special;
 	}
 	else
 	{							/* nstorage != 0 */
+		Offset adjust;
+
 		/* Need to compact the page the hard way */
 		itemidbase = (itemIdSort) palloc(sizeof(itemIdSortData) * nstorage);
 		itemidptr = itemidbase;
@@ -444,10 +455,24 @@ PageRepairFragmentation(Page page)
 			   errmsg("corrupted item lengths: total %u, available space %u",
 					  (unsigned int) totallen, pd_special - pd_lower)));
 
+		/* 
+		 * If there is not enough space to increase the special area, just
+		 * preserve the existing special area. The caller must check whether
+		 * the special area was actually enlarged before writing anything
+		 * to it.
+		 */
+		if (totallen > (Size) (new_pd_special - pd_lower))
+		{
+			new_pd_special = pd_special;
+			need_vaclsn = false;
+		}
+
 		/* sort itemIdSortData array into decreasing itemoff order */
 		qsort((char *) itemidbase, nstorage, sizeof(itemIdSortData),
 			  itemoffcompare);
 
+		adjust = pd_special - new_pd_special;
+
 		/* compactify page */
 		upper = pd_special;
 
@@ -458,14 +483,25 @@ PageRepairFragmentation(Page page)
 			memmove((char *) page + upper,
 					(char *) page + itemidptr->itemoff,
 					itemidptr->alignedlen);
-			lp->lp_off = upper;
+			lp->lp_off = upper - adjust;
 		}
 
-		((PageHeader) page)->pd_upper = upper;
+		if (adjust != 0)
+			memmove((char *) page + upper - adjust,
+				(char *) page + upper, pd_special - upper);
+
+		((PageHeader) page)->pd_upper = upper - adjust;
+		((PageHeader) page)->pd_special = new_pd_special;
 
 		pfree(itemidbase);
 	}
 
+	/* Record that the page now contains vacuum lsn */
+	if (need_vaclsn)
+		PageSetHasVacuumLSN(page);
+	else
+		PageClearHasVacuumLSN(page);
+
 	/* Set hint bit for PageAddItem */
 	if (nunused > 0)
 		PageSetHasFreeLinePointers(page);
@@ -828,3 +864,43 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 
 	pfree(itemidbase);
 }
+
+/*
+ * Get the LSN of the most recent retail-vacuum operation on the page that
+ * created a dead line pointer. If there are no dead line pointers on the page,
+ * just return an invalid XLogRecPtr
+ */
+XLogRecPtr
+PageGetVacuumLSN(Page page)
+{
+	XLogRecPtr vaclsn;
+
+	/* Mark the LSN invalid; initialize both fields so the value is defined */
+	vaclsn.xlogid = vaclsn.xrecoff = 0;
+
+	if (PageHasVacuumLSN(page))
+	{
+		Assert(PageGetSpecialSize(page) == MAXALIGN(sizeof (XLogRecPtr)));
+		vaclsn = *((XLogRecPtr *) PageGetSpecialPointer(page));
+	}
+
+	return vaclsn;
+}
+
+/*
+ * Set the LSN of the recent retail-vacuum operation that generated a dead line
+ * pointer on the page, overwriting any existing value. The function is
+ * called while holding a buffer clean-up lock on the page, and the caller
+ * should have already made room for the LSN. If there is no special space
+ * available, just return without writing anything.
+ */
+void
+PageSetVacuumLSN(Page page, XLogRecPtr vaclsn)
+{
+	Assert(!XLogRecPtrIsInvalid(vaclsn));
+
+	if (PageGetSpecialSize(page) != MAXALIGN(sizeof (XLogRecPtr)))
+		return;
+
+	*((XLogRecPtr *) PageGetSpecialPointer(page)) = vaclsn;
+}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4dbc393..c44db1a 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -132,7 +132,8 @@ extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
 			   OffsetNumber *nowdead, int ndead,
 			   OffsetNumber *nowunused, int nunused,
-			   TransactionId latestRemovedXid);
+			   TransactionId latestRemovedXid,
+			   bool need_vaclsn, XLogRecPtr indexvaclsn);
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
 				TransactionId cutoff_xid,
 				OffsetNumber *offsets, int offcnt);
@@ -144,11 +145,13 @@ extern void heap_page_prune_opt(Relation relation, Buffer buffer,
 					TransactionId OldestXmin);
 extern int heap_page_prune(Relation relation, Buffer buffer,
 				TransactionId OldestXmin,
-				bool report_stats, TransactionId *latestRemovedXid);
+				bool report_stats, TransactionId *latestRemovedXid,
+				XLogRecPtr indexvaclsn);
 extern void heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
-						OffsetNumber *nowunused, int nunused);
+						OffsetNumber *nowunused, int nunused,
+						bool need_vaclsn, XLogRecPtr indexvaclsn);
 extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 
 /* in heap/syncscan.c */
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index c147707..08633e5 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/tupmacs.h"
+#include "access/xlogdefs.h"
 #include "storage/itemptr.h"
 #include "storage/relfilenode.h"
 
@@ -689,10 +690,12 @@ typedef struct xl_heap_clean
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
+	bool		hasvaclsn;
+	XLogRecPtr	indexvaclsn;
 	/* OFFSET NUMBERS FOLLOW */
 } xl_heap_clean;
 
-#define SizeOfHeapClean (offsetof(xl_heap_clean, ndead) + sizeof(uint16))
+#define SizeOfHeapClean (offsetof(xl_heap_clean, indexvaclsn) + sizeof(XLogRecPtr))
 
 /*
  * Cleanup_info is required in some cases during a lazy VACUUM.
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index ffcce3c..008680e 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -65,6 +65,8 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
 	bool		relhastriggers; /* has (or has had) any TRIGGERs */
 	bool		relhassubclass; /* has (or has had) derived classes */
 	TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */
+	int4	    relindxvacxlogid; /* start LSN of the last successful index vacuum */
+	int4		relindxvacxlogoff;
 
 	/*
 	 * VARIABLE LENGTH FIELDS start here.  These fields may be NULL, too.
@@ -78,7 +80,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
 
 /* Size of fixed part of pg_class tuples, not counting var-length fields */
 #define CLASS_TUPLE_SIZE \
-	 (offsetof(FormData_pg_class,relfrozenxid) + sizeof(TransactionId))
+	 (offsetof(FormData_pg_class,relindxvacxlogoff) + sizeof(int4))
 
 /* ----------------
  *		Form_pg_class corresponds to a pointer to a tuple with
@@ -92,7 +94,7 @@ typedef FormData_pg_class *Form_pg_class;
  * ----------------
  */
 
-#define Natts_pg_class					26
+#define Natts_pg_class					28
 #define Anum_pg_class_relname			1
 #define Anum_pg_class_relnamespace		2
 #define Anum_pg_class_reltype			3
@@ -117,8 +119,10 @@ typedef FormData_pg_class *Form_pg_class;
 #define Anum_pg_class_relhastriggers	22
 #define Anum_pg_class_relhassubclass	23
 #define Anum_pg_class_relfrozenxid		24
-#define Anum_pg_class_relacl			25
-#define Anum_pg_class_reloptions		26
+#define Anum_pg_class_relindxvacxlogid	25
+#define Anum_pg_class_relindxvacxlogoff	26
+#define Anum_pg_class_relacl			27
+#define Anum_pg_class_reloptions		28
 
 /* ----------------
  *		initial contents of pg_class
@@ -130,13 +134,13 @@ typedef FormData_pg_class *Form_pg_class;
  */
 
 /* Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId */
-DATA(insert OID = 1247 (  pg_type		PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1247 (  pg_type		PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f 3 0 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 20 0 f f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1249 (  pg_attribute	PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 20 0 f f f f f 3 0 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1255 (  pg_proc		PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 25 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1255 (  pg_proc		PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 25 0 t f f f f 3 0 0 _null_ _null_ ));
 DESCR("");
-DATA(insert OID = 1259 (  pg_class		PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 26 0 t f f f f 3 _null_ _null_ ));
+DATA(insert OID = 1259 (  pg_class		PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f 3 0 0 _null_ _null_ ));
 DESCR("");
 
 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index cfbe0c4..4f3b119 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -151,6 +151,7 @@ extern void vac_update_relstats(Relation relation,
 					double num_tuples,
 					bool hasindex,
 					TransactionId frozenxid);
+extern void vac_update_indexvaclsn(Relation relation, XLogRecPtr indexvaclsn);
 extern void vacuum_set_xid_limits(int freeze_min_age, int freeze_table_age,
 					  bool sharedRel,
 					  TransactionId *oldestXmin,
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 42d6b10..49b6fa4 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -154,8 +154,12 @@ typedef PageHeaderData *PageHeader;
 										 * tuple? */
 #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
 										 * everyone */
+#define PD_HAS_VACUUM_LSN	0x0008		/* page has dead line pointers and the
+										 * lsn of the operation that created
+										 * the most recent dead line pointer is
+										 * recorded in the page special area */
 
-#define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
+#define PD_VALID_FLAG_BITS	0x000f		/* OR of all valid pd_flags bits */
 
 /*
  * Page layout version number 0 is for pre-7.3 Postgres releases.
@@ -345,6 +349,13 @@ typedef PageHeaderData *PageHeader;
 #define PageClearAllVisible(page) \
 	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
 
+#define PageHasVacuumLSN(page) \
+	(((PageHeader) (page))->pd_flags & PD_HAS_VACUUM_LSN)
+#define PageSetHasVacuumLSN(page) \
+	(((PageHeader) (page))->pd_flags |= PD_HAS_VACUUM_LSN)
+#define PageClearHasVacuumLSN(page) \
+	(((PageHeader) (page))->pd_flags &= ~PD_HAS_VACUUM_LSN)
+
 #define PageIsPrunable(page, oldestxmin) \
 ( \
 	AssertMacro(TransactionIdIsNormal(oldestxmin)), \
@@ -375,11 +386,13 @@ extern Page PageGetTempPage(Page page);
 extern Page PageGetTempPageCopy(Page page);
 extern Page PageGetTempPageCopySpecial(Page page);
 extern void PageRestoreTempPage(Page tempPage, Page oldPage);
-extern void PageRepairFragmentation(Page page);
+extern void PageRepairFragmentation(Page page, bool need_vaclsn);
 extern Size PageGetFreeSpace(Page page);
 extern Size PageGetExactFreeSpace(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
 extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
+extern XLogRecPtr PageGetVacuumLSN(Page page);
+extern void PageSetVacuumLSN(Page page, XLogRecPtr vaclsn);
 
 #endif   /* BUFPAGE_H */
diff --git a/src/include/storage/itemid.h b/src/include/storage/itemid.h
index 961d2c2..0ae1298 100644
--- a/src/include/storage/itemid.h
+++ b/src/include/storage/itemid.h
@@ -113,6 +113,13 @@ typedef uint16 ItemLength;
 	((itemId)->lp_flags == LP_DEAD)
 
 /*
+ * ItemIdIsDeadVacuumed
+ *		True iff item identifier is in state DEAD_VACUUMED.
+ */
+#define ItemIdIsDeadVacuumed(itemId) \
+	(((itemId)->lp_flags == LP_DEAD) && ((itemId)->lp_off == 1))
+
+/*
  * ItemIdHasStorage
  *		True iff item identifier has associated storage.
  */
@@ -168,6 +175,19 @@ typedef uint16 ItemLength;
 )
 
 /*
+ * ItemIdSetDeadVacuumed
+ *		Set the item identifier to be DEAD_VACUUMED, with no storage.
+ *		Beware of multiple evaluations of itemId!
+ */
+#define ItemIdSetDeadVacuumed(itemId) \
+( \
+	(itemId)->lp_flags = LP_DEAD, \
+	(itemId)->lp_off = 1, \
+	(itemId)->lp_len = 0 \
+)
+
+
+/*
  * ItemIdMarkDead
  *		Set the item identifier to be DEAD, keeping its existing storage.
  *
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
