On Mon, Jan 4, 2016 at 2:28 PM, Andres Freund <and...@anarazel.de> wrote:
> I think at the very least the cache should be protected by a separate > lock, and that lock should be acquired with TryLock. I.e. the cache is > updated opportunistically. I'd go for an lwlock in the first iteration. I tried to implement a simple patch which protects the cache. Of all the backends that compute the snapshot (when the cache is invalid), only one of them will write to the cache. This is done with one atomic compare-and-swap operation. After the above fix, the memory corruption is no longer visible. But I see some more failures at higher client counts (with 128 clients it is easily reproducible). The errors are: DETAIL: Key (bid)=(24) already exists. STATEMENT: UPDATE pgbench_branches SET bbalance = bbalance + $1 WHERE bid = $2; client 17 aborted in state 11: ERROR: duplicate key value violates unique constraint "pgbench_branches_pkey" DETAIL: Key (bid)=(24) already exists. client 26 aborted in state 11: ERROR: duplicate key value violates unique constraint "pgbench_branches_pkey" DETAIL: Key (bid)=(87) already exists. ERROR: duplicate key value violates unique constraint "pgbench_branches_pkey" DETAIL: Key (bid)=(113) already exists. After some analysis, I think the problem is in GetSnapshotData(), while computing the snapshot: /* * We don't include our own XIDs (if any) in the snapshot, but we * must include them in xmin. */ if (NormalTransactionIdPrecedes(xid, xmin)) xmin = xid; *********** if (pgxact == MyPgXact) ****************** continue; We do not add our own xid to the xip array. I am wondering: if another backend tries to use the same snapshot, will it be able to see changes made by me (the current backend)? I think that, since we compute a snapshot which will be used by other backends, we need to add our xid to the xip array to indicate that the transaction is still open. -- Thanks and Regards Mithun C Y EnterpriseDB: http://www.enterprisedb.com
*** a/src/backend/commands/cluster.c --- b/src/backend/commands/cluster.c *************** *** 1559,1564 **** finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, --- 1559,1565 ---- elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap); relform = (Form_pg_class) GETSTRUCT(reltup); + Assert(TransactionIdIsNormal(frozenXid)); relform->relfrozenxid = frozenXid; relform->relminmxid = cutoffMulti; *** a/src/backend/storage/ipc/procarray.c --- b/src/backend/storage/ipc/procarray.c *************** *** 410,415 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) --- 410,417 ---- if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE)) { ProcArrayEndTransactionInternal(proc, pgxact, latestXid); + pg_atomic_write_u32(&ProcGlobal->snapshot_cached, 0); + ProcGlobal->cached_snapshot_valid = false; LWLockRelease(ProcArrayLock); } else *************** *** 557,562 **** ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid) --- 559,568 ---- /* Move to next proc in list. */ nextidx = pg_atomic_read_u32(&proc->nextClearXidElem); } + + pg_atomic_write_u32(&ProcGlobal->snapshot_cached, 0); + + ProcGlobal->cached_snapshot_valid = false; /* We're done with the lock now. */ LWLockRelease(ProcArrayLock); *************** *** 1543,1548 **** GetSnapshotData(Snapshot snapshot) --- 1549,1556 ---- errmsg("out of memory"))); } + snapshot->takenDuringRecovery = RecoveryInProgress(); + /* * It is sufficient to get shared lock on ProcArrayLock, even if we are * going to set MyPgXact->xmin. *************** *** 1557,1568 **** GetSnapshotData(Snapshot snapshot) /* initialize xmin calculation with xmax */ globalxmin = xmin = xmax; ! snapshot->takenDuringRecovery = RecoveryInProgress(); ! if (!snapshot->takenDuringRecovery) { int *pgprocnos = arrayP->pgprocnos; int numProcs; /* * Spin over procArray checking xid, xmin, and subxids. The goal is --- 1565,1599 ---- /* initialize xmin calculation with xmax */ globalxmin = xmin = xmax; ! 
if (!snapshot->takenDuringRecovery && ProcGlobal->cached_snapshot_valid) ! { ! Snapshot csnap = &ProcGlobal->cached_snapshot; ! TransactionId *saved_xip; ! TransactionId *saved_subxip; ! saved_xip = snapshot->xip; ! saved_subxip = snapshot->subxip; ! ! memcpy(snapshot, csnap, sizeof(SnapshotData)); ! ! snapshot->xip = saved_xip; ! snapshot->subxip = saved_subxip; ! ! memcpy(snapshot->xip, csnap->xip, ! sizeof(TransactionId) * csnap->xcnt); ! memcpy(snapshot->subxip, csnap->subxip, ! sizeof(TransactionId) * csnap->subxcnt); ! globalxmin = ProcGlobal->cached_snapshot_globalxmin; ! xmin = csnap->xmin; ! ! Assert(TransactionIdIsValid(globalxmin)); ! Assert(TransactionIdIsValid(xmin)); ! } ! else if (!snapshot->takenDuringRecovery) { int *pgprocnos = arrayP->pgprocnos; int numProcs; + const uint32 snapshot_cached= 0; /* * Spin over procArray checking xid, xmin, and subxids. The goal is *************** *** 1577,1591 **** GetSnapshotData(Snapshot snapshot) TransactionId xid; /* ! * Backend is doing logical decoding which manages xmin ! * separately, check below. */ ! if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING) ! continue; ! ! /* Ignore procs running LAZY VACUUM */ ! if (pgxact->vacuumFlags & PROC_IN_VACUUM) ! continue; /* Update globalxmin to be the smallest valid xmin */ xid = pgxact->xmin; /* fetch just once */ --- 1608,1619 ---- TransactionId xid; /* ! * Ignore procs running LAZY VACUUM (which don't need to retain ! * rows) and backends doing logical decoding (which manages xmin ! * separately, check below). */ ! if (pgxact->vacuumFlags & (PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM)) ! continue; /* Update globalxmin to be the smallest valid xmin */ xid = pgxact->xmin; /* fetch just once */ *************** *** 1653,1658 **** GetSnapshotData(Snapshot snapshot) --- 1681,1715 ---- } } } + + /* + * Let only one of the caller cache the computed snapshot, and others + * can continue as before. 
+ */ + if (pg_atomic_compare_exchange_u32(&ProcGlobal->snapshot_cached, + &snapshot_cached, 1)) + { + Snapshot csnap = &ProcGlobal->cached_snapshot; + TransactionId *saved_xip; + TransactionId *saved_subxip; + + ProcGlobal->cached_snapshot_globalxmin = globalxmin; + + saved_xip = csnap->xip; + saved_subxip = csnap->subxip; + memcpy(csnap, snapshot, sizeof(SnapshotData)); + csnap->xip = saved_xip; + csnap->subxip = saved_subxip; + + /* not yet stored. Yuck */ + csnap->xmin = xmin; + + memcpy(csnap->xip, snapshot->xip, + sizeof(TransactionId) * csnap->xcnt); + memcpy(csnap->subxip, snapshot->subxip, + sizeof(TransactionId) * csnap->subxcnt); + ProcGlobal->cached_snapshot_valid = true; + } } else { *** a/src/backend/storage/lmgr/proc.c --- b/src/backend/storage/lmgr/proc.c *************** *** 51,57 **** #include "storage/spin.h" #include "utils/timeout.h" #include "utils/timestamp.h" ! /* GUC variables */ int DeadlockTimeout = 1000; --- 51,57 ---- #include "storage/spin.h" #include "utils/timeout.h" #include "utils/timestamp.h" ! 
#include "port/atomics.h" /* GUC variables */ int DeadlockTimeout = 1000; *************** *** 114,119 **** ProcGlobalShmemSize(void) --- 114,126 ---- size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGXACT))); size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGXACT))); + /* for the cached snapshot */ + #define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) + size = add_size(size, mul_size(sizeof(TransactionId), PROCARRAY_MAXPROCS)); + #define TOTAL_MAX_CACHED_SUBXIDS \ + ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS) + size = add_size(size, mul_size(sizeof(TransactionId), TOTAL_MAX_CACHED_SUBXIDS)); + return size; } *************** *** 275,280 **** InitProcGlobal(void) --- 282,294 ---- /* Create ProcStructLock spinlock, too */ ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t)); SpinLockInit(ProcStructLock); + + /* cached snapshot */ + ProcGlobal->cached_snapshot_valid = false; + pg_atomic_write_u32(&ProcGlobal->snapshot_cached, 0); + ProcGlobal->cached_snapshot.xip = ShmemAlloc(PROCARRAY_MAXPROCS * sizeof(TransactionId)); + ProcGlobal->cached_snapshot.subxip = ShmemAlloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId)); + ProcGlobal->cached_snapshot_globalxmin = InvalidTransactionId; } /* *** a/src/include/storage/proc.h --- b/src/include/storage/proc.h *************** *** 16,21 **** --- 16,22 ---- #include "access/xlogdefs.h" #include "lib/ilist.h" + #include "utils/snapshot.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" *************** *** 220,225 **** typedef struct PROC_HDR --- 221,232 ---- int startupProcPid; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + + /* Cached snapshot */ + volatile bool cached_snapshot_valid; + pg_atomic_uint32 snapshot_cached; + SnapshotData cached_snapshot; + TransactionId cached_snapshot_globalxmin; } PROC_HDR; extern PROC_HDR *ProcGlobal;
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers