On Mon, 2013-04-08 at 09:19 +0100, Simon Riggs wrote:

> Applied, with this as the only code change.
> 
> 
> Thanks everybody for good research and coding and fast testing.
> 
> 
> We're in good shape now.

Thank you.

I have attached two more patches:

1. I believe the issue I brought up at the end of this email is real:

http://www.postgresql.org/message-id/1365035537.7580.380.camel@sussancws0025

In lazy_vacuum_page(), the following sequence can happen when checksums
are on:

   a. PageSetAllVisible
   b. Pass heap page to visibilitymap_set
   c. visibilitymap_set logs the heap page and sets the LSN
   d. MarkBufferDirty

If a checkpoint happens between (c) and (d), then we have a problem:
the page change is already WAL-logged and carries an LSN, but the
buffer is not yet marked dirty, so the checkpoint can complete without
writing it, and after a crash the record falls before the redo pointer
and is never replayed. The fix is easy: just mark the heap buffer dirty
first. There's another call site that looks like a potential problem,
but I don't think it is; I simplified the code there to make it
(hopefully) more clearly correct.
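
To make the ordering concrete, here is a minimal sketch of both
orderings (illustrative only; the calls mirror the patch below, but
this is not the actual vacuumlazy.c code):

    /* Broken ordering (checksums on): */
    PageSetAllVisible(page);                  /* (a) modify heap page  */
    visibilitymap_set(onerel, blkno, buf,     /* (b)+(c) WAL-logs the  */
                      InvalidXLogRecPtr,      /* heap page and sets    */
                      vmbuffer,               /* its LSN               */
                      visibility_cutoff_xid);
    MarkBufferDirty(buf);                     /* (d) too late if a
                                               * checkpoint ran in
                                               * between              */

    /* Fixed ordering: dirty the heap buffer before anything can
     * WAL-log it. */
    PageSetAllVisible(page);
    MarkBufferDirty(buf);
    visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
                      vmbuffer, visibility_cutoff_xid);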

2. A cleanup patch to pass the buffer_std flag down through
MarkBufferDirtyHint(). This is a matter of preference and purely
cosmetic, so it might not be wanted. The reason I think it's useful is
that a future caller setting a hint on a non-standard buffer could
easily miss the hidden assumption that the buffer uses the standard
page layout.
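
As a sketch (the second call is a hypothetical future caller, not code
in this patch): a standard-layout page passes true, which lets
XLogSaveBufferForHint() omit the hole between pd_lower and pd_upper
from any full-page image, while a non-standard page would have to pass
false:

    /* Standard page layout: the free-space hole between pd_lower and
     * pd_upper can be compressed out of a full-page image. */
    MarkBufferDirtyHint(buf, true);

    /* Hypothetical non-standard page layout: pass false so any
     * full-page image is taken verbatim. */
    MarkBufferDirtyHint(buf, false);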

Regards,
        Jeff Davis
*** a/src/backend/access/hash/hash.c
--- b/src/backend/access/hash/hash.c
***************
*** 287,293 **** hashgettuple(PG_FUNCTION_ARGS)
  			/*
  			 * Since this can be redone later if needed, mark as a hint.
  			 */
! 			MarkBufferDirtyHint(buf);
  		}
  
  		/*
--- 287,293 ----
  			/*
  			 * Since this can be redone later if needed, mark as a hint.
  			 */
! 			MarkBufferDirtyHint(buf, true);
  		}
  
  		/*
*** a/src/backend/access/heap/pruneheap.c
--- b/src/backend/access/heap/pruneheap.c
***************
*** 262,268 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
  		{
  			((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
  			PageClearFull(page);
! 			MarkBufferDirtyHint(buffer);
  		}
  	}
  
--- 262,268 ----
  		{
  			((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
  			PageClearFull(page);
! 			MarkBufferDirtyHint(buffer, true);
  		}
  	}
  
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 413,421 **** _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
  					 * crucial. Be sure to mark the proper buffer dirty.
  					 */
  					if (nbuf != InvalidBuffer)
! 						MarkBufferDirtyHint(nbuf);
  					else
! 						MarkBufferDirtyHint(buf);
  				}
  			}
  		}
--- 413,421 ----
  					 * crucial. Be sure to mark the proper buffer dirty.
  					 */
  					if (nbuf != InvalidBuffer)
! 						MarkBufferDirtyHint(nbuf, true);
  					else
! 						MarkBufferDirtyHint(buf, true);
  				}
  			}
  		}
*** a/src/backend/access/nbtree/nbtree.c
--- b/src/backend/access/nbtree/nbtree.c
***************
*** 1052,1058 **** restart:
  				opaque->btpo_cycleid == vstate->cycleid)
  			{
  				opaque->btpo_cycleid = 0;
! 				MarkBufferDirtyHint(buf);
  			}
  		}
  
--- 1052,1058 ----
  				opaque->btpo_cycleid == vstate->cycleid)
  			{
  				opaque->btpo_cycleid = 0;
! 				MarkBufferDirtyHint(buf, true);
  			}
  		}
  
*** a/src/backend/access/nbtree/nbtutils.c
--- b/src/backend/access/nbtree/nbtutils.c
***************
*** 1789,1795 **** _bt_killitems(IndexScanDesc scan, bool haveLock)
  	if (killedsomething)
  	{
  		opaque->btpo_flags |= BTP_HAS_GARBAGE;
! 		MarkBufferDirtyHint(so->currPos.buf);
  	}
  
  	if (!haveLock)
--- 1789,1795 ----
  	if (killedsomething)
  	{
  		opaque->btpo_flags |= BTP_HAS_GARBAGE;
! 		MarkBufferDirtyHint(so->currPos.buf, true);
  	}
  
  	if (!haveLock)
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 7661,7667 **** XLogRestorePoint(const char *rpName)
   * i.e. those for which buffer_std == true
   */
  XLogRecPtr
! XLogSaveBufferForHint(Buffer buffer)
  {
  	XLogRecPtr recptr = InvalidXLogRecPtr;
  	XLogRecPtr lsn;
--- 7661,7667 ----
   * i.e. those for which buffer_std == true
   */
  XLogRecPtr
! XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
  {
  	XLogRecPtr recptr = InvalidXLogRecPtr;
  	XLogRecPtr lsn;
***************
*** 7683,7689 **** XLogSaveBufferForHint(Buffer buffer)
  	 * We reuse and reset rdata for any actual WAL record insert.
  	 */
  	rdata[0].buffer = buffer;
! 	rdata[0].buffer_std = true;
  
  	/*
  	 * Check buffer while not holding an exclusive lock.
--- 7683,7689 ----
  	 * We reuse and reset rdata for any actual WAL record insert.
  	 */
  	rdata[0].buffer = buffer;
! 	rdata[0].buffer_std = buffer_std;
  
  	/*
  	 * Check buffer while not holding an exclusive lock.
*** a/src/backend/commands/sequence.c
--- b/src/backend/commands/sequence.c
***************
*** 1118,1124 **** read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
  		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
  		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
  		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
! 		MarkBufferDirtyHint(*buf);
  	}
  
  	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
--- 1118,1124 ----
  		HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
  		seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
  		seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
! 		MarkBufferDirtyHint(*buf, true);
  	}
  
  	seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
***************
*** 2582,2588 **** IncrBufferRefCount(Buffer buffer)
   *    (due to a race condition), so it cannot be used for important changes.
   */
  void
! MarkBufferDirtyHint(Buffer buffer)
  {
  	volatile BufferDesc *bufHdr;
  	Page	page = BufferGetPage(buffer);
--- 2582,2588 ----
   *    (due to a race condition), so it cannot be used for important changes.
   */
  void
! MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
  {
  	volatile BufferDesc *bufHdr;
  	Page	page = BufferGetPage(buffer);
***************
*** 2667,2673 **** MarkBufferDirtyHint(Buffer buffer)
  			 * rather than full transactionids.
  			 */
  			MyPgXact->delayChkpt = delayChkpt = true;
! 			lsn = XLogSaveBufferForHint(buffer);
  		}
  
  		LockBufHdr(bufHdr);
--- 2667,2673 ----
  			 * rather than full transactionids.
  			 */
  			MyPgXact->delayChkpt = delayChkpt = true;
! 			lsn = XLogSaveBufferForHint(buffer, buffer_std);
  		}
  
  		LockBufHdr(bufHdr);
*** a/src/backend/storage/freespace/freespace.c
--- b/src/backend/storage/freespace/freespace.c
***************
*** 216,222 **** XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
  		PageInit(page, BLCKSZ, 0);
  
  	if (fsm_set_avail(page, slot, new_cat))
! 		MarkBufferDirtyHint(buf);
  	UnlockReleaseBuffer(buf);
  }
  
--- 216,222 ----
  		PageInit(page, BLCKSZ, 0);
  
  	if (fsm_set_avail(page, slot, new_cat))
! 		MarkBufferDirtyHint(buf, true);
  	UnlockReleaseBuffer(buf);
  }
  
***************
*** 286,292 **** FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
  			return;				/* nothing to do; the FSM was already smaller */
  		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  		fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
! 		MarkBufferDirtyHint(buf);
  		UnlockReleaseBuffer(buf);
  
  		new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
--- 286,292 ----
  			return;				/* nothing to do; the FSM was already smaller */
  		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  		fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
! 		MarkBufferDirtyHint(buf, true);
  		UnlockReleaseBuffer(buf);
  
  		new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
***************
*** 619,625 **** fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
  	page = BufferGetPage(buf);
  
  	if (fsm_set_avail(page, slot, newValue))
! 		MarkBufferDirtyHint(buf);
  
  	if (minValue != 0)
  	{
--- 619,625 ----
  	page = BufferGetPage(buf);
  
  	if (fsm_set_avail(page, slot, newValue))
! 		MarkBufferDirtyHint(buf, true);
  
  	if (minValue != 0)
  	{
***************
*** 770,776 **** fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)
  			{
  				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  				fsm_set_avail(BufferGetPage(buf), slot, child_avail);
! 				MarkBufferDirtyHint(buf);
  				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  			}
  		}
--- 770,776 ----
  			{
  				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
  				fsm_set_avail(BufferGetPage(buf), slot, child_avail);
! 				MarkBufferDirtyHint(buf, true);
  				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  			}
  		}
*** a/src/backend/storage/freespace/fsmpage.c
--- b/src/backend/storage/freespace/fsmpage.c
***************
*** 284,290 **** restart:
  				exclusive_lock_held = true;
  			}
  			fsm_rebuild_page(page);
! 			MarkBufferDirtyHint(buf);
  			goto restart;
  		}
  	}
--- 284,290 ----
  				exclusive_lock_held = true;
  			}
  			fsm_rebuild_page(page);
! 			MarkBufferDirtyHint(buf, true);
  			goto restart;
  		}
  	}
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 121,127 **** SetHintBits(HeapTupleHeader tuple, Buffer buffer,
  	}
  
  	tuple->t_infomask |= infomask;
! 	MarkBufferDirtyHint(buffer);
  }
  
  /*
--- 121,127 ----
  	}
  
  	tuple->t_infomask |= infomask;
! 	MarkBufferDirtyHint(buffer, true);
  }
  
  /*
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 267,273 **** extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
  extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
  extern int	XLogFileOpen(XLogSegNo segno);
  
! extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer);
  
  extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
  extern void XLogSetAsyncXactLSN(XLogRecPtr record);
--- 267,273 ----
  extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
  extern int	XLogFileOpen(XLogSegNo segno);
  
! extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
  
  extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
  extern void XLogSetAsyncXactLSN(XLogRecPtr record);
*** a/src/include/storage/bufmgr.h
--- b/src/include/storage/bufmgr.h
***************
*** 204,210 **** extern Size BufferShmemSize(void);
  extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
  			 ForkNumber *forknum, BlockNumber *blknum);
  
! extern void MarkBufferDirtyHint(Buffer buffer);
  
  extern void UnlockBuffers(void);
  extern void LockBuffer(Buffer buffer, int mode);
--- 204,210 ----
  extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
  			 ForkNumber *forknum, BlockNumber *blknum);
  
! extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
  
  extern void UnlockBuffers(void);
  extern void LockBuffer(Buffer buffer, int mode);
*** a/src/backend/commands/vacuumlazy.c
--- b/src/backend/commands/vacuumlazy.c
***************
*** 901,926 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  		freespace = PageGetHeapFreeSpace(page);
  
  		/* mark page all-visible, if appropriate */
! 		if (all_visible)
  		{
! 			if (!PageIsAllVisible(page))
! 			{
! 				PageSetAllVisible(page);
! 				MarkBufferDirty(buf);
! 				visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
! 								  vmbuffer, visibility_cutoff_xid);
! 			}
! 			else if (!all_visible_according_to_vm)
! 			{
! 				/*
! 				 * It should never be the case that the visibility map page is
! 				 * set while the page-level bit is clear, but the reverse is
! 				 * allowed.  Set the visibility map bit as well so that we get
! 				 * back in sync.
! 				 */
! 				visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
! 								  vmbuffer, visibility_cutoff_xid);
! 			}
  		}
  
  		/*
--- 901,925 ----
  		freespace = PageGetHeapFreeSpace(page);
  
  		/* mark page all-visible, if appropriate */
! 		if (all_visible && !all_visible_according_to_vm)
  		{
! 			/*
! 			 * It should never be the case that the visibility map page is set
! 			 * while the page-level bit is clear, but the reverse is allowed
! 			 * (if checksums are not enabled).  Regardless, set both bits
! 			 * so that we get back in sync.
! 			 *
! 			 * NB: If the heap page is all-visible but the VM bit is not set,
! 			 * we don't need to dirty the heap page.  However, if checksums are
! 			 * enabled, we do need to make sure that the heap page is dirtied
! 			 * before passing it to visibilitymap_set(), because it may be
! 			 * logged.  Given that this situation should only happen in rare
! 			 * cases after a crash, it is not worth optimizing.
! 			 */
! 			PageSetAllVisible(page);
! 			MarkBufferDirty(buf);
! 			visibilitymap_set(onerel, blkno, buf, InvalidXLogRecPtr,
! 							  vmbuffer, visibility_cutoff_xid);
  		}
  
  		/*
***************
*** 1146,1151 **** lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
--- 1145,1158 ----
  	PageRepairFragmentation(page);
  
  	/*
+ 	 * If checksums are enabled, visibilitymap_set() may log the heap page, so
+ 	 * we mark the heap buffer dirty before calling visibilitymap_set().  We
+ 	 * need to mark the heap page dirty regardless, but the order only matters
+ 	 * when checksums are enabled.
+ 	 */
+ 	MarkBufferDirty(buffer);
+ 
+ 	/*
  	 * Now that we have removed the dead tuples from the page, once again check
  	 * if the page has become all-visible.
  	 */
***************
*** 1158,1165 **** lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  				visibility_cutoff_xid);
  	}
  
- 	MarkBufferDirty(buffer);
- 
  	/* XLOG stuff */
  	if (RelationNeedsWAL(onerel))
  	{
--- 1165,1170 ----