On Wed, Sep 04, 2024 at 09:00:32PM +0530, Nitin Motiani wrote:
> How about this alternative then? The tuple length check
> and the elog(ERROR) gets its own function. Something like
> heap_inplace_update_validate or
> heap_inplace_update_validate_tuple_length. So in that case, it would
> look like this :
> 
>   genam.c:systable_inplace_update_finish
>     heapam.c:heap_inplace_update_validate/heap_inplace_update_precheck
>     PreInplace_Inval
>     START_CRIT_SECTION
>     heapam.c:heap_inplace_update
>     BUFFER_LOCK_UNLOCK
>     AtInplace_Inval
>     END_CRIT_SECTION
>     UnlockTuple
>     AcceptInvalidationMessages
> 
> This is starting to get complicated though so I don't have any issues
> with just renaming the heap_inplace_update to
> heap_inplace_update_and_unlock.

Complexity aside, I don't see the _precheck design qualifying as a modularity
improvement.

>  Assert(rel->ri_needsLockTagTuple ==  
> IsInplaceUpdateRelation(rel->relationDesc)
> 
> This can safeguard against users of ResultRelInfo missing this field.

v10 does the rename and adds that assertion.  This question remains open:

On Thu, Aug 22, 2024 at 12:32:00AM -0700, Noah Misch wrote:
> On Tue, Aug 20, 2024 at 11:59:45AM +0300, Heikki Linnakangas wrote:
> > How many of those for RELKIND_INDEX vs tables? I'm thinking if we should
> > always require a tuple lock on indexes, if that would make a difference.
> 
> Three sites.  See attached inplace125 patch.  Is it a net improvement?  If so,
> I'll squash it into inplace120.

If nobody has an opinion, I'll discard inplace125.  I feel it's not a net
improvement, but either way is fine with me.
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Warn if LOCKTAG_TUPLE is held at commit, under debug_assertions.
    
    The current use always releases this locktag.  A planned use will
    continue that intent.  It will involve more areas of code, making unlock
    omissions easier.  Warn under debug_assertions, like we do for various
    resource leaks.  Back-patch to v12 (all supported versions), the plan
    for the commit of the new use.
    
    Reviewed by Heikki Linnakangas.
    
    Discussion: https://postgr.es/m/20240512232923.aa.nmi...@google.com

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 83b99a9..51d52c4 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -2255,6 +2255,16 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
                                locallock->numLockOwners = 0;
                }
 
+#ifdef USE_ASSERT_CHECKING
+
+               /*
+                * Tuple locks are currently held only for short durations 
within a
+                * transaction. Check that we didn't forget to release one.
+                */
+               if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_TUPLE && !allLocks)
+                       elog(WARNING, "tuple lock held at commit");
+#endif
+
                /*
                 * If the lock or proclock pointers are NULL, this lock was 
taken via
                 * the relation fast-path (and is not known to have been 
transferred).
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Fix data loss at inplace update after heap_update().
    
    As previously-added tests demonstrated, heap_inplace_update() could
    instead update an unrelated tuple of the same catalog.  It could lose
    the update.  Losing relhasindex=t was a source of index corruption.
    Inplace-updating commands like VACUUM will now wait for heap_update()
    commands like GRANT TABLE and GRANT DATABASE.  That isn't ideal, but a
    long-running GRANT already hurts VACUUM progress more just by keeping an
    XID running.  The VACUUM will behave like a DELETE or UPDATE waiting for
    the uncommitted change.
    
    For implementation details, start at the systable_inplace_update_begin()
    header comment and README.tuplock.  Back-patch to v12 (all supported
    versions).  In back branches, retain a deprecated heap_inplace_update(),
    for extensions.
    
    Reviewed by Heikki Linnakangas, Nitin Motiani and Alexander Lakhin.
    
    Discussion: 
https://postgr.es/m/CAMp+ueZQz3yDk7qg42hk6-9gxniYbp-=bg2mgqecerqr5gg...@mail.gmail.com

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index 6441e8b..ddb2def 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -153,3 +153,14 @@ The following infomask bits are applicable:
 
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
+
+Reading inplace-updated columns
+-------------------------------
+
+Inplace updates create an exception to the rule that tuple data won't change
+under a reader holding a pin.  A reader of a heap_fetch() result tuple may
+witness a torn read.  Current inplace-updated fields are aligned and are no
+wider than four bytes, and current readers don't need consistency across
+fields.  Hence, they get by with just fetching each field once.  XXX such a
+caller may also read a value that has not reached WAL; see
+systable_inplace_update_finish().
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b2014..0b7dc0a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -63,7 +63,6 @@
 #include "storage/procarray.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
-#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
@@ -6041,61 +6040,167 @@ heap_abort_speculative(Relation relation, ItemPointer 
tid)
 }
 
 /*
- * heap_inplace_update - update a tuple "in place" (ie, overwrite it)
+ * heap_inplace_lock - protect inplace update from concurrent heap_update()
  *
- * Overwriting violates both MVCC and transactional safety, so the uses
- * of this function in Postgres are extremely limited.  Nonetheless we
- * find some places to use it.
+ * Evaluate whether the tuple's state is compatible with a no-key update.
+ * Current transaction rowmarks are fine, as is KEY SHARE from any
+ * transaction.  If compatible, return true with the buffer exclusive-locked,
+ * and the caller must release that by calling
+ * heap_inplace_update_and_unlock(), calling heap_inplace_unlock(), or raising
+ * an error.  Otherwise, return false after blocking transactions, if any,
+ * have ended.
  *
- * The tuple cannot change size, and therefore it's reasonable to assume
- * that its null bitmap (if any) doesn't change either.  So we just
- * overwrite the data portion of the tuple without touching the null
- * bitmap or any of the header fields.
+ * Since this is intended for system catalogs and SERIALIZABLE doesn't cover
+ * DDL, this doesn't guarantee any particular predicate locking.
  *
- * tuple is an in-memory tuple structure containing the data to be written
- * over the target tuple.  Also, tuple->t_self identifies the target tuple.
+ * One could modify this to return true for tuples with delete in progress,
+ * All inplace updaters take a lock that conflicts with DROP.  If explicit
+ * "DELETE FROM pg_class" is in progress, we'll wait for it like we would an
+ * update.
  *
- * Note that the tuple updated here had better not come directly from the
- * syscache if the relation has a toast relation as this tuple could
- * include toast values that have been expanded, causing a failure here.
+ * Readers of inplace-updated fields expect changes to those fields are
+ * durable.  For example, vac_truncate_clog() reads datfrozenxid from
+ * pg_database tuples via catalog snapshots.  A future snapshot must not
+ * return a lower datfrozenxid for the same database OID (lower in the
+ * FullTransactionIdPrecedes() sense).  We achieve that since no update of a
+ * tuple can start while we hold a lock on its buffer.  In cases like
+ * BEGIN;GRANT;CREATE INDEX;COMMIT we're inplace-updating a tuple visible only
+ * to this transaction.  ROLLBACK then is one case where it's okay to lose
+ * inplace updates.  (Restoring relhasindex=false on ROLLBACK is fine, since
+ * any concurrent CREATE INDEX would have blocked, then inplace-updated the
+ * committed tuple.)
+ *
+ * In principle, we could avoid waiting by overwriting every tuple in the
+ * updated tuple chain.  Reader expectations permit updating a tuple only if
+ * it's aborted, is the tail of the chain, or we already updated the tuple
+ * referenced in its t_ctid.  Hence, we would need to overwrite the tuples in
+ * order from tail to head.  That would imply either (a) mutating all tuples
+ * in one critical section or (b) accepting a chance of partial completion.
+ * Partial completion of a relfrozenxid update would have the weird
+ * consequence that the table's next VACUUM could see the table's relfrozenxid
+ * move forward between vacuum_get_cutoffs() and finishing.
+ */
+bool
+heap_inplace_lock(Relation relation,
+                                 HeapTuple oldtup_ptr, Buffer buffer)
+{
+       HeapTupleData oldtup = *oldtup_ptr; /* minimize diff vs. heap_update() 
*/
+       TM_Result       result;
+       bool            ret;
+
+       Assert(BufferIsValid(buffer));
+
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       /*----------
+        * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
+        *
+        * - wait unconditionally
+        * - no tuple locks
+        * - don't recheck header after wait: simpler to defer to next iteration
+        * - don't try to continue even if the updater aborts: likewise
+        * - no crosscheck
+        */
+       result = HeapTupleSatisfiesUpdate(&oldtup, GetCurrentCommandId(false),
+                                                                         
buffer);
+
+       if (result == TM_Invisible)
+       {
+               /* no known way this can happen */
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg_internal("attempted to overwrite 
invisible tuple")));
+       }
+       else if (result == TM_SelfModified)
+       {
+               /*
+                * CREATE INDEX might reach this if an expression is silly 
enough to
+                * call e.g. SELECT ... FROM pg_class FOR SHARE.  C code of 
other SQL
+                * statements might get here after a heap_update() of the same 
row, in
+                * the absence of an intervening CommandCounterIncrement().
+                */
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("tuple to be updated was already 
modified by an operation triggered by the current command")));
+       }
+       else if (result == TM_BeingModified)
+       {
+               TransactionId xwait;
+               uint16          infomask;
+
+               xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
+               infomask = oldtup.t_data->t_infomask;
+
+               if (infomask & HEAP_XMAX_IS_MULTI)
+               {
+                       LockTupleMode lockmode = LockTupleNoKeyExclusive;
+                       MultiXactStatus mxact_status = 
MultiXactStatusNoKeyUpdate;
+                       int                     remain;
+                       bool            current_is_member;
+
+                       if (DoesMultiXactIdConflict((MultiXactId) xwait, 
infomask,
+                                                                               
lockmode, &current_is_member))
+                       {
+                               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                               ret = false;
+                               MultiXactIdWait((MultiXactId) xwait, 
mxact_status, infomask,
+                                                               relation, 
&oldtup.t_self, XLTW_Update,
+                                                               &remain);
+                       }
+                       else
+                               ret = true;
+               }
+               else if (TransactionIdIsCurrentTransactionId(xwait))
+                       ret = true;
+               else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
+                       ret = true;
+               else
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ret = false;
+                       XactLockTableWait(xwait, relation, &oldtup.t_self,
+                                                         XLTW_Update);
+               }
+       }
+       else
+       {
+               ret = (result == TM_Ok);
+               if (!ret)
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               }
+       }
+
+       /*
+        * GetCatalogSnapshot() relies on invalidation messages to know when to
+        * take a new snapshot.  COMMIT of xwait is responsible for sending the
+        * invalidation.  We're not acquiring heavyweight locks sufficient to
+        * block if not yet sent, so we must take a new snapshot to ensure a 
later
+        * attempt has a fair chance.  While we don't need this if xwait 
aborted,
+        * don't bother optimizing that.
+        */
+       if (!ret)
+               InvalidateCatalogSnapshot();
+       return ret;
+}
+
+/*
+ * heap_inplace_update_and_unlock - core of systable_inplace_update_finish
+ *
+ * The tuple cannot change size, and therefore its header fields and null
+ * bitmap (if any) don't change either.
  */
 void
-heap_inplace_update(Relation relation, HeapTuple tuple)
+heap_inplace_update_and_unlock(Relation relation,
+                                                          HeapTuple oldtup, 
HeapTuple tuple,
+                                                          Buffer buffer)
 {
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
+       HeapTupleHeader htup = oldtup->t_data;
        uint32          oldlen;
        uint32          newlen;
 
-       /*
-        * For now, we don't allow parallel updates.  Unlike a regular update,
-        * this should never create a combo CID, so it might be possible to 
relax
-        * this restriction, but not without more thought and testing.  It's not
-        * clear that it would be useful, anyway.
-        */
-       if (IsInParallelMode())
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-                                errmsg("cannot update tuples during a parallel 
operation")));
-
-       INJECTION_POINT("inplace-before-pin");
-       buffer = ReadBuffer(relation, 
ItemPointerGetBlockNumber(&(tuple->t_self)));
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-       page = (Page) BufferGetPage(buffer);
-
-       offnum = ItemPointerGetOffsetNumber(&(tuple->t_self));
-       if (PageGetMaxOffsetNumber(page) >= offnum)
-               lp = PageGetItemId(page, offnum);
-
-       if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-               elog(ERROR, "invalid lp");
-
-       htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-       oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+       Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
+       oldlen = oldtup->t_len - htup->t_hoff;
        newlen = tuple->t_len - tuple->t_data->t_hoff;
        if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
                elog(ERROR, "wrong tuple length");
@@ -6107,6 +6212,19 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
                   (char *) tuple->t_data + tuple->t_data->t_hoff,
                   newlen);
 
+       /*----------
+        * XXX A crash here can allow datfrozenxid() to get ahead of 
relfrozenxid:
+        *
+        * ["D" is a VACUUM (ONLY_DATABASE_STATS)]
+        * ["R" is a VACUUM tbl]
+        * D: vac_update_datfrozenid() -> systable_beginscan(pg_class)
+        * D: systable_getnext() returns pg_class tuple of tbl
+        * R: memcpy() into pg_class tuple of tbl
+        * D: raise pg_database.datfrozenxid, XLogInsert(), finish
+        * [crash]
+        * [recovery restores datfrozenxid w/o relfrozenxid]
+        */
+
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
@@ -6127,23 +6245,35 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
 
-               PageSetLSN(page, recptr);
+               PageSetLSN(BufferGetPage(buffer), recptr);
        }
 
        END_CRIT_SECTION();
 
-       UnlockReleaseBuffer(buffer);
+       heap_inplace_unlock(relation, oldtup, buffer);
 
        /*
         * Send out shared cache inval if necessary.  Note that because we only
         * pass the new version of the tuple, this mustn't be used for any
         * operations that could change catcache lookup keys.  But we aren't
         * bothering with index updates either, so that's true a fortiori.
+        *
+        * XXX ROLLBACK discards the invalidation.  See test inplace-inval.spec.
         */
        if (!IsBootstrapProcessingMode())
                CacheInvalidateHeapTuple(relation, tuple, NULL);
 }
 
+/*
+ * heap_inplace_unlock - reverse of heap_inplace_lock
+ */
+void
+heap_inplace_unlock(Relation relation,
+                                       HeapTuple oldtup, Buffer buffer)
+{
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+}
+
 #define                FRM_NOOP                                0x0001
 #define                FRM_INVALIDATE_XMAX             0x0002
 #define                FRM_RETURN_IS_XID               0x0004
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 43c95d6..964a9a2 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -20,6 +20,7 @@
 #include "postgres.h"
 
 #include "access/genam.h"
+#include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
+#include "utils/injection_point.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
@@ -747,3 +749,140 @@ systable_endscan_ordered(SysScanDesc sysscan)
                UnregisterSnapshot(sysscan->snapshot);
        pfree(sysscan);
 }
+
+/*
+ * systable_inplace_update_begin --- update a row "in place" (overwrite it)
+ *
+ * Overwriting violates both MVCC and transactional safety, so the uses of
+ * this function in Postgres are extremely limited.  Nonetheless we find some
+ * places to use it.  Standard flow:
+ *
+ * ... [any slow preparation not requiring oldtup] ...
+ * systable_inplace_update_begin([...], &tup, &inplace_state);
+ * if (!HeapTupleIsValid(tup))
+ *     elog(ERROR, [...]);
+ * ... [buffer is exclusive-locked; mutate "tup"] ...
+ * if (dirty)
+ *     systable_inplace_update_finish(inplace_state, tup);
+ * else
+ *     systable_inplace_update_cancel(inplace_state);
+ *
+ * The first several params duplicate the systable_beginscan() param list.
+ * "oldtupcopy" is an output parameter, assigned NULL if the key ceases to
+ * find a live tuple.  (In PROC_IN_VACUUM, that is a low-probability transient
+ * condition.)  If "oldtupcopy" gets non-NULL, you must pass output parameter
+ * "state" to systable_inplace_update_finish() or
+ * systable_inplace_update_cancel().
+ */
+void
+systable_inplace_update_begin(Relation relation,
+                                                         Oid indexId,
+                                                         bool indexOK,
+                                                         Snapshot snapshot,
+                                                         int nkeys, const 
ScanKeyData *key,
+                                                         HeapTuple *oldtupcopy,
+                                                         void **state)
+{
+       ScanKey         mutable_key = palloc(sizeof(ScanKeyData) * nkeys);
+       int                     retries = 0;
+       SysScanDesc scan;
+       HeapTuple       oldtup;
+
+       /*
+        * For now, we don't allow parallel updates.  Unlike a regular update,
+        * this should never create a combo CID, so it might be possible to 
relax
+        * this restriction, but not without more thought and testing.  It's not
+        * clear that it would be useful, anyway.
+        */
+       if (IsInParallelMode())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+                                errmsg("cannot update tuples during a parallel 
operation")));
+
+       /*
+        * Accept a snapshot argument, for symmetry, but this function advances
+        * its snapshot as needed to reach the tail of the updated tuple chain.
+        */
+       Assert(snapshot == NULL);
+
+       Assert(IsInplaceUpdateRelation(relation) || 
!IsSystemRelation(relation));
+
+       /* Loop for an exclusive-locked buffer of a non-updated tuple. */
+       for (;;)
+       {
+               TupleTableSlot *slot;
+               BufferHeapTupleTableSlot *bslot;
+
+               CHECK_FOR_INTERRUPTS();
+
+               /*
+                * Processes issuing heap_update (e.g. GRANT) at maximum speed 
could
+                * drive us to this error.  A hostile table owner has stronger 
ways to
+                * damage their own table, so that's minor.
+                */
+               if (retries++ > 10000)
+                       elog(ERROR, "giving up after too many tries to 
overwrite row");
+
+               memcpy(mutable_key, key, sizeof(ScanKeyData) * nkeys);
+               INJECTION_POINT("inplace-before-pin");
+               scan = systable_beginscan(relation, indexId, indexOK, snapshot,
+                                                                 nkeys, 
mutable_key);
+               oldtup = systable_getnext(scan);
+               if (!HeapTupleIsValid(oldtup))
+               {
+                       systable_endscan(scan);
+                       *oldtupcopy = NULL;
+                       return;
+               }
+
+               slot = scan->slot;
+               Assert(TTS_IS_BUFFERTUPLE(slot));
+               bslot = (BufferHeapTupleTableSlot *) slot;
+               if (heap_inplace_lock(scan->heap_rel,
+                                                         bslot->base.tuple, 
bslot->buffer))
+                       break;
+               systable_endscan(scan);
+       };
+
+       *oldtupcopy = heap_copytuple(oldtup);
+       *state = scan;
+}
+
+/*
+ * systable_inplace_update_finish --- second phase of inplace update
+ *
+ * The tuple cannot change size, and therefore its header fields and null
+ * bitmap (if any) don't change either.
+ */
+void
+systable_inplace_update_finish(void *state, HeapTuple tuple)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       Relation        relation = scan->heap_rel;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
+       Buffer          buffer = bslot->buffer;
+
+       heap_inplace_update_and_unlock(relation, oldtup, tuple, buffer);
+       systable_endscan(scan);
+}
+
+/*
+ * systable_inplace_update_cancel --- abandon inplace update
+ *
+ * This is an alternative to making a no-op update.
+ */
+void
+systable_inplace_update_cancel(void *state)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       Relation        relation = scan->heap_rel;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
+       Buffer          buffer = bslot->buffer;
+
+       heap_inplace_unlock(relation, oldtup, buffer);
+       systable_endscan(scan);
+}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 3375905..e4608b9 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2785,7 +2785,9 @@ index_update_stats(Relation rel,
 {
        Oid                     relid = RelationGetRelid(rel);
        Relation        pg_class;
+       ScanKeyData key[1];
        HeapTuple       tuple;
+       void       *state;
        Form_pg_class rd_rel;
        bool            dirty;
 
@@ -2819,33 +2821,12 @@ index_update_stats(Relation rel,
 
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       /*
-        * Make a copy of the tuple to update.  Normally we use the syscache, 
but
-        * we can't rely on that during bootstrap or while reindexing pg_class
-        * itself.
-        */
-       if (IsBootstrapProcessingMode() ||
-               ReindexIsProcessingHeap(RelationRelationId))
-       {
-               /* don't assume syscache will work */
-               TableScanDesc pg_class_scan;
-               ScanKeyData key[1];
-
-               ScanKeyInit(&key[0],
-                                       Anum_pg_class_oid,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(relid));
-
-               pg_class_scan = table_beginscan_catalog(pg_class, 1, key);
-               tuple = heap_getnext(pg_class_scan, ForwardScanDirection);
-               tuple = heap_copytuple(tuple);
-               table_endscan(pg_class_scan);
-       }
-       else
-       {
-               /* normal case, use syscache */
-               tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
-       }
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       systable_inplace_update_begin(pg_class, ClassOidIndexId, true, NULL,
+                                                                 1, key, 
&tuple, &state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u", relid);
@@ -2908,11 +2889,12 @@ index_update_stats(Relation rel,
         */
        if (dirty)
        {
-               heap_inplace_update(pg_class, tuple);
+               systable_inplace_update_finish(state, tuple);
                /* the above sends a cache inval message */
        }
        else
        {
+               systable_inplace_update_cancel(state);
                /* no need to change tuple, but force relcache inval anyway */
                CacheInvalidateRelcacheByTuple(tuple);
        }
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 738bc46..ad3082c 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -29,6 +29,7 @@
 #include "catalog/toasting.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
@@ -333,21 +334,36 @@ create_toast_table(Relation rel, Oid toastOid, Oid 
toastIndexOid,
         */
        class_rel = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
-       if (!HeapTupleIsValid(reltup))
-               elog(ERROR, "cache lookup failed for relation %u", relOid);
-
-       ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
-
        if (!IsBootstrapProcessingMode())
        {
                /* normal case, use a transactional update */
+               reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", 
relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = 
toast_relid;
+
                CatalogTupleUpdate(class_rel, &reltup->t_self, reltup);
        }
        else
        {
                /* While bootstrapping, we cannot UPDATE, so overwrite in-place 
*/
-               heap_inplace_update(class_rel, reltup);
+
+               ScanKeyData key[1];
+               void       *state;
+
+               ScanKeyInit(&key[0],
+                                       Anum_pg_class_oid,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(relOid));
+               systable_inplace_update_begin(class_rel, ClassOidIndexId, true,
+                                                                         NULL, 
1, key, &reltup, &state);
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", 
relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = 
toast_relid;
+
+               systable_inplace_update_finish(state, reltup);
        }
 
        heap_freetuple(reltup);
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index 8be435a..40bfd09 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1651,7 +1651,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        Relation        pgdbrel;
        HeapTuple       tup;
        ScanKeyData scankey;
-       SysScanDesc scan;
+       void       *inplace_state;
        Form_pg_database datform;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1790,24 +1790,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        pgstat_drop_database(db_id);
 
        /*
-        * Get the pg_database tuple to scribble on.  Note that this does not
-        * directly rely on the syscache to avoid issues with flattened toast
-        * values for the in-place update.
-        */
-       ScanKeyInit(&scankey,
-                               Anum_pg_database_datname,
-                               BTEqualStrategyNumber, F_NAMEEQ,
-                               CStringGetDatum(dbname));
-
-       scan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
-                                                         NULL, 1, &scankey);
-
-       tup = systable_getnext(scan);
-       if (!HeapTupleIsValid(tup))
-               elog(ERROR, "cache lookup failed for database %u", db_id);
-       datform = (Form_pg_database) GETSTRUCT(tup);
-
-       /*
         * Except for the deletion of the catalog row, subsequent actions are 
not
         * transactional (consider DropDatabaseBuffers() discarding modified
         * buffers). But we might crash or get interrupted below. To prevent
@@ -1818,8 +1800,17 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * modification is durable before performing irreversible filesystem
         * operations.
         */
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(dbname));
+       systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
+                                                                 NULL, 1, 
&scankey, &tup, &inplace_state);
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
+       datform = (Form_pg_database) GETSTRUCT(tup);
        datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
-       heap_inplace_update(pgdbrel, tup);
+       systable_inplace_update_finish(inplace_state, tup);
        XLogFlush(XactLastRecEnd);
 
        /*
@@ -1827,8 +1818,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * the row will be gone, but if we fail, dropdb() can be invoked again.
         */
        CatalogTupleDelete(pgdbrel, &tup->t_self);
-
-       systable_endscan(scan);
+       heap_freetuple(tup);
 
        /*
         * Drop db-specific replication slots.
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 7a5ed6b..55baf10 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -946,25 +946,18 @@ EventTriggerOnLogin(void)
                {
                        Relation        pg_db = table_open(DatabaseRelationId, 
RowExclusiveLock);
                        HeapTuple       tuple;
+                       void       *state;
                        Form_pg_database db;
                        ScanKeyData key[1];
-                       SysScanDesc scan;
 
-                       /*
-                        * Get the pg_database tuple to scribble on.  Note that 
this does
-                        * not directly rely on the syscache to avoid issues 
with
-                        * flattened toast values for the in-place update.
-                        */
+                       /* Fetch a copy of the tuple to scribble on */
                        ScanKeyInit(&key[0],
                                                Anum_pg_database_oid,
                                                BTEqualStrategyNumber, F_OIDEQ,
                                                ObjectIdGetDatum(MyDatabaseId));
 
-                       scan = systable_beginscan(pg_db, DatabaseOidIndexId, 
true,
-                                                                         NULL, 
1, key);
-                       tuple = systable_getnext(scan);
-                       tuple = heap_copytuple(tuple);
-                       systable_endscan(scan);
+                       systable_inplace_update_begin(pg_db, 
DatabaseOidIndexId, true,
+                                                                               
  NULL, 1, key, &tuple, &state);
 
                        if (!HeapTupleIsValid(tuple))
                                elog(ERROR, "could not find tuple for database 
%u", MyDatabaseId);
@@ -980,13 +973,15 @@ EventTriggerOnLogin(void)
                                 * that avoids possible waiting on the 
row-level lock. Second,
                                 * that avoids dealing with TOAST.
                                 *
-                                * It's known that changes made by 
heap_inplace_update() may
-                                * be lost due to concurrent normal updates.  
However, we are
-                                * OK with that.  The subsequent connections 
will still have a
-                                * chance to set "dathasloginevt" to false.
+                                * Changes made by inplace update may be lost 
due to
+                                * concurrent normal updates; see 
inplace-inval.spec. However,
+                                * we are OK with that.  The subsequent 
connections will still
+                                * have a chance to set "dathasloginevt" to 
false.
                                 */
-                               heap_inplace_update(pg_db, tuple);
+                               systable_inplace_update_finish(state, tuple);
                        }
+                       else
+                               systable_inplace_update_cancel(state);
                        table_close(pg_db, RowExclusiveLock);
                        heap_freetuple(tuple);
                }
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 7d8e9d2..9304b8c 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1402,7 +1402,9 @@ vac_update_relstats(Relation relation,
 {
        Oid                     relid = RelationGetRelid(relation);
        Relation        rd;
+       ScanKeyData key[1];
        HeapTuple       ctup;
+       void       *inplace_state;
        Form_pg_class pgcform;
        bool            dirty,
                                futurexid,
@@ -1413,7 +1415,12 @@ vac_update_relstats(Relation relation,
        rd = table_open(RelationRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
-       ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       systable_inplace_update_begin(rd, ClassOidIndexId, true,
+                                                                 NULL, 1, key, 
&ctup, &inplace_state);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during 
vacuuming",
                         relid);
@@ -1521,7 +1528,9 @@ vac_update_relstats(Relation relation,
 
        /* If anything changed, write out the tuple. */
        if (dirty)
-               heap_inplace_update(rd, ctup);
+               systable_inplace_update_finish(inplace_state, ctup);
+       else
+               systable_inplace_update_cancel(inplace_state);
 
        table_close(rd, RowExclusiveLock);
 
@@ -1573,6 +1582,7 @@ vac_update_datfrozenxid(void)
        bool            bogus = false;
        bool            dirty = false;
        ScanKeyData key[1];
+       void       *inplace_state;
 
        /*
         * Restrict this task to one backend per database.  This avoids race
@@ -1696,20 +1706,18 @@ vac_update_datfrozenxid(void)
        relation = table_open(DatabaseRelationId, RowExclusiveLock);
 
        /*
-        * Get the pg_database tuple to scribble on.  Note that this does not
-        * directly rely on the syscache to avoid issues with flattened toast
-        * values for the in-place update.
+        * Fetch a copy of the tuple to scribble on.  We could check the 
syscache
+        * tuple first.  If that concluded !dirty, we'd avoid waiting on
+        * concurrent heap_update() and would avoid exclusive-locking the 
buffer.
+        * For now, don't optimize that.
         */
        ScanKeyInit(&key[0],
                                Anum_pg_database_oid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(MyDatabaseId));
 
-       scan = systable_beginscan(relation, DatabaseOidIndexId, true,
-                                                         NULL, 1, key);
-       tuple = systable_getnext(scan);
-       tuple = heap_copytuple(tuple);
-       systable_endscan(scan);
+       systable_inplace_update_begin(relation, DatabaseOidIndexId, true,
+                                                                 NULL, 1, key, 
&tuple, &inplace_state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for database %u", 
MyDatabaseId);
@@ -1743,7 +1751,9 @@ vac_update_datfrozenxid(void)
                newMinMulti = dbform->datminmxid;
 
        if (dirty)
-               heap_inplace_update(relation, tuple);
+               systable_inplace_update_finish(inplace_state, tuple);
+       else
+               systable_inplace_update_cancel(inplace_state);
 
        heap_freetuple(tuple);
        table_close(relation, RowExclusiveLock);
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index fdcfbe8..c25f5d1 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -233,5 +233,14 @@ extern SysScanDesc systable_beginscan_ordered(Relation 
heapRelation,
 extern HeapTuple systable_getnext_ordered(SysScanDesc sysscan,
                                                                                
  ScanDirection direction);
 extern void systable_endscan_ordered(SysScanDesc sysscan);
+extern void systable_inplace_update_begin(Relation relation,
+                                                                               
  Oid indexId,
+                                                                               
  bool indexOK,
+                                                                               
  Snapshot snapshot,
+                                                                               
  int nkeys, const ScanKeyData *key,
+                                                                               
  HeapTuple *oldtupcopy,
+                                                                               
  void **state);
+extern void systable_inplace_update_finish(void *state, HeapTuple tuple);
+extern void systable_inplace_update_cancel(void *state);
 
 #endif                                                 /* GENAM_H */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec8..0970941 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -336,7 +336,13 @@ extern TM_Result heap_lock_tuple(Relation relation, 
HeapTuple tuple,
                                                                 bool 
follow_updates,
                                                                 Buffer 
*buffer, struct TM_FailureData *tmfd);
 
-extern void heap_inplace_update(Relation relation, HeapTuple tuple);
+extern bool heap_inplace_lock(Relation relation,
+                                                         HeapTuple oldtup_ptr, 
Buffer buffer);
+extern void heap_inplace_update_and_unlock(Relation relation,
+                                                                               
   HeapTuple oldtup, HeapTuple tuple,
+                                                                               
   Buffer buffer);
+extern void heap_inplace_unlock(Relation relation,
+                                                               HeapTuple 
oldtup, Buffer buffer);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
                                                                          const 
struct VacuumCutoffs *cutoffs,
                                                                          
HeapPageFreeze *pagefrz,
diff --git a/src/test/isolation/expected/intra-grant-inplace-db.out 
b/src/test/isolation/expected/intra-grant-inplace-db.out
index 432ece5..a91402c 100644
--- a/src/test/isolation/expected/intra-grant-inplace-db.out
+++ b/src/test/isolation/expected/intra-grant-inplace-db.out
@@ -9,20 +9,20 @@ step b1: BEGIN;
 step grant1: 
        GRANT TEMP ON DATABASE isolation_regression TO regress_temp_grantee;
 
-step vac2: VACUUM (FREEZE);
+step vac2: VACUUM (FREEZE); <waiting ...>
 step snap3: 
        INSERT INTO frozen_witness
        SELECT datfrozenxid FROM pg_database WHERE datname = current_catalog;
 
 step c1: COMMIT;
+step vac2: <... completed>
 step cmp3: 
        SELECT 'datfrozenxid retreated'
        FROM pg_database
        WHERE datname = current_catalog
                AND age(datfrozenxid) > (SELECT min(age(x)) FROM 
frozen_witness);
 
-?column?              
-----------------------
-datfrozenxid retreated
-(1 row)
+?column?
+--------
+(0 rows)
 
diff --git a/src/test/isolation/expected/intra-grant-inplace.out 
b/src/test/isolation/expected/intra-grant-inplace.out
index cc1e47a..fe26984 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -14,15 +14,16 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
@@ -58,8 +59,9 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
+step addk2: <... completed>
 
 starting permutation: b2 sfnku2 addk2 c2
 step b2: BEGIN;
@@ -98,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
        SELECT relhasindex FROM pg_class
@@ -122,17 +124,19 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
diff --git a/src/test/isolation/specs/intra-grant-inplace-db.spec 
b/src/test/isolation/specs/intra-grant-inplace-db.spec
index bbecd5d..9de40ec 100644
--- a/src/test/isolation/specs/intra-grant-inplace-db.spec
+++ b/src/test/isolation/specs/intra-grant-inplace-db.spec
@@ -42,5 +42,4 @@ step cmp3     {
 }
 
 
-# XXX extant bug
 permutation snap3 b1 grant1 vac2(c1) snap3 c1 cmp3
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec 
b/src/test/isolation/specs/intra-grant-inplace.spec
index 3cd696b..d07ed3b 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -48,6 +48,7 @@ step sfu3     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
+step as3       { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3        { ROLLBACK; }
 
 # Additional heap_update()
@@ -73,7 +74,7 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned post-bugfix behavior
+# XXX extant bugs: permutation comments refer to planned future LockTuple()
 
 permutation
        b1
@@ -117,6 +118,7 @@ permutation
        b1
        grant1(r3)      # acquire LockTuple(), await sfu3 xmax
        read2
+       as3                     # XXX temporary until patch adds locking to 
addk2
        addk2(c1)       # block in LockTuple() behind grant1
        r3                      # unblock grant1; addk2 now awaits grant1 xmax
        c1
diff --git a/src/test/modules/injection_points/expected/inplace.out 
b/src/test/modules/injection_points/expected/inplace.out
index 123f45a..db7dab6 100644
--- a/src/test/modules/injection_points/expected/inplace.out
+++ b/src/test/modules/injection_points/expected/inplace.out
@@ -40,4 +40,301 @@ step read1:
        SELECT reltuples = -1 AS reltuples_unknown
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 
-ERROR:  could not create unique index "pg_class_oid_index"
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 grant2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: vac1 begin2 grant2 revoke2 mkrels3 c2 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step c2: COMMIT;
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 grant2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/inplace.spec 
b/src/test/modules/injection_points/specs/inplace.spec
index e957713..86539a5 100644
--- a/src/test/modules/injection_points/specs/inplace.spec
+++ b/src/test/modules/injection_points/specs/inplace.spec
@@ -32,12 +32,9 @@ setup
        CREATE TABLE vactest.orig50 ();
        SELECT vactest.mkrels('orig', 51, 100);
 }
-
-# XXX DROP causes an assertion failure; adopt DROP once fixed
 teardown
 {
-       --DROP SCHEMA vactest CASCADE;
-       DO $$BEGIN EXECUTE 'ALTER SCHEMA vactest RENAME TO schema' || oid FROM 
pg_namespace where nspname = 'vactest'; END$$;
+       DROP SCHEMA vactest CASCADE;
        DROP EXTENSION injection_points;
 }
 
@@ -56,11 +53,13 @@ step read1  {
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 }
 
-
 # Transactional updates of the tuple vac1 is waiting to inplace-update.
 session s2
 step grant2            { GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC; }
-
+step revoke2   { REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC; }
+step begin2            { BEGIN; }
+step c2                        { COMMIT; }
+step r2                        { ROLLBACK; }
 
 # Non-blocking actions.
 session s3
@@ -74,10 +73,69 @@ step mkrels3        {
 }
 
 
-# XXX extant bug
+# target gains a successor at the last moment
 permutation
        vac1(mkrels3)   # reads pg_class tuple T0 for vactest.orig50, xmax 
invalid
        grant2                  # T0 becomes eligible for pruning, T1 is 
successor
        vac3                    # T0 becomes LP_UNUSED
-       mkrels3                 # T0 reused; vac1 wakes and overwrites the 
reused T0
+       mkrels3                 # vac1 wakes, scans to T1
        read1
+
+# target already has a successor, which commits
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       c2                              # T0 becomes eligible for pruning
+       vac3                    # T0 becomes LP_UNUSED
+       mkrels3                 # vac1 wakes, scans to T1
+       read1
+
+# target already has a successor, which becomes LP_UNUSED at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       r2                              # T1 becomes eligible for pruning
+       vac3                    # T1 becomes LP_UNUSED
+       mkrels3                 # reuse T1; vac1 scans to T0
+       read1
+
+# target already has a successor, which becomes LP_REDIRECT at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       c2
+       revoke2                 # HOT update to T2
+       grant2                  # HOT update to T3
+       vac3                    # T1 becomes LP_REDIRECT
+       mkrels3                 # reuse T2; vac1 scans to T3
+       read1
+
+# waiting for updater to end
+permutation
+       vac1(c2)                # reads pg_class tuple T0 for vactest.orig50, 
xmax invalid
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       revoke2                 # HOT update to T2
+       mkrels3                 # vac1 awakes briefly, then waits for s2
+       c2
+       read1
+
+# Another LP_UNUSED.  This time, do change the live tuple.  Final live tuple
+# body is identical to original, at a different TID.
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       r2                              # T1 becomes eligible for pruning
+       grant2                  # T0.t_ctid = T2; T0 becomes eligible for 
pruning
+       revoke2                 # T2.t_ctid = T3; T2 becomes eligible for 
pruning
+       vac3                    # T0, T1 & T2 become LP_UNUSED
+       mkrels3                 # reuse T0, T1 & T2; vac1 scans to T3
+       read1
+
+# Another LP_REDIRECT.  Compared to the earlier test, omit the last grant2.
+# Hence, final live tuple body is identical to original, at a different TID.
+permutation begin2 grant2 vac1(mkrels3) c2 revoke2 vac3 mkrels3 read1
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    For inplace update durability, make heap_update() callers wait.
    
    The previous commit fixed some ways of losing an inplace update.  It
    remained possible to lose one when a backend working toward a
    heap_update() copied a tuple into memory just before inplace update of
    that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
    to govern admission to the steps of copying an old tuple, modifying it,
    and issuing heap_update().  This includes UPDATE and MERGE commands.  To
    avoid changing most of the pg_class DDL, don't require LOCKTAG_TUPLE
    when holding a relation lock sufficient to exclude inplace updaters.
    Back-patch to v12 (all supported versions).
    
    Reviewed by Nitin Motiani and FIXME.
    
    Discussion: https://postgr.es/m/20231027214946.79.nmi...@google.com

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index ddb2def..818cd7f 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -154,6 +154,48 @@ The following infomask bits are applicable:
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
 
+Locking to write inplace-updated tables
+---------------------------------------
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives systable_inplace_update_begin() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class systable_inplace_update_begin() callers: before the call, acquire a
+  lock on the relation in mode ShareUpdateExclusiveLock or stricter.  If the
+  update targets a row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX),
+  that lock must be on the table.  Locking the index rel is not necessary.
+  (This allows VACUUM to overwrite per-index pg_class while holding a lock on
+  the table alone.) systable_inplace_update_begin() acquires and releases
+  LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for ExclusiveLock, on each
+  tuple it overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
+  ShareRowExclusiveLock or stricter on the relation.
+
+  SearchSysCacheLocked1() is one convenient way to acquire the tuple lock.
+  Most heap_update() callers already hold a suitable lock on the relation for
+  other reasons and can skip the tuple lock.  If you do acquire the tuple
+  lock, release it immediately after the update.
+
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
 Reading inplace-updated columns
 -------------------------------
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0b7dc0a..4d75f7a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -51,6 +51,8 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -75,6 +77,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer 
oldbuf,
                                                                  Buffer 
newbuf, HeapTuple oldtup,
                                                                  HeapTuple 
newtup, HeapTuple old_key_tuple,
                                                                  bool 
all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                               
                 ItemPointer otid,
+                                                                               
                 HeapTuple newtup);
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                                                                
   Bitmapset *interesting_cols,
                                                                                
   Bitmapset *external_cols,
@@ -121,6 +129,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, 
HeapTuple tp, bool ke
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3207,6 +3217,10 @@ heap_update(Relation relation, ItemPointer otid, 
HeapTuple newtup,
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel 
operation")));
 
+#ifdef USE_ASSERT_CHECKING
+       check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
        /*
         * Fetch the list of attributes to be checked for various operations.
         *
@@ -4071,6 +4085,128 @@ l2:
        return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                        
ItemPointer otid,
+                                                                        
HeapTuple newtup)
+{
+       /* LOCKTAG_TUPLE acceptable for any catalog */
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+               case DatabaseRelationId:
+                       {
+                               LOCKTAG         tuptag;
+
+                               SET_LOCKTAG_TUPLE(tuptag,
+                                                                 
relation->rd_lockInfo.lockRelId.dbId,
+                                                                 
relation->rd_lockInfo.lockRelId.relId,
+                                                                 
ItemPointerGetBlockNumber(otid),
+                                                                 
ItemPointerGetOffsetNumber(otid));
+                               if (LockHeldByMe(&tuptag, 
InplaceUpdateTupleLock, false))
+                                       return;
+                       }
+                       break;
+               default:
+                       Assert(!IsInplaceUpdateRelation(relation));
+                       return;
+       }
+
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+                       {
+                               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+                               Form_pg_class classForm = (Form_pg_class) 
GETSTRUCT(newtup);
+                               Oid                     relid = classForm->oid;
+                               Oid                     dbid;
+                               LOCKTAG         tag;
+
+                               if (IsSharedRelation(relid))
+                                       dbid = InvalidOid;
+                               else
+                                       dbid = MyDatabaseId;
+
+                               if (classForm->relkind == RELKIND_INDEX)
+                               {
+                                       Relation        irel = 
index_open(relid, AccessShareLock);
+
+                                       SET_LOCKTAG_RELATION(tag, dbid, 
irel->rd_index->indrelid);
+                                       index_close(irel, AccessShareLock);
+                               }
+                               else
+                                       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+                               if (!LockHeldByMe(&tag, 
ShareUpdateExclusiveLock, false) &&
+                                       !LockHeldByMe(&tag, 
ShareRowExclusiveLock, true))
+                                       elog(WARNING,
+                                                "missing lock for relation 
\"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                                                NameStr(classForm->relname),
+                                                relid,
+                                                classForm->relkind,
+                                                
ItemPointerGetBlockNumber(otid),
+                                                
ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+               case DatabaseRelationId:
+                       {
+                               /* LOCKTAG_TUPLE required */
+                               Form_pg_database dbForm = (Form_pg_database) 
GETSTRUCT(newtup);
+
+                               elog(WARNING,
+                                        "missing lock on database \"%s\" (OID 
%u) @ TID (%u,%u)",
+                                        NameStr(dbForm->datname),
+                                        dbForm->oid,
+                                        ItemPointerGetBlockNumber(otid),
+                                        ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+       }
+}
+
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+       Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+       Oid                     relid = classForm->oid;
+       Oid                     dbid;
+       LOCKTAG         tag;
+
+       if (IsSharedRelation(relid))
+               dbid = InvalidOid;
+       else
+               dbid = MyDatabaseId;
+
+       if (classForm->relkind == RELKIND_INDEX)
+       {
+               Relation        irel = index_open(relid, AccessShareLock);
+
+               SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+               index_close(irel, AccessShareLock);
+       }
+       else
+               SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+       if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
+               elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) 
@ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(&oldtup->t_self),
+                        ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6088,15 +6224,21 @@ heap_inplace_lock(Relation relation,
        TM_Result       result;
        bool            ret;
 
+#ifdef USE_ASSERT_CHECKING
+       if (RelationGetRelid(relation) == RelationRelationId)
+               check_inplace_rel_lock(oldtup_ptr);
+#endif
+
        Assert(BufferIsValid(buffer));
 
+       LockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
        /*----------
         * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
         *
         * - wait unconditionally
-        * - no tuple locks
+        * - already locked tuple above, since inplace needs that 
unconditionally
         * - don't recheck header after wait: simpler to defer to next iteration
         * - don't try to continue even if the updater aborts: likewise
         * - no crosscheck
@@ -6180,7 +6322,10 @@ heap_inplace_lock(Relation relation,
         * don't bother optimizing that.
         */
        if (!ret)
+       {
+               UnlockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
                InvalidateCatalogSnapshot();
+       }
        return ret;
 }
 
@@ -6189,6 +6334,8 @@ heap_inplace_lock(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update_and_unlock(Relation relation,
@@ -6272,6 +6419,7 @@ heap_inplace_unlock(Relation relation,
                                        HeapTuple oldtup, Buffer buffer)
 {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
 }
 
 #define                FRM_NOOP                                0x0001
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 964a9a2..83d5717 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -755,7 +755,9 @@ systable_endscan_ordered(SysScanDesc sysscan)
  *
  * Overwriting violates both MVCC and transactional safety, so the uses of
  * this function in Postgres are extremely limited.  Nonetheless we find some
- * places to use it.  Standard flow:
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
  * ... [any slow preparation not requiring oldtup] ...
  * systable_inplace_update_begin([...], &tup, &inplace_state);
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index a44ccee..bc0e259 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -75,6 +75,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1848,7 +1849,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                HeapTuple       tuple;
                ListCell   *cell_colprivs;
 
-               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+               tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", 
relOid);
                pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2060,6 +2061,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                                                                
 values, nulls, replaces);
 
                        CatalogTupleUpdate(relation, &newtuple->t_self, 
newtuple);
+                       UnlockTuple(relation, &tuple->t_self, 
InplaceUpdateTupleLock);
 
                        /* Update initial privileges for extensions */
                        recordExtensionInitPriv(relOid, RelationRelationId, 0, 
new_acl);
@@ -2072,6 +2074,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
                        pfree(new_acl);
                }
+               else
+                       UnlockTuple(relation, &tuple->t_self, 
InplaceUpdateTupleLock);
 
                /*
                 * Handle column-level privileges, if any were specified or 
implied.
@@ -2185,7 +2189,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, 
AclMode default_privs,
                Oid                *oldmembers;
                Oid                *newmembers;
 
-               tuple = SearchSysCache1(cacheid, ObjectIdGetDatum(objectid));
+               tuple = SearchSysCacheLocked1(cacheid, 
ObjectIdGetDatum(objectid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for %s %u", 
get_object_class_descr(classid), objectid);
 
@@ -2261,6 +2265,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, 
AclMode default_privs,
                                                                         nulls, 
replaces);
 
                CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+               UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /* Update initial privileges for extensions */
                recordExtensionInitPriv(objectid, classid, 0, new_acl);
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 6c39434..8aefbcd 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -138,6 +138,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *             True iff core code performs inplace updates on the relation.
+ *
+ *             This is used for assertions and for making the executor follow 
the
+ *             locking protocol described at README.tuplock section "Locking 
to write
+ *             inplace-updated tables".  Extensions may inplace-update other 
heap
+ *             tables, but concurrent SQL UPDATE on the same table may 
overwrite
+ *             those modifications.
+ *
+ *             The executor can assume these are not partitions or partitioned 
and
+ *             have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index 40bfd09..aa91a39 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1877,6 +1877,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
        Oid                     db_id;
        HeapTuple       newtup;
+       ItemPointerData otid;
        Relation        rel;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1948,11 +1949,13 @@ RenameDatabase(const char *oldname, const char *newname)
                                 errdetail_busy_db(notherbackends, 
npreparedxacts)));
 
        /* rename */
-       newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+       newtup = SearchSysCacheLockedCopy1(DATABASEOID, 
ObjectIdGetDatum(db_id));
        if (!HeapTupleIsValid(newtup))
                elog(ERROR, "cache lookup failed for database %u", db_id);
+       otid = newtup->t_self;
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-       CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+       CatalogTupleUpdate(rel, &otid, newtup);
+       UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2201,6 +2204,7 @@ movedb(const char *dbname, const char *tblspcname)
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                                         errmsg("database \"%s\" does not 
exist", dbname)));
+               LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                new_record[Anum_pg_database_dattablespace - 1] = 
ObjectIdGetDatum(dst_tblspcoid);
                new_record_repl[Anum_pg_database_dattablespace - 1] = true;
@@ -2209,6 +2213,7 @@ movedb(const char *dbname, const char *tblspcname)
                                                                         
new_record,
                                                                         
new_record_nulls, new_record_repl);
                CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+               UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2439,6 +2444,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt 
*stmt, bool isTopLevel)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", 
stmt->dbname)));
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datform = (Form_pg_database) GETSTRUCT(tuple);
        dboid = datform->oid;
@@ -2488,6 +2494,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt 
*stmt, bool isTopLevel)
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                                                 
new_record_nulls, new_record_repl);
        CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2537,6 +2544,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
        if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                                           stmt->dbname);
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datum = heap_getattr(tuple, Anum_pg_database_datcollversion, 
RelationGetDescr(rel), &isnull);
        oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2565,6 +2573,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
                bool            nulls[Natts_pg_database] = {0};
                bool            replaces[Natts_pg_database] = {0};
                Datum           values[Natts_pg_database] = {0};
+               HeapTuple       newtuple;
 
                ereport(NOTICE,
                                (errmsg("changing version from %s to %s",
@@ -2573,14 +2582,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
                values[Anum_pg_database_datcollversion - 1] = 
CStringGetTextDatum(newversion);
                replaces[Anum_pg_database_datcollversion - 1] = true;
 
-               tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                                                 values, 
nulls, replaces);
-               CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-               heap_freetuple(tuple);
+               newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                                                        
values, nulls, replaces);
+               CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+               heap_freetuple(newtuple);
        }
        else
                ereport(NOTICE,
                                (errmsg("version has not changed")));
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2692,6 +2702,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                                        
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to change 
owner of database")));
 
+               LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
                repl_repl[Anum_pg_database_datdba - 1] = true;
                repl_val[Anum_pg_database_datdba - 1] = 
ObjectIdGetDatum(newOwnerId);
 
@@ -2713,6 +2725,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
                newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), 
repl_val, repl_null, repl_repl);
                CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+               UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
                heap_freetuple(newtuple);
 
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 55baf10..05a6de6 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -388,6 +388,7 @@ SetDatabaseHasLoginEventTriggers(void)
        /* Set dathasloginevt flag in pg_database */
        Form_pg_database db;
        Relation        pg_db = table_open(DatabaseRelationId, 
RowExclusiveLock);
+       ItemPointerData otid;
        HeapTuple       tuple;
 
        /*
@@ -399,16 +400,18 @@ SetDatabaseHasLoginEventTriggers(void)
         */
        LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, 
AccessExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
+       tuple = SearchSysCacheLockedCopy1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for database %u", 
MyDatabaseId);
+       otid = tuple->t_self;
        db = (Form_pg_database) GETSTRUCT(tuple);
        if (!db->dathasloginevt)
        {
                db->dathasloginevt = true;
-               CatalogTupleUpdate(pg_db, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_db, &otid, tuple);
                CommandCounterIncrement();
        }
+       UnlockTuple(pg_db, &otid, InplaceUpdateTupleLock);
        table_close(pg_db, RowExclusiveLock);
        heap_freetuple(tuple);
 }
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index c5a56c7..6b22a88 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -4413,14 +4413,17 @@ update_relispartition(Oid relationId, bool newval)
 {
        HeapTuple       tup;
        Relation        classRel;
+       ItemPointerData otid;
 
        classRel = table_open(RelationRelationId, RowExclusiveLock);
-       tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+       tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relationId);
+       otid = tup->t_self;
        Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
        ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-       CatalogTupleUpdate(classRel, &tup->t_self, tup);
+       CatalogTupleUpdate(classRel, &otid, tup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tup);
        table_close(classRel, RowExclusiveLock);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b3cc6f8..7870e93 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -3586,6 +3586,7 @@ SetRelationTableSpace(Relation rel,
 {
        Relation        pg_class;
        HeapTuple       tuple;
+       ItemPointerData otid;
        Form_pg_class rd_rel;
        Oid                     reloid = RelationGetRelid(rel);
 
@@ -3594,9 +3595,10 @@ SetRelationTableSpace(Relation rel,
        /* Get a modifiable copy of the relation's pg_class row. */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+       tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", reloid);
+       otid = tuple->t_self;
        rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
        /* Update the pg_class row. */
@@ -3604,7 +3606,8 @@ SetRelationTableSpace(Relation rel,
                InvalidOid : newTableSpaceId;
        if (RelFileNumberIsValid(newRelFilenumber))
                rd_rel->relfilenode = newRelFilenumber;
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
        /*
         * Record dependency on tablespace.  This is only required for relations
@@ -4098,6 +4101,7 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
 {
        Relation        targetrelation;
        Relation        relrelation;    /* for RELATION relation */
+       ItemPointerData otid;
        HeapTuple       reltup;
        Form_pg_class relform;
        Oid                     namespaceId;
@@ -4120,7 +4124,8 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
         */
        relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       otid = reltup->t_self;
        if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
        relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -4147,7 +4152,8 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
         */
        namestrcpy(&(relform->relname), newrelname);
 
-       CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+       CatalogTupleUpdate(relrelation, &otid, reltup);
+       UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                                                 InvalidOid, 
is_internal);
@@ -14875,7 +14881,7 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
 
        /* Fetch heap tuple */
        relid = RelationGetRelid(rel);
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -14979,6 +14985,7 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
                                                                 repl_val, 
repl_null, repl_repl);
 
        CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+       UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -17131,7 +17138,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
        ObjectAddress thisobj;
        bool            already_done = false;
 
-       classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+       /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+       classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(classTup))
                elog(ERROR, "cache lookup failed for relation %u", relOid);
        classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -17150,6 +17158,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
        already_done = object_address_present(&thisobj, objsMoved);
        if (!already_done && oldNspOid != newNspOid)
        {
+               ItemPointerData otid = classTup->t_self;
+
                /* check for duplicate name (more friendly than unique-index 
failure) */
                if (get_relname_relid(NameStr(classForm->relname),
                                                          newNspOid) != 
InvalidOid)
@@ -17162,7 +17172,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
                /* classTup is a copy, so OK to scribble on */
                classForm->relnamespace = newNspOid;
 
-               CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+               CatalogTupleUpdate(classRel, &otid, classTup);
+               UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
                /* Update dependency on schema if caller said so */
                if (hasDependEntry &&
@@ -17174,6 +17186,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
                        elog(ERROR, "could not change schema dependency for 
relation \"%s\"",
                                 NameStr(classForm->relname));
        }
+       else
+               UnlockTuple(classRel, &classTup->t_self, 
InplaceUpdateTupleLock);
        if (!already_done)
        {
                add_exact_object_address(&thisobj, objsMoved);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 29e186f..f04cd0b 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1024,6 +1024,10 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, 
CmdType operation,
        Relation        resultRel = resultRelInfo->ri_RelationDesc;
        FdwRoutine *fdwroutine;
 
+       /* Expect a fully-formed ResultRelInfo from InitResultRelInfo(). */
+       Assert(resultRelInfo->ri_needLockTagTuple ==
+                  IsInplaceUpdateRelation(resultRel));
+
        switch (resultRel->rd_rel->relkind)
        {
                case RELKIND_RELATION:
@@ -1204,6 +1208,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_NumIndices = 0;
        resultRelInfo->ri_IndexRelationDescs = NULL;
        resultRelInfo->ri_IndexRelationInfo = NULL;
+       resultRelInfo->ri_needLockTagTuple =
+               IsInplaceUpdateRelation(resultRelationDesc);
        /* make a copy so as not to depend on relcache info not changing... */
        resultRelInfo->ri_TrigDesc = 
CopyTriggerDesc(resultRelationDesc->trigdesc);
        if (resultRelInfo->ri_TrigDesc)
diff --git a/src/backend/executor/execReplication.c 
b/src/backend/executor/execReplication.c
index 1086cbc..54025c9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -661,8 +661,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
        Relation        rel = resultRelInfo->ri_RelationDesc;
        ItemPointer tid = &(searchslot->tts_tid);
 
-       /* For now we support only tables. */
+       /*
+        * We support only non-system tables, with
+        * check_publication_add_relation() accountable.
+        */
        Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+       Assert(!IsCatalogRelation(rel));
 
        CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
diff --git a/src/backend/executor/nodeModifyTable.c 
b/src/backend/executor/nodeModifyTable.c
index 8bf4c80..1161520 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -2324,6 +2324,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo 
*resultRelInfo,
        }
        else
        {
+               ItemPointerData lockedtid;
+
                /*
                 * If we generate a new candidate tuple after EvalPlanQual 
testing, we
                 * must loop back here to try again.  (We don't need to redo 
triggers,
@@ -2332,6 +2334,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo 
*resultRelInfo,
                 * to do them again.)
                 */
 redo_act:
+               lockedtid = *tupleid;
                result = ExecUpdateAct(context, resultRelInfo, tupleid, 
oldtuple, slot,
                                                           canSetTag, 
&updateCxt);
 
@@ -2425,6 +2428,14 @@ redo_act:
                                                                
ExecInitUpdateProjection(context->mtstate,
                                                                                
                                 resultRelInfo);
 
+                                                       if 
(resultRelInfo->ri_needLockTagTuple)
+                                                       {
+                                                               
UnlockTuple(resultRelationDesc,
+                                                                               
        &lockedtid, InplaceUpdateTupleLock);
+                                                               
LockTuple(resultRelationDesc,
+                                                                               
  tupleid, InplaceUpdateTupleLock);
+                                                       }
+
                                                        /* Fetch the most 
recent version of old tuple. */
                                                        oldSlot = 
resultRelInfo->ri_oldTupleSlot;
                                                        if 
(!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2529,6 +2540,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
        TransactionId xmin;
        bool            isnull;
 
+       /*
+        * Parse analysis should have blocked ON CONFLICT for all system
+        * relations, which includes these.  There's no fundamental obstacle to
+        * supporting this; we'd just need to handle LOCKTAG_TUPLE like the 
other
+        * ExecUpdate() caller.
+        */
+       Assert(!resultRelInfo->ri_needLockTagTuple);
+
        /* Determine lock mode to use */
        lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2854,6 +2873,7 @@ ExecMergeMatched(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo,
 {
        ModifyTableState *mtstate = context->mtstate;
        List      **mergeActions = resultRelInfo->ri_MergeActions;
+       ItemPointerData lockedtid;
        List       *actionStates;
        TupleTableSlot *newslot = NULL;
        TupleTableSlot *rslot = NULL;
@@ -2890,14 +2910,32 @@ ExecMergeMatched(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo,
         * target wholerow junk attr.
         */
        Assert(tupleid != NULL || oldtuple != NULL);
+       ItemPointerSetInvalid(&lockedtid);
        if (oldtuple != NULL)
+       {
+               Assert(!resultRelInfo->ri_needLockTagTuple);
                ExecForceStoreHeapTuple(oldtuple, 
resultRelInfo->ri_oldTupleSlot,
                                                                false);
-       else if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
-                                                                               
        tupleid,
-                                                                               
        SnapshotAny,
-                                                                               
        resultRelInfo->ri_oldTupleSlot))
-               elog(ERROR, "failed to fetch the target tuple");
+       }
+       else
+       {
+               if (resultRelInfo->ri_needLockTagTuple)
+               {
+                       /*
+                        * This locks even for CMD_DELETE, for CMD_NOTHING, and 
for tuples
+                        * that don't match mas_whenqual.  MERGE on system 
catalogs is a
+                        * minor use case, so don't bother optimizing those.
+                        */
+                       LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                         InplaceUpdateTupleLock);
+                       lockedtid = *tupleid;
+               }
+               if 
(!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+                                                                               
   tupleid,
+                                                                               
   SnapshotAny,
+                                                                               
   resultRelInfo->ri_oldTupleSlot))
+                       elog(ERROR, "failed to fetch the target tuple");
+       }
 
        /*
         * Test the join condition.  If it's satisfied, perform a MATCHED 
action.
@@ -2969,7 +3007,7 @@ lmerge_matched:
                                                                                
tupleid, NULL, newslot, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
 
                                        break;          /* concurrent 
update/delete */
                                }
@@ -2980,11 +3018,11 @@ lmerge_matched:
                                {
                                        if (!ExecIRUpdateTriggers(estate, 
resultRelInfo,
                                                                                
          oldtuple, newslot))
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
                                }
                                else
                                {
-                                       /* called 
table_tuple_fetch_row_version() above */
+                                       /* checked ri_needLockTagTuple above */
                                        Assert(oldtuple == NULL);
 
                                        result = ExecUpdateAct(context, 
resultRelInfo, tupleid,
@@ -3003,7 +3041,8 @@ lmerge_matched:
                                        if (updateCxt.crossPartUpdate)
                                        {
                                                mtstate->mt_merge_updated += 1;
-                                               return 
context->cpUpdateReturningSlot;
+                                               rslot = 
context->cpUpdateReturningSlot;
+                                               goto out;
                                        }
                                }
 
@@ -3021,7 +3060,7 @@ lmerge_matched:
                                                                                
NULL, NULL, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
 
                                        break;          /* concurrent 
update/delete */
                                }
@@ -3032,11 +3071,11 @@ lmerge_matched:
                                {
                                        if (!ExecIRDeleteTriggers(estate, 
resultRelInfo,
                                                                                
          oldtuple))
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
                                }
                                else
                                {
-                                       /* called 
table_tuple_fetch_row_version() above */
+                                       /* checked ri_needLockTagTuple above */
                                        Assert(oldtuple == NULL);
 
                                        result = ExecDeleteAct(context, 
resultRelInfo, tupleid,
@@ -3118,7 +3157,7 @@ lmerge_matched:
                                 * let caller handle it under NOT MATCHED [BY 
TARGET] clauses.
                                 */
                                *matched = false;
-                               return NULL;
+                               goto out;
 
                        case TM_Updated:
                                {
@@ -3192,7 +3231,7 @@ lmerge_matched:
                                                                 * more to do.
                                                                 */
                                                                if 
(TupIsNull(epqslot))
-                                                                       return 
NULL;
+                                                                       goto 
out;
 
                                                                /*
                                                                 * If we got a 
NULL ctid from the subplan, the
@@ -3210,6 +3249,15 @@ lmerge_matched:
                                                                 * we need to 
switch to the NOT MATCHED BY
                                                                 * SOURCE case.
                                                                 */
+                                                               if 
(resultRelInfo->ri_needLockTagTuple)
+                                                               {
+                                                                       if 
(ItemPointerIsValid(&lockedtid))
+                                                                               
UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                                                               
                        InplaceUpdateTupleLock);
+                                                                       
LockTuple(resultRelInfo->ri_RelationDesc, &context->tmfd.ctid,
+                                                                               
          InplaceUpdateTupleLock);
+                                                                       
lockedtid = context->tmfd.ctid;
+                                                               }
                                                                if 
(!table_tuple_fetch_row_version(resultRelationDesc,
                                                                                
                                                   &context->tmfd.ctid,
                                                                                
                                                   SnapshotAny,
@@ -3238,7 +3286,7 @@ lmerge_matched:
                                                         * MATCHED [BY TARGET] 
actions
                                                         */
                                                        *matched = false;
-                                                       return NULL;
+                                                       goto out;
 
                                                case TM_SelfModified:
 
@@ -3266,13 +3314,13 @@ lmerge_matched:
 
                                                        /* This shouldn't 
happen */
                                                        elog(ERROR, "attempted 
to update or delete invisible tuple");
-                                                       return NULL;
+                                                       goto out;
 
                                                default:
                                                        /* see table_tuple_lock 
call in ExecDelete() */
                                                        elog(ERROR, "unexpected 
table_tuple_lock status: %u",
                                                                 result);
-                                                       return NULL;
+                                                       goto out;
                                        }
                                }
 
@@ -3319,6 +3367,10 @@ lmerge_matched:
        /*
         * Successfully executed an action or no qualifying action was found.
         */
+out:
+       if (ItemPointerIsValid(&lockedtid))
+               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                       InplaceUpdateTupleLock);
        return rslot;
 }
 
@@ -3770,6 +3822,7 @@ ExecModifyTable(PlanState *pstate)
        HeapTupleData oldtupdata;
        HeapTuple       oldtuple;
        ItemPointer tupleid;
+       bool            tuplock;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -4082,6 +4135,8 @@ ExecModifyTable(PlanState *pstate)
                                break;
 
                        case CMD_UPDATE:
+                               tuplock = false;
+
                                /* Initialize projection info if first time for 
this table */
                                if 
(unlikely(!resultRelInfo->ri_projectNewInfoValid))
                                        ExecInitUpdateProjection(node, 
resultRelInfo);
@@ -4093,6 +4148,7 @@ ExecModifyTable(PlanState *pstate)
                                oldSlot = resultRelInfo->ri_oldTupleSlot;
                                if (oldtuple != NULL)
                                {
+                                       
Assert(!resultRelInfo->ri_needLockTagTuple);
                                        /* Use the wholerow junk attr as the 
old tuple. */
                                        ExecForceStoreHeapTuple(oldtuple, 
oldSlot, false);
                                }
@@ -4101,6 +4157,11 @@ ExecModifyTable(PlanState *pstate)
                                        /* Fetch the most recent version of old 
tuple. */
                                        Relation        relation = 
resultRelInfo->ri_RelationDesc;
 
+                                       if (resultRelInfo->ri_needLockTagTuple)
+                                       {
+                                               LockTuple(relation, tupleid, 
InplaceUpdateTupleLock);
+                                               tuplock = true;
+                                       }
                                        if 
(!table_tuple_fetch_row_version(relation, tupleid,
                                                                                
                           SnapshotAny,
                                                                                
                           oldSlot))
@@ -4112,6 +4173,9 @@ ExecModifyTable(PlanState *pstate)
                                /* Now apply the update. */
                                slot = ExecUpdate(&context, resultRelInfo, 
tupleid, oldtuple,
                                                                  slot, 
node->canSetTag);
+                               if (tuplock)
+                                       
UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                                               
InplaceUpdateTupleLock);
                                break;
 
                        case CMD_DELETE:
diff --git a/src/backend/utils/cache/relcache.c 
b/src/backend/utils/cache/relcache.c
index 63efc55..16d2c96 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3768,6 +3768,7 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
 {
        RelFileNumber newrelfilenumber;
        Relation        pg_class;
+       ItemPointerData otid;
        HeapTuple       tuple;
        Form_pg_class classform;
        MultiXactId minmulti = InvalidMultiXactId;
@@ -3810,11 +3811,12 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
         */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID,
-                                                               
ObjectIdGetDatum(RelationGetRelid(relation)));
+       tuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                         
ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u",
                         RelationGetRelid(relation));
+       otid = tuple->t_self;
        classform = (Form_pg_class) GETSTRUCT(tuple);
 
        /*
@@ -3934,9 +3936,10 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
                classform->relminmxid = minmulti;
                classform->relpersistence = persistence;
 
-               CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_class, &otid, tuple);
        }
 
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tuple);
 
        table_close(pg_class, RowExclusiveLock);
diff --git a/src/backend/utils/cache/syscache.c 
b/src/backend/utils/cache/syscache.c
index 3e03dfc..50c9440 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -30,7 +30,10 @@
 #include "catalog/pg_shseclabel_d.h"
 #include "common/int.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -269,6 +272,98 @@ ReleaseSysCache(HeapTuple tuple)
 }
 
 /*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                                         Datum key1)
+{
+       ItemPointerData tid;
+       LOCKTAG         tag;
+       Oid                     dboid =
+               SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+       Oid                     reloid = cacheinfo[cacheId].reloid;
+
+       /*----------
+        * Since inplace updates may happen just before our LockTuple(), we must
+        * return content acquired after LockTuple() of the TID we return.  If 
we
+        * just fetched twice instead of looping, the following sequence would
+        * defeat our locking:
+        *
+        * GRANT:   SearchSysCache1() = TID (1,5)
+        * GRANT:   LockTuple(pg_class, (1,5))
+        * [no more inplace update of (1,5) until we release the lock]
+        * CLUSTER: SearchSysCache1() = TID (1,5)
+        * CLUSTER: heap_update() = TID (1,8)
+        * CLUSTER: COMMIT
+        * GRANT:   SearchSysCache1() = TID (1,8)
+        * GRANT:   return (1,8) from SearchSysCacheLocked1()
+        * VACUUM:  SearchSysCache1() = TID (1,8)
+        * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one 
rel
+        * VACUUM:  inplace update
+        * GRANT:   heap_update() = (1,9)  # lose inplace update
+        *
+        * In the happy case, this takes two fetches, one to determine the TID 
to
+        * lock and another to get the content and confirm the TID didn't 
change.
+        *
+        * This is valid even if the row gets updated to a new TID, the old TID
+        * becomes LP_UNUSED, and the row gets updated back to its old TID.  
We'd
+        * still hold the right LOCKTAG_TUPLE and a copy of the row captured 
after
+        * the LOCKTAG_TUPLE.
+        */
+       ItemPointerSetInvalid(&tid);
+       for (;;)
+       {
+               HeapTuple       tuple;
+               LOCKMODE        lockmode = InplaceUpdateTupleLock;
+
+               tuple = SearchSysCache1(cacheId, key1);
+               if (ItemPointerIsValid(&tid))
+               {
+                       if (!HeapTupleIsValid(tuple))
+                       {
+                               LockRelease(&tag, lockmode, false);
+                               return tuple;
+                       }
+                       if (ItemPointerEquals(&tid, &tuple->t_self))
+                               return tuple;
+                       LockRelease(&tag, lockmode, false);
+               }
+               else if (!HeapTupleIsValid(tuple))
+                       return tuple;
+
+               tid = tuple->t_self;
+               ReleaseSysCache(tuple);
+               /* like: LockTuple(rel, &tid, lockmode) */
+               SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                                                 
ItemPointerGetBlockNumber(&tid),
+                                                 
ItemPointerGetOffsetNumber(&tid));
+               (void) LockAcquire(&tag, lockmode, false, false);
+
+               /*
+                * If an inplace update just finished, ensure we process the 
syscache
+                * inval.  XXX this is insufficient: the inplace updater may 
not yet
+                * have reached AtEOXact_Inval().  See test at 
inplace-inval.spec.
+                *
+                * If a heap_update() call just released its LOCKTAG_TUPLE, 
we'll
+                * probably find the old tuple and reach "tuple concurrently 
updated".
+                * If that heap_update() aborts, our LOCKTAG_TUPLE blocks 
inplace
+                * updates while our caller works.
+                */
+               AcceptInvalidationMessages();
+       }
+}
+
+/*
  * SearchSysCacheCopy
  *
  * A convenience routine that does SearchSysCache and (if successful)
@@ -295,6 +390,28 @@ SearchSysCacheCopy(int cacheId,
 }
 
 /*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLockedCopy1 with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                                                 Datum key1)
+{
+       HeapTuple       tuple,
+                               newtuple;
+
+       tuple = SearchSysCacheLocked1(cacheId, key1);
+       if (!HeapTupleIsValid(tuple))
+               return tuple;
+       newtuple = heap_copytuple(tuple);
+       ReleaseSysCache(tuple);
+       return newtuple;
+}
+
+/*
  * SearchSysCacheExists
  *
  * A convenience routine that just probes to see if a tuple can be found.
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index af7d8fd..b078a6e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -482,6 +482,9 @@ typedef struct ResultRelInfo
        /* Have the projection and the slots above been initialized? */
        bool            ri_projectNewInfoValid;
 
+       /* updates do LockTuple() before oldtup read; see README.tuplock */
+       bool            ri_needLockTagTuple;
+
        /* triggers to be fired, if any */
        TriggerDesc *ri_TrigDesc;
 
diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h
index 934ba84..810b297 100644
--- a/src/include/storage/lockdefs.h
+++ b/src/include/storage/lockdefs.h
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                            8       /* highest standard 
lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 03a27dd..b541911 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -43,9 +43,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                                                          
Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                                                        Datum 
key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                                                               
   Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                                                 Datum key1, 
Datum key2, Datum key3, Datum key4);
 extern Oid     GetSysCacheOid(int cacheId, AttrNumber oidcol,
diff --git a/src/test/isolation/expected/intra-grant-inplace.out 
b/src/test/isolation/expected/intra-grant-inplace.out
index fe26984..b5fe8b0 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -100,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
        SELECT relhasindex FROM pg_class
@@ -124,7 +124,6 @@ relhasindex
 f          
 (1 row)
 
-step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
@@ -155,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  <waiting ...>
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
        SELECT relhasindex FROM pg_class
@@ -195,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -224,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
diff --git a/src/test/isolation/specs/eval-plan-qual.spec 
b/src/test/isolation/specs/eval-plan-qual.spec
index 3a74406..07307e6 100644
--- a/src/test/isolation/specs/eval-plan-qual.spec
+++ b/src/test/isolation/specs/eval-plan-qual.spec
@@ -194,7 +194,7 @@ step simplepartupdate_noroute {
        update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1      {
        UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec 
b/src/test/isolation/specs/intra-grant-inplace.spec
index d07ed3b..2992c85 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1        { BEGIN; }
 step grant1    {
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1       { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
@@ -48,7 +50,6 @@ step sfu3     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
-step as3       { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3        { ROLLBACK; }
 
 # Additional heap_update()
@@ -74,8 +75,6 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
        b1
        grant1
@@ -118,7 +117,6 @@ permutation
        b1
        grant1(r3)      # acquire LockTuple(), await sfu3 xmax
        read2
-       as3                     # XXX temporary until patch adds locking to 
addk2
        addk2(c1)       # block in LockTuple() behind grant1
        r3                      # unblock grant1; addk2 now awaits grant1 xmax
        c1
@@ -128,8 +126,8 @@ permutation
        b2
        sfnku2
        b1
-       grant1(c2)              # acquire LockTuple(), await sfnku2 xmax
-       addk2                   # block in LockTuple() behind grant1 = deadlock
+       grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+       addk2(*)                # block in LockTuple() behind grant1 = deadlock
        c2
        c1
        read2
@@ -140,7 +138,7 @@ permutation
        grant1
        b3
        sfu3(c1)        # acquire LockTuple(), await grant1 xmax
-       revoke4(sfu3)   # block in LockTuple() behind sfu3
+       revoke4(r3)     # block in LockTuple() behind sfu3
        c1
        r3                      # revoke4 unlocks old tuple and finds new
 
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Require tuple locks for heap_update() of RELKIND_INDEX pg_class rows.
    
    [To be squashed into inplace120-locktag if we're keeping it]

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index 818cd7f..d3f3e29 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -168,18 +168,17 @@ using locktags as follows.  While DDL code is the main 
audience, the executor
 follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
 are per-catalog:
 
-  pg_class systable_inplace_update_begin() callers: before the call, acquire a
-  lock on the relation in mode ShareUpdateExclusiveLock or stricter.  If the
-  update targets a row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX),
-  that lock must be on the table.  Locking the index rel is not necessary.
-  (This allows VACUUM to overwrite per-index pg_class while holding a lock on
-  the table alone.) systable_inplace_update_begin() acquires and releases
-  LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for ExclusiveLock, on each
-  tuple it overwrites.
+  pg_class systable_inplace_update_begin() callers: if the pg_class row
+  pertains to an index (but not RELKIND_PARTITIONED_INDEX), no lock is
+  required.  Otherwise, before the call, acquire a lock on the relation in
+  mode ShareUpdateExclusiveLock or stricter.  systable_inplace_update_begin()
+  acquires and releases LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for
+  ExclusiveLock, on each tuple it overwrites.
 
-  pg_class heap_update() callers: before copying the tuple to modify, take a
-  lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
-  ShareRowExclusiveLock or stricter on the relation.
+  pg_class heap_update() callers: acquire a lock before copying the tuple to
+  modify.  If the pg_class row pertains to an index, lock the tuple.
+  Otherwise, lock the tuple, get a ShareUpdateExclusiveLock on the relation,
+  or get a ShareRowExclusiveLock or stricter on the relation.
 
   SearchSysCacheLocked1() is one convenient way to acquire the tuple lock.
   Most heap_update() callers already hold a suitable lock on the relation for
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4d75f7a..d943dc8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -4131,19 +4131,10 @@ check_lock_if_inplace_updateable_rel(Relation relation,
                                        dbid = InvalidOid;
                                else
                                        dbid = MyDatabaseId;
-
-                               if (classForm->relkind == RELKIND_INDEX)
-                               {
-                                       Relation        irel = 
index_open(relid, AccessShareLock);
-
-                                       SET_LOCKTAG_RELATION(tag, dbid, 
irel->rd_index->indrelid);
-                                       index_close(irel, AccessShareLock);
-                               }
-                               else
-                                       SET_LOCKTAG_RELATION(tag, dbid, relid);
-
-                               if (!LockHeldByMe(&tag, 
ShareUpdateExclusiveLock, false) &&
-                                       !LockHeldByMe(&tag, 
ShareRowExclusiveLock, true))
+                               SET_LOCKTAG_RELATION(tag, dbid, relid);
+                               if (classForm->relkind == RELKIND_INDEX ||
+                                       (!LockHeldByMe(&tag, 
ShareUpdateExclusiveLock, false) &&
+                                        !LockHeldByMe(&tag, 
ShareRowExclusiveLock, true)))
                                        elog(WARNING,
                                                 "missing lock for relation 
\"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
                                                 NameStr(classForm->relname),
@@ -4181,21 +4172,14 @@ check_inplace_rel_lock(HeapTuple oldtup)
        Oid                     dbid;
        LOCKTAG         tag;
 
+       if (classForm->relkind == RELKIND_INDEX)
+               return;
+
        if (IsSharedRelation(relid))
                dbid = InvalidOid;
        else
                dbid = MyDatabaseId;
-
-       if (classForm->relkind == RELKIND_INDEX)
-       {
-               Relation        irel = index_open(relid, AccessShareLock);
-
-               SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
-               index_close(irel, AccessShareLock);
-       }
-       else
-               SET_LOCKTAG_RELATION(tag, dbid, relid);
-
+       SET_LOCKTAG_RELATION(tag, dbid, relid);
        if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
                elog(WARNING,
                         "missing lock for relation \"%s\" (OID %u, relkind %c) 
@ TID (%u,%u)",
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index e4608b9..579cc0d 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1558,6 +1558,8 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, 
const char *oldName)
                                newClassTuple;
        Form_pg_class oldClassForm,
                                newClassForm;
+       ItemPointerData oldClassTid,
+                               newClassTid;
        HeapTuple       oldIndexTuple,
                                newIndexTuple;
        Form_pg_index oldIndexForm,
@@ -1569,6 +1571,10 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, 
const char *oldName)
 
        /*
         * Take a necessary lock on the old and new index before swapping them.
+        * Since the caller holds session-level locks, this shouldn't deadlock.
+        * The tuple locks come next, and deadlock is possible there.  There's 
no
+        * good use case for altering the temporary index of a REINDEX
+        * CONCURRENTLY, so don't put effort into avoiding said deadlock.
         */
        oldClassRel = relation_open(oldIndexId, ShareUpdateExclusiveLock);
        newClassRel = relation_open(newIndexId, ShareUpdateExclusiveLock);
@@ -1576,15 +1582,17 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, 
const char *oldName)
        /* Now swap names and dependencies of those indexes */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       oldClassTuple = SearchSysCacheCopy1(RELOID,
-                                                                               
ObjectIdGetDatum(oldIndexId));
+       oldClassTuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                               
          ObjectIdGetDatum(oldIndexId));
        if (!HeapTupleIsValid(oldClassTuple))
                elog(ERROR, "could not find tuple for relation %u", oldIndexId);
-       newClassTuple = SearchSysCacheCopy1(RELOID,
-                                                                               
ObjectIdGetDatum(newIndexId));
+       newClassTuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                               
          ObjectIdGetDatum(newIndexId));
        if (!HeapTupleIsValid(newClassTuple))
                elog(ERROR, "could not find tuple for relation %u", newIndexId);
 
+       oldClassTid = oldClassTuple->t_self;
+       newClassTid = newClassTuple->t_self;
        oldClassForm = (Form_pg_class) GETSTRUCT(oldClassTuple);
        newClassForm = (Form_pg_class) GETSTRUCT(newClassTuple);
 
@@ -1597,8 +1605,10 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, 
const char *oldName)
        newClassForm->relispartition = oldClassForm->relispartition;
        oldClassForm->relispartition = isPartition;
 
-       CatalogTupleUpdate(pg_class, &oldClassTuple->t_self, oldClassTuple);
-       CatalogTupleUpdate(pg_class, &newClassTuple->t_self, newClassTuple);
+       CatalogTupleUpdate(pg_class, &oldClassTid, oldClassTuple);
+       UnlockTuple(pg_class, &oldClassTid, InplaceUpdateTupleLock);
+       CatalogTupleUpdate(pg_class, &newClassTid, newClassTuple);
+       UnlockTuple(pg_class, &newClassTid, InplaceUpdateTupleLock);
 
        heap_freetuple(oldClassTuple);
        heap_freetuple(newClassTuple);
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 78f9678..402dc49 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -1074,20 +1074,28 @@ swap_relation_files(Oid r1, Oid r2, bool 
target_is_pg_class,
                                relfilenumber2;
        RelFileNumber swaptemp;
        char            swptmpchr;
+       ItemPointerData otid1,
+                               otid2;
        Oid                     relam1,
                                relam2;
 
-       /* We need writable copies of both pg_class tuples. */
+       /*
+        * We need writable copies of both pg_class tuples.  Since r2 is new in
+        * this transaction, no other process should be getting the tuple lock 
for
+        * that one.  Hence, order of tuple lock acquisition doesn't matter.
+        */
        relRelation = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
+       reltup1 = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(r1));
        if (!HeapTupleIsValid(reltup1))
                elog(ERROR, "cache lookup failed for relation %u", r1);
+       otid1 = reltup1->t_self;
        relform1 = (Form_pg_class) GETSTRUCT(reltup1);
 
-       reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
+       reltup2 = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(r2));
        if (!HeapTupleIsValid(reltup2))
                elog(ERROR, "cache lookup failed for relation %u", r2);
+       otid2 = reltup2->t_self;
        relform2 = (Form_pg_class) GETSTRUCT(reltup2);
 
        relfilenumber1 = relform1->relfilenode;
@@ -1252,10 +1260,8 @@ swap_relation_files(Oid r1, Oid r2, bool 
target_is_pg_class,
                CatalogIndexState indstate;
 
                indstate = CatalogOpenIndexes(relRelation);
-               CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, 
reltup1,
-                                                                  indstate);
-               CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, 
reltup2,
-                                                                  indstate);
+               CatalogTupleUpdateWithInfo(relRelation, &otid1, reltup1, 
indstate);
+               CatalogTupleUpdateWithInfo(relRelation, &otid2, reltup2, 
indstate);
                CatalogCloseIndexes(indstate);
        }
        else
@@ -1264,6 +1270,8 @@ swap_relation_files(Oid r1, Oid r2, bool 
target_is_pg_class,
                CacheInvalidateRelcacheByTuple(reltup1);
                CacheInvalidateRelcacheByTuple(reltup2);
        }
+       UnlockTuple(relRelation, &otid1, InplaceUpdateTupleLock);
+       UnlockTuple(relRelation, &otid2, InplaceUpdateTupleLock);
 
        /*
         * Now that pg_class has been updated with its relevant information for
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 7870e93..5ba228d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14302,7 +14302,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool 
recursing, LOCKMODE lock
        /* Get its pg_class tuple, too */
        class_rel = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relationOid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relationOid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relationOid);
        tuple_class = (Form_pg_class) GETSTRUCT(tuple);
@@ -14392,7 +14392,9 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool 
recursing, LOCKMODE lock
         * If the new owner is the same as the existing owner, consider the
         * command to have succeeded.  This is for dump restoration purposes.
         */
-       if (tuple_class->relowner != newOwnerId)
+       if (tuple_class->relowner == newOwnerId)
+               UnlockTuple(class_rel, &tuple->t_self, InplaceUpdateTupleLock);
+       else
        {
                Datum           repl_val[Natts_pg_class];
                bool            repl_null[Natts_pg_class];
@@ -14452,6 +14454,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool 
recursing, LOCKMODE lock
                newtuple = heap_modify_tuple(tuple, 
RelationGetDescr(class_rel), repl_val, repl_null, repl_repl);
 
                CatalogTupleUpdate(class_rel, &newtuple->t_self, newtuple);
+               UnlockTuple(class_rel, &tuple->t_self, InplaceUpdateTupleLock);
 
                heap_freetuple(newtuple);
 

Reply via email to