On 08.11.2010 15:40, Heikki Linnakangas wrote:
Here's a first draft of this, using the inCommit flag as is. It works,
but suffers from starvation if you have a lot of concurrent
multi-WAL-record actions. I tested that by running INSERTs to a table
with tsvector field with a GiST index on it from five concurrent
sessions, and saw checkpoints regularly busy-waiting for over a minute.
To avoid that, we need something a little bit more complicated than a
boolean flag. I'm thinking of adding a counter beside the inCommit flag
that's incremented every time a new multi-WAL-record action begins, so
that the checkpoint process can distinguish between a new action that
was started after deciding the REDO pointer and an old one that's still
running.
(inCommit is a misnomer now, of course. Will need to find a better name..)
Here's a 2nd version, with an additional counter in PGPROC to avoid
starving the checkpoint in the face of a constant stream of e.g. GiST inserts.
The new rule is that before you start a multi-WAL-record operation that
needs to be completed at end of recovery if you crash in the middle, you
call HoldCheckpoint(), and once you're finished, ResumeCheckpoint().
rm_safe_restartpoint() is gone.
This is a pre-existing bug, but given the lack of field reports and the
fact that it's pretty darn hard to run into this in real life, I'm
inclined to not backpatch this.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
*** a/src/backend/access/gin/ginbtree.c
--- b/src/backend/access/gin/ginbtree.c
***************
*** 17,22 ****
--- 17,23 ----
#include "access/gin.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
+ #include "storage/proc.h"
#include "utils/rel.h"
/*
***************
*** 281,286 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
--- 282,288 ----
Page page,
rpage,
lpage;
+ bool splitInProgress = false;
/* remember root BlockNumber */
while (parent)
***************
*** 318,324 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
freeGinBtreeStack(stack);
! return;
}
else
{
--- 320,326 ----
freeGinBtreeStack(stack);
! break; /* don't need to recurse to parent */
}
else
{
***************
*** 326,331 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
--- 328,344 ----
Page newlpage;
/*
+ * Hold off checkpoints until we've finished this split by
+ * inserting the parent pointer. Replay might miss the incomplete
+ * split otherwise. Temporary indexes are skipped, matching the
+ * !rd_istemp test used by the GiST and nbtree insertion paths.
+ */
+ if (!btree->index->rd_istemp)
+ {
+ splitInProgress = true;
+ HoldCheckpoint();
+ }
+
+ /*
* newlpage is a pointer to memory page, it doesn't associate with
* buffer, stack->buffer should be untouched
*/
***************
*** 402,408 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
buildStats->nEntryPages++;
}
! return;
}
else
{
--- 415,421 ----
buildStats->nEntryPages++;
}
! break; /* don't need to recurse to parent */
}
else
{
***************
*** 472,475 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
--- 485,495 ----
pfree(stack);
stack = parent;
}
+
+ /*
+ * If we had to split a page, let checkpointer know that we're now
+ * finished with it.
+ */
+ if (splitInProgress)
+ ResumeCheckpoint();
}
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
***************
*** 866,876 **** gin_xlog_cleanup(void)
MemoryContextDelete(opCtx);
incomplete_splits = NIL;
}
-
- bool
- gin_safe_restartpoint(void)
- {
- if (incomplete_splits)
- return false;
- return true;
- }
--- 866,868 ----
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 20,25 ****
--- 20,26 ----
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
+ #include "storage/proc.h"
#include "utils/memutils.h"
const XLogRecPtr XLogRecPtrForTemp = {1, 1};
***************
*** 272,283 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
--- 273,290 ----
state.r = r;
state.key = itup->t_tid;
state.needInsertComplete = true;
+ /* Hold off checkpoints until we've completed this insertion. */
+ if (!r->rd_istemp)
+ HoldCheckpoint();
state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
state.stack->blkno = GIST_ROOT_BLKNO;
gistfindleaf(&state, giststate);
gistmakedeal(&state, giststate);
+
+ if (!r->rd_istemp)
+ ResumeCheckpoint();
}
static bool
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 838,851 **** gist_xlog_cleanup(void)
MemoryContextDelete(insertCtx);
}
- bool
- gist_safe_restartpoint(void)
- {
- if (incomplete_inserts)
- return false;
- return true;
- }
-
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
--- 838,843 ----
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 21,26 ****
--- 21,27 ----
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
+ #include "storage/proc.h"
#include "utils/inval.h"
#include "utils/tqual.h"
***************
*** 63,69 **** static void _bt_insertonpg(Relation rel, Buffer buf,
BTStack stack,
IndexTuple itup,
OffsetNumber newitemoff,
! bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
OffsetNumber newitemoff, Size newitemsz,
IndexTuple newitem, bool newitemonleft);
--- 64,71 ----
BTStack stack,
IndexTuple itup,
OffsetNumber newitemoff,
! bool split_only_page,
! bool isleaf);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
OffsetNumber newitemoff, Size newitemsz,
IndexTuple newitem, bool newitemonleft);
***************
*** 176,182 **** top:
{
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
! _bt_insertonpg(rel, buf, stack, itup, offset, false);
}
else
{
--- 178,184 ----
{
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
! _bt_insertonpg(rel, buf, stack, itup, offset, false, true);
}
else
{
***************
*** 660,666 **** _bt_insertonpg(Relation rel,
BTStack stack,
IndexTuple itup,
OffsetNumber newitemoff,
! bool split_only_page)
{
Page page;
BTPageOpaque lpageop;
--- 662,669 ----
BTStack stack,
IndexTuple itup,
OffsetNumber newitemoff,
! bool split_only_page,
! bool isleaf)
{
Page page;
BTPageOpaque lpageop;
***************
*** 714,719 **** _bt_insertonpg(Relation rel,
--- 717,726 ----
*----------
*/
_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
+
+ /* Finish the "checkpoint critical section" started in _bt_split() */
+ if (!rel->rd_istemp && isleaf)
+ ResumeCheckpoint();
}
else
{
***************
*** 870,875 **** _bt_insertonpg(Relation rel,
--- 877,886 ----
*
* Returns the new right sibling of buf, pinned and write-locked.
* The pin and lock on buf are maintained.
+ *
+ * If it's not a temporary index, _bt_split() calls HoldCheckpoint(),
+ * and the caller is responsible for calling ResumeCheckpoint() after
+ * inserting the parent pointer.
*/
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
***************
*** 1139,1144 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
--- 1150,1167 ----
START_CRIT_SECTION();
/*
+ * Hold off checkpoints until we've inserted the parent pointer. Without
+ * this, it would be possible for the checkpoint to set REDO after the
+ * split WAL record and quickly finish the checkpoint before the insertion
+ * of the parent pointer has been WAL-logged. Incomplete splits are
+ * normally completed at the end of recovery, but if there's no trace of
+ * the split between the redo pointer and checkpoint record, recovery
+ * would not know that the split is incomplete.
+ */
+ if (!rel->rd_istemp)
+ HoldCheckpoint();
+
+ /*
* By here, the original data page has been split into two new halves, and
* these are correct. The algorithm requires that the left page never
* move during a split, so we copy the new left page back on top of the
***************
*** 1683,1689 **** _bt_insert_parent(Relation rel,
/* Recursively update the parent */
_bt_insertonpg(rel, pbuf, stack->bts_parent,
new_item, stack->bts_offset + 1,
! is_only);
/* be tidy */
pfree(new_item);
--- 1706,1712 ----
/* Recursively update the parent */
_bt_insertonpg(rel, pbuf, stack->bts_parent,
new_item, stack->bts_offset + 1,
! is_only, false);
/* be tidy */
pfree(new_item);
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 1257,1267 **** btree_xlog_cleanup(void)
}
incomplete_actions = NIL;
}
-
- bool
- btree_safe_restartpoint(void)
- {
- if (incomplete_actions)
- return false;
- return true;
- }
--- 1257,1259 ----
*** a/src/backend/access/transam/README
--- b/src/backend/access/transam/README
***************
*** 542,547 **** replay code has to do the insertion on its own to restore the index to
--- 542,560 ----
consistency. Such insertions occur after WAL is operational, so they can
and should write WAL records for the additional generated actions.
+ Finishing incomplete actions at the end of recovery only works if the recovery
+ sees the WAL record starting the action. Because of that, you need to ensure
+ that a checkpoint doesn't happen in the middle of such an operation. To be
precise, a checkpoint mustn't begin and finish while a multi-record WAL
action is in progress if no WAL record belonging to that action lies between the
REDO pointer and the checkpoint record. Recovery from such a checkpoint would
+ not see any evidence of the still incomplete multi-record action, so it would
+ not know that the operation needs to be finished at the end of recovery. To
+ prevent that, you must call HoldCheckpoint() before writing the first WAL
+ record of the operation, and ResumeCheckpoint() after it's finished. The
+ checkpoint code checks for that after deciding the REDO pointer, waiting until
+ all the operations that were in-progress when the checkpoint started have
+ finished, before it writes the checkpoint record.
Write-Ahead Logging for Filesystem Actions
------------------------------------------
*** a/src/backend/access/transam/rmgr.c
--- b/src/backend/access/transam/rmgr.c
***************
*** 26,45 ****
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
! {"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},
! {"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},
! {"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},
! {"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},
! {"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
! {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
! {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
! {"RelMap", relmap_redo, relmap_desc, NULL, NULL, NULL},
! {"Standby", standby_redo, standby_desc, NULL, NULL, NULL},
! {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
! {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
! {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
! {"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
! {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
! {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
};
--- 26,45 ----
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
! {"XLOG", xlog_redo, xlog_desc, NULL, NULL},
! {"Transaction", xact_redo, xact_desc, NULL, NULL},
! {"Storage", smgr_redo, smgr_desc, NULL, NULL},
! {"CLOG", clog_redo, clog_desc, NULL, NULL},
! {"Database", dbase_redo, dbase_desc, NULL, NULL},
! {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL},
! {"MultiXact", multixact_redo, multixact_desc, NULL, NULL},
! {"RelMap", relmap_redo, relmap_desc, NULL, NULL},
! {"Standby", standby_redo, standby_desc, NULL, NULL},
! {"Heap2", heap2_redo, heap2_desc, NULL, NULL},
! {"Heap", heap_redo, heap_desc, NULL, NULL},
! {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup},
! {"Hash", hash_redo, hash_desc, NULL, NULL},
! {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup},
! {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup},
! {"Sequence", seq_redo, seq_desc, NULL, NULL}
};
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 313,319 **** MarkAsPreparing(TransactionId xid, const char *gid,
gxact->proc.backendId = InvalidBackendId;
gxact->proc.databaseId = databaseid;
gxact->proc.roleId = owner;
! gxact->proc.inCommit = false;
gxact->proc.vacuumFlags = 0;
gxact->proc.lwWaiting = false;
gxact->proc.lwExclusive = false;
--- 313,320 ----
gxact->proc.backendId = InvalidBackendId;
gxact->proc.databaseId = databaseid;
gxact->proc.roleId = owner;
! gxact->proc.holdCheckpoint = false;
! gxact->proc.chkptHoldCounter = 0;
gxact->proc.vacuumFlags = 0;
gxact->proc.lwWaiting = false;
gxact->proc.lwExclusive = false;
***************
*** 1018,1024 **** EndPrepare(GlobalTransaction gxact)
*/
START_CRIT_SECTION();
! MyProc->inCommit = true;
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
records.head);
--- 1019,1025 ----
*/
START_CRIT_SECTION();
! MyProc->holdCheckpoint = true;
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
records.head);
***************
*** 1066,1072 **** EndPrepare(GlobalTransaction gxact)
* checkpoint starting after this will certainly see the gxact as a
* candidate for fsyncing.
*/
! MyProc->inCommit = false;
END_CRIT_SECTION();
--- 1067,1073 ----
* checkpoint starting after this will certainly see the gxact as a
* candidate for fsyncing.
*/
! MyProc->holdCheckpoint = false;
END_CRIT_SECTION();
***************
*** 1959,1965 **** RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
! MyProc->inCommit = true;
/* Emit the XLOG commit record */
xlrec.xid = xid;
--- 1960,1966 ----
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
! MyProc->holdCheckpoint = true;
/* Emit the XLOG commit record */
xlrec.xid = xid;
***************
*** 2024,2030 **** RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
! MyProc->inCommit = false;
END_CRIT_SECTION();
}
--- 2025,2031 ----
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
! MyProc->holdCheckpoint = false;
END_CRIT_SECTION();
}
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 968,975 **** RecordTransactionCommit(void)
xlrec.tsId = MyDatabaseTableSpace;
/*
! * Mark ourselves as within our "commit critical section". This
! * forces any concurrent checkpoint to wait until we've updated
* pg_clog. Without this, it is possible for the checkpoint to set
* REDO after the XLOG record but fail to flush the pg_clog update to
* disk, leading to loss of the transaction commit if the system
--- 968,974 ----
xlrec.tsId = MyDatabaseTableSpace;
/*
! * Hold off any concurrent checkpoints until we've updated
* pg_clog. Without this, it is possible for the checkpoint to set
* REDO after the XLOG record but fail to flush the pg_clog update to
* disk, leading to loss of the transaction commit if the system
***************
*** 979,991 **** RecordTransactionCommit(void)
* RecordTransactionAbort. That's because loss of a transaction abort
* is noncritical; the presumption would be that it aborted, anyway.
*
! * It's safe to change the inCommit flag of our own backend without
! * holding the ProcArrayLock, since we're the only one modifying it.
! * This makes checkpoint's determination of which xacts are inCommit a
! * bit fuzzy, but it doesn't matter.
*/
START_CRIT_SECTION();
! MyProc->inCommit = true;
SetCurrentTransactionStopTimestamp();
xlrec.xact_time = xactStopTimestamp;
--- 978,992 ----
* RecordTransactionAbort. That's because loss of a transaction abort
* is noncritical; the presumption would be that it aborted, anyway.
*
! * We don't bother bumping chkptHoldCounter like HoldCheckpoint()
! * does, as the transaction is about to end soon anyway. At worst it
! * means that checkpoint thinks that we're still in some previous
! * multi-record operation that happened earlier in this transaction,
! * and has to wait a bit longer than necessary. Better to avoid any
! * unnecessary overhead in this performance-critical path.
*/
START_CRIT_SECTION();
! MyProc->holdCheckpoint = true;
SetCurrentTransactionStopTimestamp();
xlrec.xact_time = xactStopTimestamp;
***************
*** 1100,1106 **** RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
! MyProc->inCommit = false;
END_CRIT_SECTION();
}
--- 1101,1107 ----
*/
if (markXidCommitted)
{
! MyProc->holdCheckpoint = false;
END_CRIT_SECTION();
}
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 7111,7118 **** CreateCheckPoint(int flags)
uint32 freespace;
uint32 _logId;
uint32 _logSeg;
- TransactionId *inCommitXids;
- int nInCommit;
/*
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
--- 7111,7116 ----
***************
*** 7299,7313 **** CreateCheckPoint(int flags)
* and we will correctly flush the update below. So we cannot miss any
* xacts we need to wait for.
*/
! nInCommit = GetTransactionsInCommit(&inCommitXids);
! if (nInCommit > 0)
! {
! do
! {
! pg_usleep(10000L); /* wait for 10 msec */
! } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
! }
! pfree(inCommitXids);
/*
* Get the other info we need for the checkpoint record.
--- 7297,7303 ----
* and we will correctly flush the update below. So we cannot miss any
* xacts we need to wait for.
*/
! ProcArrayCheckpointBarrier();
/*
* Get the other info we need for the checkpoint record.
***************
*** 7554,7584 **** CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
static void
RecoveryRestartPoint(const CheckPoint *checkPoint)
{
- int rmid;
-
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
/*
- * Is it safe to checkpoint? We must ask each of the resource managers
- * whether they have any partial state information that might prevent a
- * correct restart from this point. If so, we skip this opportunity, but
- * return at the next checkpoint record for another try.
- */
- for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- {
- if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
- if (!(RmgrTable[rmid].rm_safe_restartpoint()))
- {
- elog(trace_recovery(DEBUG2), "RM %d not safe to record restart point at %X/%X",
- rmid,
- checkPoint->redo.xlogid,
- checkPoint->redo.xrecoff);
- return;
- }
- }
-
- /*
* Copy the checkpoint record to shared memory, so that bgwriter can use
* it the next time it wants to perform a restartpoint.
*/
--- 7544,7553 ----
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 367,373 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
proc->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! proc->inCommit = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
/* Clear the subtransaction-XID cache too while holding the lock */
--- 367,373 ----
proc->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! proc->holdCheckpoint = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
/* Clear the subtransaction-XID cache too while holding the lock */
***************
*** 394,400 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
proc->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! proc->inCommit = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
Assert(proc->subxids.nxids == 0);
--- 394,400 ----
proc->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! proc->holdCheckpoint = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
Assert(proc->subxids.nxids == 0);
***************
*** 427,433 **** ProcArrayClearTransaction(PGPROC *proc)
/* redundant, but just in case */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! proc->inCommit = false;
/* Clear the subtransaction-XID cache too */
proc->subxids.nxids = 0;
--- 427,433 ----
/* redundant, but just in case */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! proc->holdCheckpoint = false;
/* Clear the subtransaction-XID cache too */
proc->subxids.nxids = 0;
***************
*** 1537,1569 **** GetRunningTransactionData(void)
}
/*
! * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
! *
! * Constructs an array of XIDs of transactions that are currently in commit
! * critical sections, as shown by having inCommit set in their PGPROC entries.
! *
! * *xids_p is set to a palloc'd array that should be freed by the caller.
! * The return value is the number of valid entries.
! *
! * Note that because backends set or clear inCommit without holding any lock,
! * the result is somewhat indeterminate, but we don't really care. Even in
! * a multiprocessor with delayed writes to shared memory, it should be certain
! * that setting of inCommit will propagate to shared memory when the backend
! * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
! * it's already inserted its commit record. Whether it takes a little while
! * for clearing of inCommit to propagate is unimportant for correctness.
*/
! int
! GetTransactionsInCommit(TransactionId **xids_p)
{
ProcArrayStruct *arrayP = procArray;
! TransactionId *xids;
int nxids;
int index;
! xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
nxids = 0;
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
--- 1537,1586 ----
}
/*
! * ProcArrayCheckpointBarrier -- Wait until all multi-record actions finish
! *
! * Waits until all backends finish any multi-WAL-record actions, or other
! * incomplete actions, so that it is safe to perform a checkpoint.
! * This is called in the checkpoint process after deciding REDO pointer,
! * but before writing the checkpoint record.
! *
! * We don't need to wait for any new multi-record actions that begin after
! * we've started waiting, they will be seen on recovery from the REDO
! * pointer.
! *
! * Note that because backends set or clear the holdCheckpoint flag and
! * chkptHoldCounter without holding any lock, the result is somewhat
! * indeterminate, but we don't really care. Even in a multiprocessor with
! * delayed writes to shared memory, it should be certain that setting of
! * holdCheckpoint will propagate to shared memory when the backend takes the
! * WALInsertLock to write the first WAL record in the action, so we cannot
! * fail to see a multi-step action as holding checkpoints if it's already
! * inserted its first WAL record. Whether it takes a little while for
! * clearing of holdCheckpoint to propagate is unimportant for correctness.
! * Likewise we don't care much if the change of chkptHoldCounter is seen
! * as atomic, we just care whether it's the same or different as it was when
! * we started. By the time the backend takes the WALInsertLock, it should be
! * updated in shared memory, and until that we don't really need to wait for
! * the backend anyway.
*/
! void
! ProcArrayCheckpointBarrier(void)
{
ProcArrayStruct *arrayP = procArray;
! struct xid_barrier {
! TransactionId xid;
! int32 counter;
! } *xids;
int nxids;
int index;
! xids = (struct xid_barrier *) palloc(arrayP->maxProcs * sizeof(struct xid_barrier));
nxids = 0;
+ /*
+ * Take a snapshot of the chkptHoldCounter values of all backends
+ * currently holding off checkpoints.
+ */
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
***************
*** 1573,1633 **** GetTransactionsInCommit(TransactionId **xids_p)
/* Fetch xid just once - see GetNewTransactionId */
TransactionId pxid = proc->xid;
! if (proc->inCommit && TransactionIdIsValid(pxid))
! xids[nxids++] = pxid;
}
LWLockRelease(ProcArrayLock);
! *xids_p = xids;
! return nxids;
! }
!
! /*
! * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
! *
! * This is used with the results of GetTransactionsInCommit to see if any
! * of the specified XIDs are still in their commit critical sections.
! *
! * Note: this is O(N^2) in the number of xacts that are/were in commit, but
! * those numbers should be small enough for it not to be a problem.
! */
! bool
! HaveTransactionsInCommit(TransactionId *xids, int nxids)
! {
! bool result = false;
! ProcArrayStruct *arrayP = procArray;
! int index;
!
! LWLockAcquire(ProcArrayLock, LW_SHARED);
!
! for (index = 0; index < arrayP->numProcs; index++)
{
! volatile PGPROC *proc = arrayP->procs[index];
!
! /* Fetch xid just once - see GetNewTransactionId */
! TransactionId pxid = proc->xid;
!
! if (proc->inCommit && TransactionIdIsValid(pxid))
{
! int i;
! for (i = 0; i < nxids; i++)
{
! if (xids[i] == pxid)
{
! result = true;
! break;
}
}
! if (result)
! break;
! }
}
!
! LWLockRelease(ProcArrayLock);
!
! return result;
}
/*
--- 1590,1651 ----
/* Fetch xid just once - see GetNewTransactionId */
TransactionId pxid = proc->xid;
! if (proc->holdCheckpoint && TransactionIdIsValid(pxid))
! {
! xids[nxids].xid = pxid;
! xids[nxids].counter = proc->chkptHoldCounter;
! nxids++;
! }
}
LWLockRelease(ProcArrayLock);
! /*
! * Wait until all operations that were in-progress when we took the
! * snapshot have finished.
! *
! * Note: this is O(N^2) in the number of operations that are/were in
! * progress, but those numbers should be small enough for it not to be
! * a problem.
! */
! if (nxids > 0)
{
! bool stillHeld;
! do
{
! pg_usleep(10000L); /* wait for 10 msec */
!
! stillHeld = false;
! LWLockAcquire(ProcArrayLock, LW_SHARED);
! for (index = 0; index < arrayP->numProcs; index++)
{
! volatile PGPROC *proc = arrayP->procs[index];
!
! /* Fetch xid just once - see GetNewTransactionId */
! TransactionId pxid = proc->xid;
!
! if (proc->holdCheckpoint && TransactionIdIsValid(pxid))
{
! int i;
!
! for (i = 0; i < nxids; i++)
! {
! if (xids[i].xid == pxid)
! {
! if (xids[i].counter == proc->chkptHoldCounter)
! stillHeld = true;
! break;
! }
! }
! if (stillHeld)
! break;
}
}
! LWLockRelease(ProcArrayLock);
! } while (stillHeld);
}
! pfree(xids);
}
/*
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 312,318 **** InitProcess(void)
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
! MyProc->inCommit = false;
MyProc->vacuumFlags = 0;
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
if (IsAutoVacuumWorkerProcess())
--- 312,319 ----
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
! MyProc->holdCheckpoint = false;
! MyProc->chkptHoldCounter = 0;
MyProc->vacuumFlags = 0;
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
if (IsAutoVacuumWorkerProcess())
***************
*** 450,456 **** InitAuxiliaryProcess(void)
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
! MyProc->inCommit = false;
MyProc->vacuumFlags = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
--- 451,458 ----
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
! MyProc->holdCheckpoint = false;
! MyProc->chkptHoldCounter = 0;
MyProc->vacuumFlags = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
***************
*** 1784,1786 **** handle_standby_sig_alarm(SIGNAL_ARGS)
--- 1786,1829 ----
errno = save_errno;
}
+
+ /*
+ * Hold off checkpoints. A checkpoint that begins after this call won't
+ * finish until ResumeCheckpoint() is called. A checkpoint that is already
+ * in progress is not affected.
+ *
+ * You must call HoldCheckpoint() before starting an operation that consists
+ * of multiple WAL records, relying on the rmgr cleanup to finish an
+ * incomplete operation at end of recovery.
+ *
+ * Calls cannot be nested: our holdCheckpoint flag must be clear on entry,
+ * which is asserted below.
+ */
+ void
+ HoldCheckpoint(void)
+ {
+ volatile PGPROC *p = MyProc;
+ Assert(!p->holdCheckpoint);
+
+ /*
+ * Increment the counter, so that checkpoint can differentiate between
+ * an operation that was in progress when the checkpoint began, and a
+ * later one in the same transaction that began later.
+ *
+ * It's safe to change these for our own backend without holding the
+ * ProcArrayLock, since we're the only one modifying them. This makes
+ * checkpoint's determination of which xacts are holding off checkpoints
+ * a bit fuzzy, but it doesn't matter. See also comments in
+ * ProcArrayCheckpointBarrier().
+ */
+ p->chkptHoldCounter++;
+ p->holdCheckpoint = true;
+ }
+
+ /*
+ * Allow a possible concurrent checkpoint to finish.
+ *
+ * Clears the holdCheckpoint flag of our own PGPROC. The flag must currently
+ * be set, which is asserted below. chkptHoldCounter is left untouched.
+ */
+ void
+ ResumeCheckpoint(void)
+ {
+ volatile PGPROC *p = MyProc;
+ Assert(p->holdCheckpoint);
+ p->holdCheckpoint = false;
+ }
*** a/src/include/access/gin.h
--- b/src/include/access/gin.h
***************
*** 400,406 **** extern void gin_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gin_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gin_xlog_startup(void);
extern void gin_xlog_cleanup(void);
- extern bool gin_safe_restartpoint(void);
/* ginbtree.c */
--- 400,405 ----
*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
***************
*** 252,258 **** extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
- extern bool gist_safe_restartpoint(void);
extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
--- 252,257 ----
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
***************
*** 647,652 **** extern void btree_redo(XLogRecPtr lsn, XLogRecord *record);
extern void btree_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void btree_xlog_startup(void);
extern void btree_xlog_cleanup(void);
- extern bool btree_safe_restartpoint(void);
#endif /* NBTREE_H */
--- 647,651 ----
*** a/src/include/access/xlog_internal.h
--- b/src/include/access/xlog_internal.h
***************
*** 250,256 **** typedef struct RmgrData
void (*rm_desc) (StringInfo buf, uint8 xl_info, char *rec);
void (*rm_startup) (void);
void (*rm_cleanup) (void);
- bool (*rm_safe_restartpoint) (void);
} RmgrData;
extern const RmgrData RmgrTable[];
--- 250,255 ----
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 91,97 **** struct PGPROC
Oid databaseId; /* OID of database this backend is using */
Oid roleId; /* OID of role using this backend */
! bool inCommit; /* true if within commit critical section */
uint8 vacuumFlags; /* vacuum-related flags, see above */
--- 91,99 ----
Oid databaseId; /* OID of database this backend is using */
Oid roleId; /* OID of role using this backend */
! /* Fields for HoldCheckpoint()/ResumeCheckpoint() */
! int32 chkptHoldCounter;
! bool holdCheckpoint;
uint8 vacuumFlags; /* vacuum-related flags, see above */
***************
*** 204,207 **** extern bool enable_standby_sig_alarm(TimestampTz now,
--- 206,212 ----
extern bool disable_standby_sig_alarm(void);
extern void handle_standby_sig_alarm(SIGNAL_ARGS);
+ extern void HoldCheckpoint(void);
+ extern void ResumeCheckpoint(void);
+
#endif /* PROC_H */
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
***************
*** 48,55 **** extern bool TransactionIdIsInProgress(TransactionId xid);
extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
! extern int GetTransactionsInCommit(TransactionId **xids_p);
! extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids);
extern PGPROC *BackendPidGetProc(int pid);
extern int BackendXidGetPid(TransactionId xid);
--- 48,54 ----
extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
! extern void ProcArrayCheckpointBarrier(void);
extern PGPROC *BackendPidGetProc(int pid);
extern int BackendXidGetPid(TransactionId xid);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers