On Tue, Jun 18, 2024 at 08:23:49AM -0700, Noah Misch wrote:
> On Mon, Jun 17, 2024 at 06:57:30PM -0700, Andres Freund wrote:
> > On 2024-06-17 16:58:54 -0700, Noah Misch wrote:
> > > On Sat, Jun 15, 2024 at 03:37:18PM -0700, Noah Misch wrote:
> > > > On Wed, May 22, 2024 at 05:05:48PM -0700, Noah Misch wrote:
> > > > > https://postgr.es/m/20240512232923.aa.nmi...@google.com wrote:
> > > > > > Separable, nontrivial things not fixed in the attached patch stack:
> > > > > >
> > > > > > - Inplace update uses transactional CacheInvalidateHeapTuple().  
> > > > > > ROLLBACK of
> > > > > >   CREATE INDEX wrongly discards the inval, leading to the 
> > > > > > relhasindex=t loss
> > > > > >   still seen in inplace-inval.spec.  CacheInvalidateRelmap() does 
> > > > > > this right.
> > > > >
> > > > > I plan to fix that like CacheInvalidateRelmap(): send the inval 
> > > > > immediately,
> > > > > inside the critical section.  Send it in heap_xlog_inplace(), too.
> > 
> > I'm worried this might cause its own set of bugs, e.g. if there are any 
> > places
> > that, possibly accidentally, rely on the invalidation from the inplace 
> > update
> > to also cover separate changes.
> 
> Good point.  I do have index_update_stats() still doing an ideally-superfluous
> relcache update for that reason.  Taking that further, it would be cheap
> insurance to have the inplace update do a transactional inval in addition to
> its immediate inval.  Future master-only work could remove the transactional
> one.  How about that?

Restoring the transactional inval seemed good to me, so I've rebased and
included that.  This applies on top of three patches from
https://postgr.es/m/20240822073200.4f.nmi...@google.com.  I'm attaching those
to placate cfbot, but this thread is for review of the last patch only.
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Warn if LOCKTAG_TUPLE is held at commit, under debug_assertions.
    
    The current use always releases this locktag.  A planned use will
    continue that intent.  It will involve more areas of code, making unlock
    omissions easier.  Warn under debug_assertions, like we do for various
    resource leaks.  Back-patch to v12 (all supported versions), the plan
    for the commit of the new use.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20240512232923.aa.nmi...@google.com

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 0400a50..e5e7ab5 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -2256,6 +2256,16 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
                                locallock->numLockOwners = 0;
                }
 
+#ifdef USE_ASSERT_CHECKING
+
+               /*
+                * Tuple locks are currently held only for short durations 
within a
+                * transaction. Check that we didn't forget to release one.
+                */
+               if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_TUPLE && !allLocks)
+                       elog(WARNING, "tuple lock held at commit");
+#endif
+
                /*
                 * If the lock or proclock pointers are NULL, this lock was 
taken via
                 * the relation fast-path (and is not known to have been 
transferred).
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Fix data loss at inplace update after heap_update().
    
    As previously-added tests demonstrated, heap_inplace_update() could
    instead update an unrelated tuple of the same catalog.  It could lose
    the update.  Losing relhasindex=t was a source of index corruption.
    Inplace-updating commands like VACUUM will now wait for heap_update()
    commands like GRANT TABLE and GRANT DATABASE.  That isn't ideal, but a
    long-running GRANT already hurts VACUUM progress more just by keeping an
    XID running.  The VACUUM will behave like a DELETE or UPDATE waiting for
    the uncommitted change.
    
    For implementation details, start at the systable_inplace_update_begin()
    header comment and README.tuplock.  Back-patch to v12 (all supported
    versions).  In back branches, retain a deprecated heap_inplace_update(),
    for extensions.
    
    Reviewed by Heikki Linnakangas and Alexander Lakhin.
    
    Discussion: 
https://postgr.es/m/CAMp+ueZQz3yDk7qg42hk6-9gxniYbp-=bg2mgqecerqr5gg...@mail.gmail.com

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index 6441e8b..ddb2def 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -153,3 +153,14 @@ The following infomask bits are applicable:
 
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
+
+Reading inplace-updated columns
+-------------------------------
+
+Inplace updates create an exception to the rule that tuple data won't change
+under a reader holding a pin.  A reader of a heap_fetch() result tuple may
+witness a torn read.  Current inplace-updated fields are aligned and are no
+wider than four bytes, and current readers don't need consistency across
+fields.  Hence, they get by with just fetching each field once.  XXX such a
+caller may also read a value that has not reached WAL; see
+systable_inplace_update_finish().
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b2014..24f7e62 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -63,7 +63,6 @@
 #include "storage/procarray.h"
 #include "storage/standby.h"
 #include "utils/datum.h"
-#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
@@ -6041,61 +6040,166 @@ heap_abort_speculative(Relation relation, ItemPointer 
tid)
 }
 
 /*
- * heap_inplace_update - update a tuple "in place" (ie, overwrite it)
+ * heap_inplace_lock - protect inplace update from concurrent heap_update()
  *
- * Overwriting violates both MVCC and transactional safety, so the uses
- * of this function in Postgres are extremely limited.  Nonetheless we
- * find some places to use it.
+ * Evaluate whether the tuple's state is compatible with a no-key update.
+ * Current transaction rowmarks are fine, as is KEY SHARE from any
+ * transaction.  If compatible, return true with the buffer exclusive-locked,
+ * and the caller must release that by calling heap_inplace_update(), calling
+ * heap_inplace_unlock(), or raising an error.  Otherwise, return false after
+ * blocking transactions, if any, have ended.
  *
- * The tuple cannot change size, and therefore it's reasonable to assume
- * that its null bitmap (if any) doesn't change either.  So we just
- * overwrite the data portion of the tuple without touching the null
- * bitmap or any of the header fields.
+ * Since this is intended for system catalogs and SERIALIZABLE doesn't cover
+ * DDL, this doesn't guarantee any particular predicate locking.
  *
- * tuple is an in-memory tuple structure containing the data to be written
- * over the target tuple.  Also, tuple->t_self identifies the target tuple.
+ * One could modify this to return true for tuples with delete in progress,
+ * All inplace updaters take a lock that conflicts with DROP.  If explicit
+ * "DELETE FROM pg_class" is in progress, we'll wait for it like we would an
+ * update.
  *
- * Note that the tuple updated here had better not come directly from the
- * syscache if the relation has a toast relation as this tuple could
- * include toast values that have been expanded, causing a failure here.
+ * Readers of inplace-updated fields expect changes to those fields are
+ * durable.  For example, vac_truncate_clog() reads datfrozenxid from
+ * pg_database tuples via catalog snapshots.  A future snapshot must not
+ * return a lower datfrozenxid for the same database OID (lower in the
+ * FullTransactionIdPrecedes() sense).  We achieve that since no update of a
+ * tuple can start while we hold a lock on its buffer.  In cases like
+ * BEGIN;GRANT;CREATE INDEX;COMMIT we're inplace-updating a tuple visible only
+ * to this transaction.  ROLLBACK then is one case where it's okay to lose
+ * inplace updates.  (Restoring relhasindex=false on ROLLBACK is fine, since
+ * any concurrent CREATE INDEX would have blocked, then inplace-updated the
+ * committed tuple.)
+ *
+ * In principle, we could avoid waiting by overwriting every tuple in the
+ * updated tuple chain.  Reader expectations permit updating a tuple only if
+ * it's aborted, is the tail of the chain, or we already updated the tuple
+ * referenced in its t_ctid.  Hence, we would need to overwrite the tuples in
+ * order from tail to head.  That would imply either (a) mutating all tuples
+ * in one critical section or (b) accepting a chance of partial completion.
+ * Partial completion of a relfrozenxid update would have the weird
+ * consequence that the table's next VACUUM could see the table's relfrozenxid
+ * move forward between vacuum_get_cutoffs() and finishing.
+ */
+bool
+heap_inplace_lock(Relation relation,
+                                 HeapTuple oldtup_ptr, Buffer buffer)
+{
+       HeapTupleData oldtup = *oldtup_ptr; /* minimize diff vs. heap_update() 
*/
+       TM_Result       result;
+       bool            ret;
+
+       Assert(BufferIsValid(buffer));
+
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       /*----------
+        * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
+        *
+        * - wait unconditionally
+        * - no tuple locks
+        * - don't recheck header after wait: simpler to defer to next iteration
+        * - don't try to continue even if the updater aborts: likewise
+        * - no crosscheck
+        */
+       result = HeapTupleSatisfiesUpdate(&oldtup, GetCurrentCommandId(false),
+                                                                         
buffer);
+
+       if (result == TM_Invisible)
+       {
+               /* no known way this can happen */
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg_internal("attempted to overwrite 
invisible tuple")));
+       }
+       else if (result == TM_SelfModified)
+       {
+               /*
+                * CREATE INDEX might reach this if an expression is silly 
enough to
+                * call e.g. SELECT ... FROM pg_class FOR SHARE.  C code of 
other SQL
+                * statements might get here after a heap_update() of the same 
row, in
+                * the absence of an intervening CommandCounterIncrement().
+                */
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("tuple to be updated was already 
modified by an operation triggered by the current command")));
+       }
+       else if (result == TM_BeingModified)
+       {
+               TransactionId xwait;
+               uint16          infomask;
+
+               xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
+               infomask = oldtup.t_data->t_infomask;
+
+               if (infomask & HEAP_XMAX_IS_MULTI)
+               {
+                       LockTupleMode lockmode = LockTupleNoKeyExclusive;
+                       MultiXactStatus mxact_status = 
MultiXactStatusNoKeyUpdate;
+                       int                     remain;
+                       bool            current_is_member;
+
+                       if (DoesMultiXactIdConflict((MultiXactId) xwait, 
infomask,
+                                                                               
lockmode, &current_is_member))
+                       {
+                               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                               ret = false;
+                               MultiXactIdWait((MultiXactId) xwait, 
mxact_status, infomask,
+                                                               relation, 
&oldtup.t_self, XLTW_Update,
+                                                               &remain);
+                       }
+                       else
+                               ret = true;
+               }
+               else if (TransactionIdIsCurrentTransactionId(xwait))
+                       ret = true;
+               else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
+                       ret = true;
+               else
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ret = false;
+                       XactLockTableWait(xwait, relation, &oldtup.t_self,
+                                                         XLTW_Update);
+               }
+       }
+       else
+       {
+               ret = (result == TM_Ok);
+               if (!ret)
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               }
+       }
+
+       /*
+        * GetCatalogSnapshot() relies on invalidation messages to know when to
+        * take a new snapshot.  COMMIT of xwait is responsible for sending the
+        * invalidation.  We're not acquiring heavyweight locks sufficient to
+        * block if not yet sent, so we must take a new snapshot to ensure a 
later
+        * attempt has a fair chance.  While we don't need this if xwait 
aborted,
+        * don't bother optimizing that.
+        */
+       if (!ret)
+               InvalidateCatalogSnapshot();
+       return ret;
+}
+
+/*
+ * heap_inplace_update - subroutine of systable_inplace_update_finish
+ *
+ * The tuple cannot change size, and therefore its header fields and null
+ * bitmap (if any) don't change either.
  */
 void
-heap_inplace_update(Relation relation, HeapTuple tuple)
+heap_inplace_update(Relation relation,
+                                       HeapTuple oldtup, HeapTuple tuple,
+                                       Buffer buffer)
 {
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
+       HeapTupleHeader htup = oldtup->t_data;
        uint32          oldlen;
        uint32          newlen;
 
-       /*
-        * For now, we don't allow parallel updates.  Unlike a regular update,
-        * this should never create a combo CID, so it might be possible to 
relax
-        * this restriction, but not without more thought and testing.  It's not
-        * clear that it would be useful, anyway.
-        */
-       if (IsInParallelMode())
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-                                errmsg("cannot update tuples during a parallel 
operation")));
-
-       INJECTION_POINT("inplace-before-pin");
-       buffer = ReadBuffer(relation, 
ItemPointerGetBlockNumber(&(tuple->t_self)));
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-       page = (Page) BufferGetPage(buffer);
-
-       offnum = ItemPointerGetOffsetNumber(&(tuple->t_self));
-       if (PageGetMaxOffsetNumber(page) >= offnum)
-               lp = PageGetItemId(page, offnum);
-
-       if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-               elog(ERROR, "invalid lp");
-
-       htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-       oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+       Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
+       oldlen = oldtup->t_len - htup->t_hoff;
        newlen = tuple->t_len - tuple->t_data->t_hoff;
        if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
                elog(ERROR, "wrong tuple length");
@@ -6107,6 +6211,19 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
                   (char *) tuple->t_data + tuple->t_data->t_hoff,
                   newlen);
 
+       /*----------
+        * XXX A crash here can allow datfrozenxid() to get ahead of 
relfrozenxid:
+        *
+        * ["D" is a VACUUM (ONLY_DATABASE_STATS)]
+        * ["R" is a VACUUM tbl]
+        * D: vac_update_datfrozenid() -> systable_beginscan(pg_class)
+        * D: systable_getnext() returns pg_class tuple of tbl
+        * R: memcpy() into pg_class tuple of tbl
+        * D: raise pg_database.datfrozenxid, XLogInsert(), finish
+        * [crash]
+        * [recovery restores datfrozenxid w/o relfrozenxid]
+        */
+
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
@@ -6127,23 +6244,35 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
 
-               PageSetLSN(page, recptr);
+               PageSetLSN(BufferGetPage(buffer), recptr);
        }
 
        END_CRIT_SECTION();
 
-       UnlockReleaseBuffer(buffer);
+       heap_inplace_unlock(relation, oldtup, buffer);
 
        /*
         * Send out shared cache inval if necessary.  Note that because we only
         * pass the new version of the tuple, this mustn't be used for any
         * operations that could change catcache lookup keys.  But we aren't
         * bothering with index updates either, so that's true a fortiori.
+        *
+        * XXX ROLLBACK discards the invalidation.  See test inplace-inval.spec.
         */
        if (!IsBootstrapProcessingMode())
                CacheInvalidateHeapTuple(relation, tuple, NULL);
 }
 
+/*
+ * heap_inplace_unlock - reverse of heap_inplace_lock
+ */
+void
+heap_inplace_unlock(Relation relation,
+                                       HeapTuple oldtup, Buffer buffer)
+{
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+}
+
 #define                FRM_NOOP                                0x0001
 #define                FRM_INVALIDATE_XMAX             0x0002
 #define                FRM_RETURN_IS_XID               0x0004
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 43c95d6..5f55e8c 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -20,6 +20,7 @@
 #include "postgres.h"
 
 #include "access/genam.h"
+#include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
+#include "utils/injection_point.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
@@ -747,3 +749,140 @@ systable_endscan_ordered(SysScanDesc sysscan)
                UnregisterSnapshot(sysscan->snapshot);
        pfree(sysscan);
 }
+
+/*
+ * systable_inplace_update_begin --- update a row "in place" (overwrite it)
+ *
+ * Overwriting violates both MVCC and transactional safety, so the uses of
+ * this function in Postgres are extremely limited.  Nonetheless we find some
+ * places to use it.  Standard flow:
+ *
+ * ... [any slow preparation not requiring oldtup] ...
+ * systable_inplace_update_begin([...], &tup, &inplace_state);
+ * if (!HeapTupleIsValid(tup))
+ *     elog(ERROR, [...]);
+ * ... [buffer is exclusive-locked; mutate "tup"] ...
+ * if (dirty)
+ *     systable_inplace_update_finish(inplace_state, tup);
+ * else
+ *     systable_inplace_update_cancel(inplace_state);
+ *
+ * The first several params duplicate the systable_beginscan() param list.
+ * "oldtupcopy" is an output parameter, assigned NULL if the key ceases to
+ * find a live tuple.  (In PROC_IN_VACUUM, that is a low-probability transient
+ * condition.)  If "oldtupcopy" gets non-NULL, you must pass output parameter
+ * "state" to systable_inplace_update_finish() or
+ * systable_inplace_update_cancel().
+ */
+void
+systable_inplace_update_begin(Relation relation,
+                                                         Oid indexId,
+                                                         bool indexOK,
+                                                         Snapshot snapshot,
+                                                         int nkeys, const 
ScanKeyData *key,
+                                                         HeapTuple *oldtupcopy,
+                                                         void **state)
+{
+       ScanKey         mutable_key = palloc(sizeof(ScanKeyData) * nkeys);
+       int                     retries = 0;
+       SysScanDesc scan;
+       HeapTuple       oldtup;
+
+       /*
+        * For now, we don't allow parallel updates.  Unlike a regular update,
+        * this should never create a combo CID, so it might be possible to 
relax
+        * this restriction, but not without more thought and testing.  It's not
+        * clear that it would be useful, anyway.
+        */
+       if (IsInParallelMode())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+                                errmsg("cannot update tuples during a parallel 
operation")));
+
+       /*
+        * Accept a snapshot argument, for symmetry, but this function advances
+        * its snapshot as needed to reach the tail of the updated tuple chain.
+        */
+       Assert(snapshot == NULL);
+
+       Assert(IsInplaceUpdateRelation(relation) || 
!IsSystemRelation(relation));
+
+       /* Loop for an exclusive-locked buffer of a non-updated tuple. */
+       for (;;)
+       {
+               TupleTableSlot *slot;
+               BufferHeapTupleTableSlot *bslot;
+
+               CHECK_FOR_INTERRUPTS();
+
+               /*
+                * Processes issuing heap_update (e.g. GRANT) at maximum speed 
could
+                * drive us to this error.  A hostile table owner has stronger 
ways to
+                * damage their own table, so that's minor.
+                */
+               if (retries++ > 10000)
+                       elog(ERROR, "giving up after too many tries to 
overwrite row");
+
+               memcpy(mutable_key, key, sizeof(ScanKeyData) * nkeys);
+               INJECTION_POINT("inplace-before-pin");
+               scan = systable_beginscan(relation, indexId, indexOK, snapshot,
+                                                                 nkeys, 
mutable_key);
+               oldtup = systable_getnext(scan);
+               if (!HeapTupleIsValid(oldtup))
+               {
+                       systable_endscan(scan);
+                       *oldtupcopy = NULL;
+                       return;
+               }
+
+               slot = scan->slot;
+               Assert(TTS_IS_BUFFERTUPLE(slot));
+               bslot = (BufferHeapTupleTableSlot *) slot;
+               if (heap_inplace_lock(scan->heap_rel,
+                                                         bslot->base.tuple, 
bslot->buffer))
+                       break;
+               systable_endscan(scan);
+       };
+
+       *oldtupcopy = heap_copytuple(oldtup);
+       *state = scan;
+}
+
+/*
+ * systable_inplace_update_finish --- second phase of inplace update
+ *
+ * The tuple cannot change size, and therefore its header fields and null
+ * bitmap (if any) don't change either.
+ */
+void
+systable_inplace_update_finish(void *state, HeapTuple tuple)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       Relation        relation = scan->heap_rel;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
+       Buffer          buffer = bslot->buffer;
+
+       heap_inplace_update(relation, oldtup, tuple, buffer);
+       systable_endscan(scan);
+}
+
+/*
+ * systable_inplace_update_cancel --- abandon inplace update
+ *
+ * This is an alternative to making a no-op update.
+ */
+void
+systable_inplace_update_cancel(void *state)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       Relation        relation = scan->heap_rel;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
+       Buffer          buffer = bslot->buffer;
+
+       heap_inplace_unlock(relation, oldtup, buffer);
+       systable_endscan(scan);
+}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 3375905..e4608b9 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2785,7 +2785,9 @@ index_update_stats(Relation rel,
 {
        Oid                     relid = RelationGetRelid(rel);
        Relation        pg_class;
+       ScanKeyData key[1];
        HeapTuple       tuple;
+       void       *state;
        Form_pg_class rd_rel;
        bool            dirty;
 
@@ -2819,33 +2821,12 @@ index_update_stats(Relation rel,
 
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       /*
-        * Make a copy of the tuple to update.  Normally we use the syscache, 
but
-        * we can't rely on that during bootstrap or while reindexing pg_class
-        * itself.
-        */
-       if (IsBootstrapProcessingMode() ||
-               ReindexIsProcessingHeap(RelationRelationId))
-       {
-               /* don't assume syscache will work */
-               TableScanDesc pg_class_scan;
-               ScanKeyData key[1];
-
-               ScanKeyInit(&key[0],
-                                       Anum_pg_class_oid,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(relid));
-
-               pg_class_scan = table_beginscan_catalog(pg_class, 1, key);
-               tuple = heap_getnext(pg_class_scan, ForwardScanDirection);
-               tuple = heap_copytuple(tuple);
-               table_endscan(pg_class_scan);
-       }
-       else
-       {
-               /* normal case, use syscache */
-               tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
-       }
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       systable_inplace_update_begin(pg_class, ClassOidIndexId, true, NULL,
+                                                                 1, key, 
&tuple, &state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u", relid);
@@ -2908,11 +2889,12 @@ index_update_stats(Relation rel,
         */
        if (dirty)
        {
-               heap_inplace_update(pg_class, tuple);
+               systable_inplace_update_finish(state, tuple);
                /* the above sends a cache inval message */
        }
        else
        {
+               systable_inplace_update_cancel(state);
                /* no need to change tuple, but force relcache inval anyway */
                CacheInvalidateRelcacheByTuple(tuple);
        }
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 738bc46..ad3082c 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -29,6 +29,7 @@
 #include "catalog/toasting.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
@@ -333,21 +334,36 @@ create_toast_table(Relation rel, Oid toastOid, Oid 
toastIndexOid,
         */
        class_rel = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
-       if (!HeapTupleIsValid(reltup))
-               elog(ERROR, "cache lookup failed for relation %u", relOid);
-
-       ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
-
        if (!IsBootstrapProcessingMode())
        {
                /* normal case, use a transactional update */
+               reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", 
relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = 
toast_relid;
+
                CatalogTupleUpdate(class_rel, &reltup->t_self, reltup);
        }
        else
        {
                /* While bootstrapping, we cannot UPDATE, so overwrite in-place 
*/
-               heap_inplace_update(class_rel, reltup);
+
+               ScanKeyData key[1];
+               void       *state;
+
+               ScanKeyInit(&key[0],
+                                       Anum_pg_class_oid,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(relOid));
+               systable_inplace_update_begin(class_rel, ClassOidIndexId, true,
+                                                                         NULL, 
1, key, &reltup, &state);
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", 
relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = 
toast_relid;
+
+               systable_inplace_update_finish(state, reltup);
        }
 
        heap_freetuple(reltup);
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index d00ae40..86a08d7 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1651,7 +1651,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        Relation        pgdbrel;
        HeapTuple       tup;
        ScanKeyData scankey;
-       SysScanDesc scan;
+       void       *inplace_state;
        Form_pg_database datform;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1790,24 +1790,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        pgstat_drop_database(db_id);
 
        /*
-        * Get the pg_database tuple to scribble on.  Note that this does not
-        * directly rely on the syscache to avoid issues with flattened toast
-        * values for the in-place update.
-        */
-       ScanKeyInit(&scankey,
-                               Anum_pg_database_datname,
-                               BTEqualStrategyNumber, F_NAMEEQ,
-                               CStringGetDatum(dbname));
-
-       scan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
-                                                         NULL, 1, &scankey);
-
-       tup = systable_getnext(scan);
-       if (!HeapTupleIsValid(tup))
-               elog(ERROR, "cache lookup failed for database %u", db_id);
-       datform = (Form_pg_database) GETSTRUCT(tup);
-
-       /*
         * Except for the deletion of the catalog row, subsequent actions are 
not
         * transactional (consider DropDatabaseBuffers() discarding modified
         * buffers). But we might crash or get interrupted below. To prevent
@@ -1818,8 +1800,17 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * modification is durable before performing irreversible filesystem
         * operations.
         */
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(dbname));
+       systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
+                                                                 NULL, 1, 
&scankey, &tup, &inplace_state);
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
+       datform = (Form_pg_database) GETSTRUCT(tup);
        datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
-       heap_inplace_update(pgdbrel, tup);
+       systable_inplace_update_finish(inplace_state, tup);
        XLogFlush(XactLastRecEnd);
 
        /*
@@ -1827,8 +1818,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * the row will be gone, but if we fail, dropdb() can be invoked again.
         */
        CatalogTupleDelete(pgdbrel, &tup->t_self);
-
-       systable_endscan(scan);
+       heap_freetuple(tup);
 
        /*
         * Drop db-specific replication slots.
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 7a5ed6b..55baf10 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -946,25 +946,18 @@ EventTriggerOnLogin(void)
                {
                        Relation        pg_db = table_open(DatabaseRelationId, 
RowExclusiveLock);
                        HeapTuple       tuple;
+                       void       *state;
                        Form_pg_database db;
                        ScanKeyData key[1];
-                       SysScanDesc scan;
 
-                       /*
-                        * Get the pg_database tuple to scribble on.  Note that 
this does
-                        * not directly rely on the syscache to avoid issues 
with
-                        * flattened toast values for the in-place update.
-                        */
+                       /* Fetch a copy of the tuple to scribble on */
                        ScanKeyInit(&key[0],
                                                Anum_pg_database_oid,
                                                BTEqualStrategyNumber, F_OIDEQ,
                                                ObjectIdGetDatum(MyDatabaseId));
 
-                       scan = systable_beginscan(pg_db, DatabaseOidIndexId, 
true,
-                                                                         NULL, 
1, key);
-                       tuple = systable_getnext(scan);
-                       tuple = heap_copytuple(tuple);
-                       systable_endscan(scan);
+                       systable_inplace_update_begin(pg_db, 
DatabaseOidIndexId, true,
+                                                                               
  NULL, 1, key, &tuple, &state);
 
                        if (!HeapTupleIsValid(tuple))
                                elog(ERROR, "could not find tuple for database 
%u", MyDatabaseId);
@@ -980,13 +973,15 @@ EventTriggerOnLogin(void)
                                 * that avoids possible waiting on the 
row-level lock. Second,
                                 * that avoids dealing with TOAST.
                                 *
-                                * It's known that changes made by 
heap_inplace_update() may
-                                * be lost due to concurrent normal updates.  
However, we are
-                                * OK with that.  The subsequent connections 
will still have a
-                                * chance to set "dathasloginevt" to false.
+                                * Changes made by inplace update may be lost 
due to
+                                * concurrent normal updates; see 
inplace-inval.spec. However,
+                                * we are OK with that.  The subsequent 
connections will still
+                                * have a chance to set "dathasloginevt" to 
false.
                                 */
-                               heap_inplace_update(pg_db, tuple);
+                               systable_inplace_update_finish(state, tuple);
                        }
+                       else
+                               systable_inplace_update_cancel(state);
                        table_close(pg_db, RowExclusiveLock);
                        heap_freetuple(tuple);
                }
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 7d8e9d2..9304b8c 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1402,7 +1402,9 @@ vac_update_relstats(Relation relation,
 {
        Oid                     relid = RelationGetRelid(relation);
        Relation        rd;
+       ScanKeyData key[1];
        HeapTuple       ctup;
+       void       *inplace_state;
        Form_pg_class pgcform;
        bool            dirty,
                                futurexid,
@@ -1413,7 +1415,12 @@ vac_update_relstats(Relation relation,
        rd = table_open(RelationRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
-       ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       systable_inplace_update_begin(rd, ClassOidIndexId, true,
+                                                                 NULL, 1, key, 
&ctup, &inplace_state);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during 
vacuuming",
                         relid);
@@ -1521,7 +1528,9 @@ vac_update_relstats(Relation relation,
 
        /* If anything changed, write out the tuple. */
        if (dirty)
-               heap_inplace_update(rd, ctup);
+               systable_inplace_update_finish(inplace_state, ctup);
+       else
+               systable_inplace_update_cancel(inplace_state);
 
        table_close(rd, RowExclusiveLock);
 
@@ -1573,6 +1582,7 @@ vac_update_datfrozenxid(void)
        bool            bogus = false;
        bool            dirty = false;
        ScanKeyData key[1];
+       void       *inplace_state;
 
        /*
         * Restrict this task to one backend per database.  This avoids race
@@ -1696,20 +1706,18 @@ vac_update_datfrozenxid(void)
        relation = table_open(DatabaseRelationId, RowExclusiveLock);
 
        /*
-        * Get the pg_database tuple to scribble on.  Note that this does not
-        * directly rely on the syscache to avoid issues with flattened toast
-        * values for the in-place update.
+        * Fetch a copy of the tuple to scribble on.  We could check the 
syscache
+        * tuple first.  If that concluded !dirty, we'd avoid waiting on
+        * concurrent heap_update() and would avoid exclusive-locking the 
buffer.
+        * For now, don't optimize that.
         */
        ScanKeyInit(&key[0],
                                Anum_pg_database_oid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(MyDatabaseId));
 
-       scan = systable_beginscan(relation, DatabaseOidIndexId, true,
-                                                         NULL, 1, key);
-       tuple = systable_getnext(scan);
-       tuple = heap_copytuple(tuple);
-       systable_endscan(scan);
+       systable_inplace_update_begin(relation, DatabaseOidIndexId, true,
+                                                                 NULL, 1, key, 
&tuple, &inplace_state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for database %u", 
MyDatabaseId);
@@ -1743,7 +1751,9 @@ vac_update_datfrozenxid(void)
                newMinMulti = dbform->datminmxid;
 
        if (dirty)
-               heap_inplace_update(relation, tuple);
+               systable_inplace_update_finish(inplace_state, tuple);
+       else
+               systable_inplace_update_cancel(inplace_state);
 
        heap_freetuple(tuple);
        table_close(relation, RowExclusiveLock);
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index fdcfbe8..c25f5d1 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -233,5 +233,14 @@ extern SysScanDesc systable_beginscan_ordered(Relation 
heapRelation,
 extern HeapTuple systable_getnext_ordered(SysScanDesc sysscan,
                                                                                
  ScanDirection direction);
 extern void systable_endscan_ordered(SysScanDesc sysscan);
+extern void systable_inplace_update_begin(Relation relation,
+                                                                               
  Oid indexId,
+                                                                               
  bool indexOK,
+                                                                               
  Snapshot snapshot,
+                                                                               
  int nkeys, const ScanKeyData *key,
+                                                                               
  HeapTuple *oldtupcopy,
+                                                                               
  void **state);
+extern void systable_inplace_update_finish(void *state, HeapTuple tuple);
+extern void systable_inplace_update_cancel(void *state);
 
 #endif                                                 /* GENAM_H */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec8..85ad32a 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -336,7 +336,13 @@ extern TM_Result heap_lock_tuple(Relation relation, 
HeapTuple tuple,
                                                                 bool 
follow_updates,
                                                                 Buffer 
*buffer, struct TM_FailureData *tmfd);
 
-extern void heap_inplace_update(Relation relation, HeapTuple tuple);
+extern bool heap_inplace_lock(Relation relation,
+                                                         HeapTuple oldtup_ptr, 
Buffer buffer);
+extern void heap_inplace_update(Relation relation,
+                                                               HeapTuple 
oldtup, HeapTuple tuple,
+                                                               Buffer buffer);
+extern void heap_inplace_unlock(Relation relation,
+                                                               HeapTuple 
oldtup, Buffer buffer);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
                                                                          const 
struct VacuumCutoffs *cutoffs,
                                                                          
HeapPageFreeze *pagefrz,
diff --git a/src/test/isolation/expected/intra-grant-inplace-db.out 
b/src/test/isolation/expected/intra-grant-inplace-db.out
index 432ece5..a91402c 100644
--- a/src/test/isolation/expected/intra-grant-inplace-db.out
+++ b/src/test/isolation/expected/intra-grant-inplace-db.out
@@ -9,20 +9,20 @@ step b1: BEGIN;
 step grant1: 
        GRANT TEMP ON DATABASE isolation_regression TO regress_temp_grantee;
 
-step vac2: VACUUM (FREEZE);
+step vac2: VACUUM (FREEZE); <waiting ...>
 step snap3: 
        INSERT INTO frozen_witness
        SELECT datfrozenxid FROM pg_database WHERE datname = current_catalog;
 
 step c1: COMMIT;
+step vac2: <... completed>
 step cmp3: 
        SELECT 'datfrozenxid retreated'
        FROM pg_database
        WHERE datname = current_catalog
                AND age(datfrozenxid) > (SELECT min(age(x)) FROM 
frozen_witness);
 
-?column?              
-----------------------
-datfrozenxid retreated
-(1 row)
+?column?
+--------
+(0 rows)
 
diff --git a/src/test/isolation/expected/intra-grant-inplace.out 
b/src/test/isolation/expected/intra-grant-inplace.out
index cc1e47a..fe26984 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -14,15 +14,16 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
@@ -58,8 +59,9 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
+step addk2: <... completed>
 
 starting permutation: b2 sfnku2 addk2 c2
 step b2: BEGIN;
@@ -98,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
        SELECT relhasindex FROM pg_class
@@ -122,17 +124,19 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
diff --git a/src/test/isolation/specs/intra-grant-inplace-db.spec 
b/src/test/isolation/specs/intra-grant-inplace-db.spec
index bbecd5d..9de40ec 100644
--- a/src/test/isolation/specs/intra-grant-inplace-db.spec
+++ b/src/test/isolation/specs/intra-grant-inplace-db.spec
@@ -42,5 +42,4 @@ step cmp3     {
 }
 
 
-# XXX extant bug
 permutation snap3 b1 grant1 vac2(c1) snap3 c1 cmp3
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec 
b/src/test/isolation/specs/intra-grant-inplace.spec
index 3cd696b..d07ed3b 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -48,6 +48,7 @@ step sfu3     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
+step as3       { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3        { ROLLBACK; }
 
 # Additional heap_update()
@@ -73,7 +74,7 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned post-bugfix behavior
+# XXX extant bugs: permutation comments refer to planned future LockTuple()
 
 permutation
        b1
@@ -117,6 +118,7 @@ permutation
        b1
        grant1(r3)      # acquire LockTuple(), await sfu3 xmax
        read2
+       as3                     # XXX temporary until patch adds locking to 
addk2
        addk2(c1)       # block in LockTuple() behind grant1
        r3                      # unblock grant1; addk2 now awaits grant1 xmax
        c1
diff --git a/src/test/modules/injection_points/expected/inplace.out 
b/src/test/modules/injection_points/expected/inplace.out
index 123f45a..db7dab6 100644
--- a/src/test/modules/injection_points/expected/inplace.out
+++ b/src/test/modules/injection_points/expected/inplace.out
@@ -40,4 +40,301 @@ step read1:
        SELECT reltuples = -1 AS reltuples_unknown
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 
-ERROR:  could not create unique index "pg_class_oid_index"
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 grant2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: vac1 begin2 grant2 revoke2 mkrels3 c2 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step c2: COMMIT;
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 grant2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/inplace.spec 
b/src/test/modules/injection_points/specs/inplace.spec
index e957713..86539a5 100644
--- a/src/test/modules/injection_points/specs/inplace.spec
+++ b/src/test/modules/injection_points/specs/inplace.spec
@@ -32,12 +32,9 @@ setup
        CREATE TABLE vactest.orig50 ();
        SELECT vactest.mkrels('orig', 51, 100);
 }
-
-# XXX DROP causes an assertion failure; adopt DROP once fixed
 teardown
 {
-       --DROP SCHEMA vactest CASCADE;
-       DO $$BEGIN EXECUTE 'ALTER SCHEMA vactest RENAME TO schema' || oid FROM 
pg_namespace where nspname = 'vactest'; END$$;
+       DROP SCHEMA vactest CASCADE;
        DROP EXTENSION injection_points;
 }
 
@@ -56,11 +53,13 @@ step read1  {
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 }
 
-
 # Transactional updates of the tuple vac1 is waiting to inplace-update.
 session s2
 step grant2            { GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC; }
-
+step revoke2   { REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC; }
+step begin2            { BEGIN; }
+step c2                        { COMMIT; }
+step r2                        { ROLLBACK; }
 
 # Non-blocking actions.
 session s3
@@ -74,10 +73,69 @@ step mkrels3        {
 }
 
 
-# XXX extant bug
+# target gains a successor at the last moment
 permutation
        vac1(mkrels3)   # reads pg_class tuple T0 for vactest.orig50, xmax 
invalid
        grant2                  # T0 becomes eligible for pruning, T1 is 
successor
        vac3                    # T0 becomes LP_UNUSED
-       mkrels3                 # T0 reused; vac1 wakes and overwrites the 
reused T0
+       mkrels3                 # vac1 wakes, scans to T1
        read1
+
+# target already has a successor, which commits
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       c2                              # T0 becomes eligible for pruning
+       vac3                    # T0 becomes LP_UNUSED
+       mkrels3                 # vac1 wakes, scans to T1
+       read1
+
+# target already has a successor, which becomes LP_UNUSED at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       r2                              # T1 becomes eligible for pruning
+       vac3                    # T1 becomes LP_UNUSED
+       mkrels3                 # reuse T1; vac1 scans to T0
+       read1
+
+# target already has a successor, which becomes LP_REDIRECT at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       c2
+       revoke2                 # HOT update to T2
+       grant2                  # HOT update to T3
+       vac3                    # T1 becomes LP_REDIRECT
+       mkrels3                 # reuse T2; vac1 scans to T3
+       read1
+
+# waiting for updater to end
+permutation
+       vac1(c2)                # reads pg_class tuple T0 for vactest.orig50, 
xmax invalid
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       revoke2                 # HOT update to T2
+       mkrels3                 # vac1 awakes briefly, then waits for s2
+       c2
+       read1
+
+# Another LP_UNUSED.  This time, do change the live tuple.  Final live tuple
+# body is identical to original, at a different TID.
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       r2                              # T1 becomes eligible for pruning
+       grant2                  # T0.t_ctid = T2; T0 becomes eligible for 
pruning
+       revoke2                 # T2.t_ctid = T3; T2 becomes eligible for 
pruning
+       vac3                    # T0, T1 & T2 become LP_UNUSED
+       mkrels3                 # reuse T0, T1 & T2; vac1 scans to T3
+       read1
+
+# Another LP_REDIRECT.  Compared to the earlier test, omit the last grant2.
+# Hence, final live tuple body is identical to original, at a different TID.
+permutation begin2 grant2 vac1(mkrels3) c2 revoke2 vac3 mkrels3 read1
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Make heap_update() callers wait for inplace update.
    
    The previous commit fixed some ways of losing an inplace update.  It
    remained possible to lose one when a backend working toward a
    heap_update() copied a tuple into memory just before inplace update of
    that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
    to govern admission to the steps of copying an old tuple, modifying it,
    and issuing heap_update().  This includes UPDATE and MERGE commands.  To
    avoid changing most of the pg_class DDL, don't require LOCKTAG_TUPLE
    when holding a relation lock sufficient to exclude inplace updaters.
    Back-patch to v12 (all supported versions).
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20231027214946.79.nmi...@google.com

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index ddb2def..95828ce 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -154,6 +154,48 @@ The following infomask bits are applicable:
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
 
+Locking to write inplace-updated tables
+---------------------------------------
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives heap_inplace_update_scan() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class heap_inplace_update_scan() callers: before the call, acquire a lock
+  on the relation in mode ShareUpdateExclusiveLock or stricter.  If the update
+  targets a row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX), that
+  lock must be on the table.  Locking the index rel is not necessary.  (This
+  allows VACUUM to overwrite per-index pg_class while holding a lock on the
+  table alone.) heap_inplace_update_scan() acquires and releases LOCKTAG_TUPLE
+  in InplaceUpdateTupleLock, an alias for ExclusiveLock, on each tuple it
+  overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
+  ShareRowExclusiveLock or stricter on the relation.
+
+  SearchSysCacheLocked1() is one convenient way to acquire the tuple lock.
+  Most heap_update() callers already hold a suitable lock on the relation for
+  other reasons and can skip the tuple lock.  If you do acquire the tuple
+  lock, release it immediately after the update.
+
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
 Reading inplace-updated columns
 -------------------------------
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 24f7e62..7de60c1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -51,6 +51,8 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -75,6 +77,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer 
oldbuf,
                                                                  Buffer 
newbuf, HeapTuple oldtup,
                                                                  HeapTuple 
newtup, HeapTuple old_key_tuple,
                                                                  bool 
all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                               
                 ItemPointer otid,
+                                                                               
                 HeapTuple newtup);
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                                                                
   Bitmapset *interesting_cols,
                                                                                
   Bitmapset *external_cols,
@@ -121,6 +129,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, 
HeapTuple tp, bool ke
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3207,6 +3217,10 @@ heap_update(Relation relation, ItemPointer otid, 
HeapTuple newtup,
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel 
operation")));
 
+#ifdef USE_ASSERT_CHECKING
+       check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
        /*
         * Fetch the list of attributes to be checked for various operations.
         *
@@ -4071,6 +4085,128 @@ l2:
        return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                        
ItemPointer otid,
+                                                                        
HeapTuple newtup)
+{
+       /* LOCKTAG_TUPLE acceptable for any catalog */
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+               case DatabaseRelationId:
+                       {
+                               LOCKTAG         tuptag;
+
+                               SET_LOCKTAG_TUPLE(tuptag,
+                                                                 
relation->rd_lockInfo.lockRelId.dbId,
+                                                                 
relation->rd_lockInfo.lockRelId.relId,
+                                                                 
ItemPointerGetBlockNumber(otid),
+                                                                 
ItemPointerGetOffsetNumber(otid));
+                               if (LockHeldByMe(&tuptag, 
InplaceUpdateTupleLock, false))
+                                       return;
+                       }
+                       break;
+               default:
+                       Assert(!IsInplaceUpdateRelation(relation));
+                       return;
+       }
+
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+                       {
+                               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+                               Form_pg_class classForm = (Form_pg_class) 
GETSTRUCT(newtup);
+                               Oid                     relid = classForm->oid;
+                               Oid                     dbid;
+                               LOCKTAG         tag;
+
+                               if (IsSharedRelation(relid))
+                                       dbid = InvalidOid;
+                               else
+                                       dbid = MyDatabaseId;
+
+                               if (classForm->relkind == RELKIND_INDEX)
+                               {
+                                       Relation        irel = 
index_open(relid, AccessShareLock);
+
+                                       SET_LOCKTAG_RELATION(tag, dbid, 
irel->rd_index->indrelid);
+                                       index_close(irel, AccessShareLock);
+                               }
+                               else
+                                       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+                               if (!LockHeldByMe(&tag, 
ShareUpdateExclusiveLock, false) &&
+                                       !LockHeldByMe(&tag, 
ShareRowExclusiveLock, true))
+                                       elog(WARNING,
+                                                "missing lock for relation 
\"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                                                NameStr(classForm->relname),
+                                                relid,
+                                                classForm->relkind,
+                                                
ItemPointerGetBlockNumber(otid),
+                                                
ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+               case DatabaseRelationId:
+                       {
+                               /* LOCKTAG_TUPLE required */
+                               Form_pg_database dbForm = (Form_pg_database) 
GETSTRUCT(newtup);
+
+                               elog(WARNING,
+                                        "missing lock on database \"%s\" (OID 
%u) @ TID (%u,%u)",
+                                        NameStr(dbForm->datname),
+                                        dbForm->oid,
+                                        ItemPointerGetBlockNumber(otid),
+                                        ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+       }
+}
+
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+       Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+       Oid                     relid = classForm->oid;
+       Oid                     dbid;
+       LOCKTAG         tag;
+
+       if (IsSharedRelation(relid))
+               dbid = InvalidOid;
+       else
+               dbid = MyDatabaseId;
+
+       if (classForm->relkind == RELKIND_INDEX)
+       {
+               Relation        irel = index_open(relid, AccessShareLock);
+
+               SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+               index_close(irel, AccessShareLock);
+       }
+       else
+               SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+       if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
+               elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) 
@ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(&oldtup->t_self),
+                        ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6087,15 +6223,21 @@ heap_inplace_lock(Relation relation,
        TM_Result       result;
        bool            ret;
 
+#ifdef USE_ASSERT_CHECKING
+       if (RelationGetRelid(relation) == RelationRelationId)
+               check_inplace_rel_lock(oldtup_ptr);
+#endif
+
        Assert(BufferIsValid(buffer));
 
+       LockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
        /*----------
         * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
         *
         * - wait unconditionally
-        * - no tuple locks
+        * - already locked tuple above, since inplace needs that 
unconditionally
         * - don't recheck header after wait: simpler to defer to next iteration
         * - don't try to continue even if the updater aborts: likewise
         * - no crosscheck
@@ -6179,7 +6321,10 @@ heap_inplace_lock(Relation relation,
         * don't bother optimizing that.
         */
        if (!ret)
+       {
+               UnlockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
                InvalidateCatalogSnapshot();
+       }
        return ret;
 }
 
@@ -6188,6 +6333,8 @@ heap_inplace_lock(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update(Relation relation,
@@ -6271,6 +6418,7 @@ heap_inplace_unlock(Relation relation,
                                        HeapTuple oldtup, Buffer buffer)
 {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
 }
 
 #define                FRM_NOOP                                0x0001
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 5f55e8c..a3f2dc2 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -755,7 +755,9 @@ systable_endscan_ordered(SysScanDesc sysscan)
  *
  * Overwriting violates both MVCC and transactional safety, so the uses of
  * this function in Postgres are extremely limited.  Nonetheless we find some
- * places to use it.  Standard flow:
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
  * ... [any slow preparation not requiring oldtup] ...
  * systable_inplace_update_begin([...], &tup, &inplace_state);
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index a44ccee..bc0e259 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -75,6 +75,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1848,7 +1849,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                HeapTuple       tuple;
                ListCell   *cell_colprivs;
 
-               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+               tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", 
relOid);
                pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2060,6 +2061,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                                                                
 values, nulls, replaces);
 
                        CatalogTupleUpdate(relation, &newtuple->t_self, 
newtuple);
+                       UnlockTuple(relation, &tuple->t_self, 
InplaceUpdateTupleLock);
 
                        /* Update initial privileges for extensions */
                        recordExtensionInitPriv(relOid, RelationRelationId, 0, 
new_acl);
@@ -2072,6 +2074,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
                        pfree(new_acl);
                }
+               else
+                       UnlockTuple(relation, &tuple->t_self, 
InplaceUpdateTupleLock);
 
                /*
                 * Handle column-level privileges, if any were specified or 
implied.
@@ -2185,7 +2189,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, 
AclMode default_privs,
                Oid                *oldmembers;
                Oid                *newmembers;
 
-               tuple = SearchSysCache1(cacheid, ObjectIdGetDatum(objectid));
+               tuple = SearchSysCacheLocked1(cacheid, 
ObjectIdGetDatum(objectid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for %s %u", 
get_object_class_descr(classid), objectid);
 
@@ -2261,6 +2265,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, 
AclMode default_privs,
                                                                         nulls, 
replaces);
 
                CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+               UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /* Update initial privileges for extensions */
                recordExtensionInitPriv(objectid, classid, 0, new_acl);
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 6c39434..8aefbcd 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -138,6 +138,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *             True iff core code performs inplace updates on the relation.
+ *
+ *             This is used for assertions and for making the executor follow 
the
+ *             locking protocol described at README.tuplock section "Locking 
to write
+ *             inplace-updated tables".  Extensions may inplace-update other 
heap
+ *             tables, but concurrent SQL UPDATE on the same table may 
overwrite
+ *             those modifications.
+ *
+ *             The executor can assume these are not partitions or partitioned 
and
+ *             have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index 86a08d7..2987ce9 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1877,6 +1877,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
        Oid                     db_id;
        HeapTuple       newtup;
+       ItemPointerData otid;
        Relation        rel;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1948,11 +1949,13 @@ RenameDatabase(const char *oldname, const char *newname)
                                 errdetail_busy_db(notherbackends, 
npreparedxacts)));
 
        /* rename */
-       newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+       newtup = SearchSysCacheLockedCopy1(DATABASEOID, 
ObjectIdGetDatum(db_id));
        if (!HeapTupleIsValid(newtup))
                elog(ERROR, "cache lookup failed for database %u", db_id);
+       otid = newtup->t_self;
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-       CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+       CatalogTupleUpdate(rel, &otid, newtup);
+       UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2201,6 +2204,7 @@ movedb(const char *dbname, const char *tblspcname)
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                                         errmsg("database \"%s\" does not 
exist", dbname)));
+               LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                new_record[Anum_pg_database_dattablespace - 1] = 
ObjectIdGetDatum(dst_tblspcoid);
                new_record_repl[Anum_pg_database_dattablespace - 1] = true;
@@ -2209,6 +2213,7 @@ movedb(const char *dbname, const char *tblspcname)
                                                                         
new_record,
                                                                         
new_record_nulls, new_record_repl);
                CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+               UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2439,6 +2444,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt 
*stmt, bool isTopLevel)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", 
stmt->dbname)));
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datform = (Form_pg_database) GETSTRUCT(tuple);
        dboid = datform->oid;
@@ -2488,6 +2494,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt 
*stmt, bool isTopLevel)
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                                                 
new_record_nulls, new_record_repl);
        CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2537,6 +2544,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
        if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                                           stmt->dbname);
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datum = heap_getattr(tuple, Anum_pg_database_datcollversion, 
RelationGetDescr(rel), &isnull);
        oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2565,6 +2573,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
                bool            nulls[Natts_pg_database] = {0};
                bool            replaces[Natts_pg_database] = {0};
                Datum           values[Natts_pg_database] = {0};
+               HeapTuple       newtuple;
 
                ereport(NOTICE,
                                (errmsg("changing version from %s to %s",
@@ -2573,14 +2582,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
                values[Anum_pg_database_datcollversion - 1] = 
CStringGetTextDatum(newversion);
                replaces[Anum_pg_database_datcollversion - 1] = true;
 
-               tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                                                 values, 
nulls, replaces);
-               CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-               heap_freetuple(tuple);
+               newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                                                        
values, nulls, replaces);
+               CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+               heap_freetuple(newtuple);
        }
        else
                ereport(NOTICE,
                                (errmsg("version has not changed")));
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2692,6 +2702,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                                        
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to change 
owner of database")));
 
+               LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
                repl_repl[Anum_pg_database_datdba - 1] = true;
                repl_val[Anum_pg_database_datdba - 1] = 
ObjectIdGetDatum(newOwnerId);
 
@@ -2713,6 +2725,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
                newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), 
repl_val, repl_null, repl_repl);
                CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+               UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
                heap_freetuple(newtuple);
 
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 55baf10..05a6de6 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -388,6 +388,7 @@ SetDatabaseHasLoginEventTriggers(void)
        /* Set dathasloginevt flag in pg_database */
        Form_pg_database db;
        Relation        pg_db = table_open(DatabaseRelationId, 
RowExclusiveLock);
+       ItemPointerData otid;
        HeapTuple       tuple;
 
        /*
@@ -399,16 +400,18 @@ SetDatabaseHasLoginEventTriggers(void)
         */
        LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, 
AccessExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
+       tuple = SearchSysCacheLockedCopy1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for database %u", 
MyDatabaseId);
+       otid = tuple->t_self;
        db = (Form_pg_database) GETSTRUCT(tuple);
        if (!db->dathasloginevt)
        {
                db->dathasloginevt = true;
-               CatalogTupleUpdate(pg_db, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_db, &otid, tuple);
                CommandCounterIncrement();
        }
+       UnlockTuple(pg_db, &otid, InplaceUpdateTupleLock);
        table_close(pg_db, RowExclusiveLock);
        heap_freetuple(tuple);
 }
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index c5a56c7..6b22a88 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -4413,14 +4413,17 @@ update_relispartition(Oid relationId, bool newval)
 {
        HeapTuple       tup;
        Relation        classRel;
+       ItemPointerData otid;
 
        classRel = table_open(RelationRelationId, RowExclusiveLock);
-       tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+       tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relationId);
+       otid = tup->t_self;
        Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
        ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-       CatalogTupleUpdate(classRel, &tup->t_self, tup);
+       CatalogTupleUpdate(classRel, &otid, tup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tup);
        table_close(classRel, RowExclusiveLock);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 52ce6b0..03278f6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -3590,6 +3590,7 @@ SetRelationTableSpace(Relation rel,
 {
        Relation        pg_class;
        HeapTuple       tuple;
+       ItemPointerData otid;
        Form_pg_class rd_rel;
        Oid                     reloid = RelationGetRelid(rel);
 
@@ -3598,9 +3599,10 @@ SetRelationTableSpace(Relation rel,
        /* Get a modifiable copy of the relation's pg_class row. */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+       tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", reloid);
+       otid = tuple->t_self;
        rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
        /* Update the pg_class row. */
@@ -3608,7 +3610,8 @@ SetRelationTableSpace(Relation rel,
                InvalidOid : newTableSpaceId;
        if (RelFileNumberIsValid(newRelFilenumber))
                rd_rel->relfilenode = newRelFilenumber;
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
        /*
         * Record dependency on tablespace.  This is only required for relations
@@ -4102,6 +4105,7 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
 {
        Relation        targetrelation;
        Relation        relrelation;    /* for RELATION relation */
+       ItemPointerData otid;
        HeapTuple       reltup;
        Form_pg_class relform;
        Oid                     namespaceId;
@@ -4124,7 +4128,8 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
         */
        relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       otid = reltup->t_self;
        if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
        relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -4151,7 +4156,8 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
         */
        namestrcpy(&(relform->relname), newrelname);
 
-       CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+       CatalogTupleUpdate(relrelation, &otid, reltup);
+       UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                                                 InvalidOid, 
is_internal);
@@ -14926,7 +14932,7 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
 
        /* Fetch heap tuple */
        relid = RelationGetRelid(rel);
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -15030,6 +15036,7 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
                                                                 repl_val, 
repl_null, repl_repl);
 
        CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+       UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -17179,7 +17186,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
        ObjectAddress thisobj;
        bool            already_done = false;
 
-       classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+       /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+       classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(classTup))
                elog(ERROR, "cache lookup failed for relation %u", relOid);
        classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -17198,6 +17206,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
        already_done = object_address_present(&thisobj, objsMoved);
        if (!already_done && oldNspOid != newNspOid)
        {
+               ItemPointerData otid = classTup->t_self;
+
                /* check for duplicate name (more friendly than unique-index 
failure) */
                if (get_relname_relid(NameStr(classForm->relname),
                                                          newNspOid) != 
InvalidOid)
@@ -17210,7 +17220,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
                /* classTup is a copy, so OK to scribble on */
                classForm->relnamespace = newNspOid;
 
-               CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+               CatalogTupleUpdate(classRel, &otid, classTup);
+               UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
                /* Update dependency on schema if caller said so */
                if (hasDependEntry &&
@@ -17222,6 +17234,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
                        elog(ERROR, "could not change schema dependency for 
relation \"%s\"",
                                 NameStr(classForm->relname));
        }
+       else
+               UnlockTuple(classRel, &classTup->t_self, 
InplaceUpdateTupleLock);
        if (!already_done)
        {
                add_exact_object_address(&thisobj, objsMoved);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 29e186f..f880f90 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1204,6 +1204,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_NumIndices = 0;
        resultRelInfo->ri_IndexRelationDescs = NULL;
        resultRelInfo->ri_IndexRelationInfo = NULL;
+       resultRelInfo->ri_needLockTagTuple =
+               IsInplaceUpdateRelation(resultRelationDesc);
        /* make a copy so as not to depend on relcache info not changing... */
        resultRelInfo->ri_TrigDesc = 
CopyTriggerDesc(resultRelationDesc->trigdesc);
        if (resultRelInfo->ri_TrigDesc)
diff --git a/src/backend/executor/execReplication.c 
b/src/backend/executor/execReplication.c
index 1086cbc..54025c9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -661,8 +661,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
        Relation        rel = resultRelInfo->ri_RelationDesc;
        ItemPointer tid = &(searchslot->tts_tid);
 
-       /* For now we support only tables. */
+       /*
+        * We support only non-system tables, with
+        * check_publication_add_relation() accountable.
+        */
        Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+       Assert(!IsCatalogRelation(rel));
 
        CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
diff --git a/src/backend/executor/nodeModifyTable.c 
b/src/backend/executor/nodeModifyTable.c
index 8bf4c80..1161520 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -2324,6 +2324,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo 
*resultRelInfo,
        }
        else
        {
+               ItemPointerData lockedtid;
+
                /*
                 * If we generate a new candidate tuple after EvalPlanQual 
testing, we
                 * must loop back here to try again.  (We don't need to redo 
triggers,
@@ -2332,6 +2334,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo 
*resultRelInfo,
                 * to do them again.)
                 */
 redo_act:
+               lockedtid = *tupleid;
                result = ExecUpdateAct(context, resultRelInfo, tupleid, 
oldtuple, slot,
                                                           canSetTag, 
&updateCxt);
 
@@ -2425,6 +2428,14 @@ redo_act:
                                                                
ExecInitUpdateProjection(context->mtstate,
                                                                                
                                 resultRelInfo);
 
+                                                       if 
(resultRelInfo->ri_needLockTagTuple)
+                                                       {
+                                                               
UnlockTuple(resultRelationDesc,
+                                                                               
        &lockedtid, InplaceUpdateTupleLock);
+                                                               
LockTuple(resultRelationDesc,
+                                                                               
  tupleid, InplaceUpdateTupleLock);
+                                                       }
+
                                                        /* Fetch the most 
recent version of old tuple. */
                                                        oldSlot = 
resultRelInfo->ri_oldTupleSlot;
                                                        if 
(!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2529,6 +2540,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
        TransactionId xmin;
        bool            isnull;
 
+       /*
+        * Parse analysis should have blocked ON CONFLICT for all system
+        * relations, which includes these.  There's no fundamental obstacle to
+        * supporting this; we'd just need to handle LOCKTAG_TUPLE like the 
other
+        * ExecUpdate() caller.
+        */
+       Assert(!resultRelInfo->ri_needLockTagTuple);
+
        /* Determine lock mode to use */
        lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2854,6 +2873,7 @@ ExecMergeMatched(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo,
 {
        ModifyTableState *mtstate = context->mtstate;
        List      **mergeActions = resultRelInfo->ri_MergeActions;
+       ItemPointerData lockedtid;
        List       *actionStates;
        TupleTableSlot *newslot = NULL;
        TupleTableSlot *rslot = NULL;
@@ -2890,14 +2910,32 @@ ExecMergeMatched(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo,
         * target wholerow junk attr.
         */
        Assert(tupleid != NULL || oldtuple != NULL);
+       ItemPointerSetInvalid(&lockedtid);
        if (oldtuple != NULL)
+       {
+               Assert(!resultRelInfo->ri_needLockTagTuple);
                ExecForceStoreHeapTuple(oldtuple, 
resultRelInfo->ri_oldTupleSlot,
                                                                false);
-       else if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
-                                                                               
        tupleid,
-                                                                               
        SnapshotAny,
-                                                                               
        resultRelInfo->ri_oldTupleSlot))
-               elog(ERROR, "failed to fetch the target tuple");
+       }
+       else
+       {
+               if (resultRelInfo->ri_needLockTagTuple)
+               {
+                       /*
+                        * This locks even for CMD_DELETE, for CMD_NOTHING, and 
for tuples
+                        * that don't match mas_whenqual.  MERGE on system 
catalogs is a
+                        * minor use case, so don't bother optimizing those.
+                        */
+                       LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                         InplaceUpdateTupleLock);
+                       lockedtid = *tupleid;
+               }
+               if 
(!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+                                                                               
   tupleid,
+                                                                               
   SnapshotAny,
+                                                                               
   resultRelInfo->ri_oldTupleSlot))
+                       elog(ERROR, "failed to fetch the target tuple");
+       }
 
        /*
         * Test the join condition.  If it's satisfied, perform a MATCHED 
action.
@@ -2969,7 +3007,7 @@ lmerge_matched:
                                                                                
tupleid, NULL, newslot, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
 
                                        break;          /* concurrent 
update/delete */
                                }
@@ -2980,11 +3018,11 @@ lmerge_matched:
                                {
                                        if (!ExecIRUpdateTriggers(estate, 
resultRelInfo,
                                                                                
          oldtuple, newslot))
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
                                }
                                else
                                {
-                                       /* called 
table_tuple_fetch_row_version() above */
+                                       /* checked ri_needLockTagTuple above */
                                        Assert(oldtuple == NULL);
 
                                        result = ExecUpdateAct(context, 
resultRelInfo, tupleid,
@@ -3003,7 +3041,8 @@ lmerge_matched:
                                        if (updateCxt.crossPartUpdate)
                                        {
                                                mtstate->mt_merge_updated += 1;
-                                               return 
context->cpUpdateReturningSlot;
+                                               rslot = 
context->cpUpdateReturningSlot;
+                                               goto out;
                                        }
                                }
 
@@ -3021,7 +3060,7 @@ lmerge_matched:
                                                                                
NULL, NULL, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
 
                                        break;          /* concurrent 
update/delete */
                                }
@@ -3032,11 +3071,11 @@ lmerge_matched:
                                {
                                        if (!ExecIRDeleteTriggers(estate, 
resultRelInfo,
                                                                                
          oldtuple))
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
                                }
                                else
                                {
-                                       /* called 
table_tuple_fetch_row_version() above */
+                                       /* checked ri_needLockTagTuple above */
                                        Assert(oldtuple == NULL);
 
                                        result = ExecDeleteAct(context, 
resultRelInfo, tupleid,
@@ -3118,7 +3157,7 @@ lmerge_matched:
                                 * let caller handle it under NOT MATCHED [BY 
TARGET] clauses.
                                 */
                                *matched = false;
-                               return NULL;
+                               goto out;
 
                        case TM_Updated:
                                {
@@ -3192,7 +3231,7 @@ lmerge_matched:
                                                                 * more to do.
                                                                 */
                                                                if 
(TupIsNull(epqslot))
-                                                                       return 
NULL;
+                                                                       goto 
out;
 
                                                                /*
                                                                 * If we got a 
NULL ctid from the subplan, the
@@ -3210,6 +3249,15 @@ lmerge_matched:
                                                                 * we need to 
switch to the NOT MATCHED BY
                                                                 * SOURCE case.
                                                                 */
+                                                               if 
(resultRelInfo->ri_needLockTagTuple)
+                                                               {
+                                                                       if 
(ItemPointerIsValid(&lockedtid))
+                                                                               
UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                                                               
                        InplaceUpdateTupleLock);
+                                                                       
LockTuple(resultRelInfo->ri_RelationDesc, &context->tmfd.ctid,
+                                                                               
          InplaceUpdateTupleLock);
+                                                                       
lockedtid = context->tmfd.ctid;
+                                                               }
                                                                if 
(!table_tuple_fetch_row_version(resultRelationDesc,
                                                                                
                                                   &context->tmfd.ctid,
                                                                                
                                                   SnapshotAny,
@@ -3238,7 +3286,7 @@ lmerge_matched:
                                                         * MATCHED [BY TARGET] 
actions
                                                         */
                                                        *matched = false;
-                                                       return NULL;
+                                                       goto out;
 
                                                case TM_SelfModified:
 
@@ -3266,13 +3314,13 @@ lmerge_matched:
 
                                                        /* This shouldn't 
happen */
                                                        elog(ERROR, "attempted 
to update or delete invisible tuple");
-                                                       return NULL;
+                                                       goto out;
 
                                                default:
                                                        /* see table_tuple_lock 
call in ExecDelete() */
                                                        elog(ERROR, "unexpected 
table_tuple_lock status: %u",
                                                                 result);
-                                                       return NULL;
+                                                       goto out;
                                        }
                                }
 
@@ -3319,6 +3367,10 @@ lmerge_matched:
        /*
         * Successfully executed an action or no qualifying action was found.
         */
+out:
+       if (ItemPointerIsValid(&lockedtid))
+               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                       InplaceUpdateTupleLock);
        return rslot;
 }
 
@@ -3770,6 +3822,7 @@ ExecModifyTable(PlanState *pstate)
        HeapTupleData oldtupdata;
        HeapTuple       oldtuple;
        ItemPointer tupleid;
+       bool            tuplock;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -4082,6 +4135,8 @@ ExecModifyTable(PlanState *pstate)
                                break;
 
                        case CMD_UPDATE:
+                               tuplock = false;
+
                                /* Initialize projection info if first time for 
this table */
                                if 
(unlikely(!resultRelInfo->ri_projectNewInfoValid))
                                        ExecInitUpdateProjection(node, 
resultRelInfo);
@@ -4093,6 +4148,7 @@ ExecModifyTable(PlanState *pstate)
                                oldSlot = resultRelInfo->ri_oldTupleSlot;
                                if (oldtuple != NULL)
                                {
+                                       
Assert(!resultRelInfo->ri_needLockTagTuple);
                                        /* Use the wholerow junk attr as the 
old tuple. */
                                        ExecForceStoreHeapTuple(oldtuple, 
oldSlot, false);
                                }
@@ -4101,6 +4157,11 @@ ExecModifyTable(PlanState *pstate)
                                        /* Fetch the most recent version of old 
tuple. */
                                        Relation        relation = 
resultRelInfo->ri_RelationDesc;
 
+                                       if (resultRelInfo->ri_needLockTagTuple)
+                                       {
+                                               LockTuple(relation, tupleid, 
InplaceUpdateTupleLock);
+                                               tuplock = true;
+                                       }
                                        if 
(!table_tuple_fetch_row_version(relation, tupleid,
                                                                                
                           SnapshotAny,
                                                                                
                           oldSlot))
@@ -4112,6 +4173,9 @@ ExecModifyTable(PlanState *pstate)
                                /* Now apply the update. */
                                slot = ExecUpdate(&context, resultRelInfo, 
tupleid, oldtuple,
                                                                  slot, 
node->canSetTag);
+                               if (tuplock)
+                                       
UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                                               
InplaceUpdateTupleLock);
                                break;
 
                        case CMD_DELETE:
diff --git a/src/backend/utils/cache/relcache.c 
b/src/backend/utils/cache/relcache.c
index 66ed24e..5abb97c 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3768,6 +3768,7 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
 {
        RelFileNumber newrelfilenumber;
        Relation        pg_class;
+       ItemPointerData otid;
        HeapTuple       tuple;
        Form_pg_class classform;
        MultiXactId minmulti = InvalidMultiXactId;
@@ -3810,11 +3811,12 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
         */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID,
-                                                               
ObjectIdGetDatum(RelationGetRelid(relation)));
+       tuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                         
ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u",
                         RelationGetRelid(relation));
+       otid = tuple->t_self;
        classform = (Form_pg_class) GETSTRUCT(tuple);
 
        /*
@@ -3934,9 +3936,10 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
                classform->relminmxid = minmulti;
                classform->relpersistence = persistence;
 
-               CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_class, &otid, tuple);
        }
 
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tuple);
 
        table_close(pg_class, RowExclusiveLock);
diff --git a/src/backend/utils/cache/syscache.c 
b/src/backend/utils/cache/syscache.c
index 3e03dfc..50c9440 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -30,7 +30,10 @@
 #include "catalog/pg_shseclabel_d.h"
 #include "common/int.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -269,6 +272,98 @@ ReleaseSysCache(HeapTuple tuple)
 }
 
 /*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                                         Datum key1)
+{
+       ItemPointerData tid;
+       LOCKTAG         tag;
+       Oid                     dboid =
+               SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+       Oid                     reloid = cacheinfo[cacheId].reloid;
+
+       /*----------
+        * Since inplace updates may happen just before our LockTuple(), we must
+        * return content acquired after LockTuple() of the TID we return.  If 
we
+        * just fetched twice instead of looping, the following sequence would
+        * defeat our locking:
+        *
+        * GRANT:   SearchSysCache1() = TID (1,5)
+        * GRANT:   LockTuple(pg_class, (1,5))
+        * [no more inplace update of (1,5) until we release the lock]
+        * CLUSTER: SearchSysCache1() = TID (1,5)
+        * CLUSTER: heap_update() = TID (1,8)
+        * CLUSTER: COMMIT
+        * GRANT:   SearchSysCache1() = TID (1,8)
+        * GRANT:   return (1,8) from SearchSysCacheLocked1()
+        * VACUUM:  SearchSysCache1() = TID (1,8)
+        * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one 
rel
+        * VACUUM:  inplace update
+        * GRANT:   heap_update() = (1,9)  # lose inplace update
+        *
+        * In the happy case, this takes two fetches, one to determine the TID 
to
+        * lock and another to get the content and confirm the TID didn't 
change.
+        *
+        * This is valid even if the row gets updated to a new TID, the old TID
+        * becomes LP_UNUSED, and the row gets updated back to its old TID.  
We'd
+        * still hold the right LOCKTAG_TUPLE and a copy of the row captured 
after
+        * the LOCKTAG_TUPLE.
+        */
+       ItemPointerSetInvalid(&tid);
+       for (;;)
+       {
+               HeapTuple       tuple;
+               LOCKMODE        lockmode = InplaceUpdateTupleLock;
+
+               tuple = SearchSysCache1(cacheId, key1);
+               if (ItemPointerIsValid(&tid))
+               {
+                       if (!HeapTupleIsValid(tuple))
+                       {
+                               LockRelease(&tag, lockmode, false);
+                               return tuple;
+                       }
+                       if (ItemPointerEquals(&tid, &tuple->t_self))
+                               return tuple;
+                       LockRelease(&tag, lockmode, false);
+               }
+               else if (!HeapTupleIsValid(tuple))
+                       return tuple;
+
+               tid = tuple->t_self;
+               ReleaseSysCache(tuple);
+               /* like: LockTuple(rel, &tid, lockmode) */
+               SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                                                 
ItemPointerGetBlockNumber(&tid),
+                                                 
ItemPointerGetOffsetNumber(&tid));
+               (void) LockAcquire(&tag, lockmode, false, false);
+
+               /*
+                * If an inplace update just finished, ensure we process the 
syscache
+                * inval.  XXX this is insufficient: the inplace updater may 
not yet
+                * have reached AtEOXact_Inval().  See test at 
inplace-inval.spec.
+                *
+                * If a heap_update() call just released its LOCKTAG_TUPLE, 
we'll
+                * probably find the old tuple and reach "tuple concurrently 
updated".
+                * If that heap_update() aborts, our LOCKTAG_TUPLE blocks 
inplace
+                * updates while our caller works.
+                */
+               AcceptInvalidationMessages();
+       }
+}
+
+/*
  * SearchSysCacheCopy
  *
  * A convenience routine that does SearchSysCache and (if successful)
@@ -295,6 +390,28 @@ SearchSysCacheCopy(int cacheId,
 }
 
 /*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLockedCopy1 with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                                                 Datum key1)
+{
+       HeapTuple       tuple,
+                               newtuple;
+
+       tuple = SearchSysCacheLocked1(cacheId, key1);
+       if (!HeapTupleIsValid(tuple))
+               return tuple;
+       newtuple = heap_copytuple(tuple);
+       ReleaseSysCache(tuple);
+       return newtuple;
+}
+
+/*
  * SearchSysCacheExists
  *
  * A convenience routine that just probes to see if a tuple can be found.
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index af7d8fd..b078a6e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -482,6 +482,9 @@ typedef struct ResultRelInfo
        /* Have the projection and the slots above been initialized? */
        bool            ri_projectNewInfoValid;
 
+       /* updates do LockTuple() before oldtup read; see README.tuplock */
+       bool            ri_needLockTagTuple;
+
        /* triggers to be fired, if any */
        TriggerDesc *ri_TrigDesc;
 
diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h
index 934ba84..810b297 100644
--- a/src/include/storage/lockdefs.h
+++ b/src/include/storage/lockdefs.h
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                            8       /* highest standard 
lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 03a27dd..b541911 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -43,9 +43,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                                                          
Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                                                        Datum 
key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                                                               
   Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                                                 Datum key1, 
Datum key2, Datum key3, Datum key4);
 extern Oid     GetSysCacheOid(int cacheId, AttrNumber oidcol,
diff --git a/src/test/isolation/expected/intra-grant-inplace.out 
b/src/test/isolation/expected/intra-grant-inplace.out
index fe26984..b5fe8b0 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -100,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
        SELECT relhasindex FROM pg_class
@@ -124,7 +124,6 @@ relhasindex
 f          
 (1 row)
 
-step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
@@ -155,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  <waiting ...>
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
        SELECT relhasindex FROM pg_class
@@ -195,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -224,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
diff --git a/src/test/isolation/specs/eval-plan-qual.spec 
b/src/test/isolation/specs/eval-plan-qual.spec
index 3a74406..07307e6 100644
--- a/src/test/isolation/specs/eval-plan-qual.spec
+++ b/src/test/isolation/specs/eval-plan-qual.spec
@@ -194,7 +194,7 @@ step simplepartupdate_noroute {
        update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1      {
        UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec 
b/src/test/isolation/specs/intra-grant-inplace.spec
index d07ed3b..2992c85 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1        { BEGIN; }
 step grant1    {
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1       { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
@@ -48,7 +50,6 @@ step sfu3     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
-step as3       { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3        { ROLLBACK; }
 
 # Additional heap_update()
@@ -74,8 +75,6 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
        b1
        grant1
@@ -118,7 +117,6 @@ permutation
        b1
        grant1(r3)      # acquire LockTuple(), await sfu3 xmax
        read2
-       as3                     # XXX temporary until patch adds locking to 
addk2
        addk2(c1)       # block in LockTuple() behind grant1
        r3                      # unblock grant1; addk2 now awaits grant1 xmax
        c1
@@ -128,8 +126,8 @@ permutation
        b2
        sfnku2
        b1
-       grant1(c2)              # acquire LockTuple(), await sfnku2 xmax
-       addk2                   # block in LockTuple() behind grant1 = deadlock
+       grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+       addk2(*)                # block in LockTuple() behind grant1 = deadlock
        c2
        c1
        read2
@@ -140,7 +138,7 @@ permutation
        grant1
        b3
        sfu3(c1)        # acquire LockTuple(), await grant1 xmax
-       revoke4(sfu3)   # block in LockTuple() behind sfu3
+       revoke4(r3)     # block in LockTuple() behind sfu3
        c1
        r3                      # revoke4 unlocks old tuple and finds new
 
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    For inplace update, send nontransactional invalidations.
    
    The inplace update survives ROLLBACK.  The inval didn't, so another
    backend's DDL could then update the row without incorporating the
    inplace update.  In the test this fixes, a mix of CREATE INDEX and ALTER
    TABLE resulted in a table with an index, yet relhasindex=f.  That is a
    source of index corruption.
    
    Core code no longer needs XLOG_INVALIDATIONS, but this leaves removing
    it for future work.  Extensions could be relying on that mechanism, so
    that removal would not be back-patch material.  Back-patch to v12 (all
    supported versions).
    
    Reviewed by FIXME and (in earlier versions) Andres Freund.
    
    Discussion: https://postgr.es/m/20240523000548.58.nmi...@google.com

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 7de60c1..da6cff4 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6344,6 +6344,9 @@ heap_inplace_update(Relation relation,
        HeapTupleHeader htup = oldtup->t_data;
        uint32          oldlen;
        uint32          newlen;
+       int                     nmsgs = 0;
+       SharedInvalidationMessage *invalMessages = NULL;
+       bool            RelcacheInitFileInval = false;
 
        Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
        oldlen = oldtup->t_len - htup->t_hoff;
@@ -6351,6 +6354,29 @@ heap_inplace_update(Relation relation,
        if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
                elog(ERROR, "wrong tuple length");
 
+       /*
+        * Construct shared cache inval if necessary.  Note that because we only
+        * pass the new version of the tuple, this mustn't be used for any
+        * operations that could change catcache lookup keys.  But we aren't
+        * bothering with index updates either, so that's true a fortiori.
+        */
+       CacheInvalidateHeapTupleInplace(relation, tuple, NULL);
+
+       /* Like RecordTransactionCommit(), log only if needed */
+       if (XLogStandbyInfoActive())
+               nmsgs = inplaceGetInvalidationMessages(&invalMessages,
+                                                                               
           &RelcacheInitFileInval);
+
+       /*
+        * Unlink relcache init files as needed.  If unlinking, acquire
+        * RelCacheInitLock until after associated invalidations.  By doing this
+        * in advance, if we checkpoint and then crash between inplace
+        * XLogInsert() and inval, we don't rely on StartupXLOG() ->
+        * RelationCacheInitFileRemove().  That uses elevel==LOG, so replay 
would
+        * neglect to PANIC on EIO.
+        */
+       PreInplace_Inval();
+
        /* NO EREPORT(ERROR) from here till changes are logged */
        START_CRIT_SECTION();
 
@@ -6380,9 +6406,16 @@ heap_inplace_update(Relation relation,
                XLogRecPtr      recptr;
 
                xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
+               xlrec.dbId = MyDatabaseId;
+               xlrec.tsId = MyDatabaseTableSpace;
+               xlrec.relcacheInitFileInval = RelcacheInitFileInval;
+               xlrec.nmsgs = nmsgs;
 
                XLogBeginInsert();
-               XLogRegisterData((char *) &xlrec, SizeOfHeapInplace);
+               XLogRegisterData((char *) &xlrec, MinSizeOfHeapInplace);
+               if (nmsgs != 0)
+                       XLogRegisterData((char *) invalMessages,
+                                                        nmsgs * 
sizeof(SharedInvalidationMessage));
 
                XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
                XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
@@ -6394,17 +6427,28 @@ heap_inplace_update(Relation relation,
                PageSetLSN(BufferGetPage(buffer), recptr);
        }
 
-       END_CRIT_SECTION();
-
-       heap_inplace_unlock(relation, oldtup, buffer);
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
        /*
-        * Send out shared cache inval if necessary.  Note that because we only
-        * pass the new version of the tuple, this mustn't be used for any
-        * operations that could change catcache lookup keys.  But we aren't
-        * bothering with index updates either, so that's true a fortiori.
+        * Send invalidations to shared queue.  SearchSysCacheLocked1() assumes 
we
+        * do this before UnlockTuple().
         *
-        * XXX ROLLBACK discards the invalidation.  See test inplace-inval.spec.
+        * If we're mutating a tuple visible only to this transaction, there's 
an
+        * equivalent transactional inval from the action that created the 
tuple,
+        * and this inval is superfluous.
+        */
+       AtInplace_Inval();
+
+       END_CRIT_SECTION();
+       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
+
+       AcceptInvalidationMessages();   /* local processing of just-sent inval 
*/
+
+       /*
+        * Queue a transactional inval.  The immediate invalidation we just sent
+        * is the only one known to be necessary.  To reduce risk from the
+        * transition to immediate invalidation, continue sending a 
transactional
+        * invalidation like we've long done.  Third-party code might rely on 
it.
         */
        if (!IsBootstrapProcessingMode())
                CacheInvalidateHeapTuple(relation, tuple, NULL);
@@ -10151,6 +10195,12 @@ heap_xlog_inplace(XLogReaderState *record)
        }
        if (BufferIsValid(buffer))
                UnlockReleaseBuffer(buffer);
+
+       ProcessCommittedInvalidationMessages(xlrec->msgs,
+                                                                               
 xlrec->nmsgs,
+                                                                               
 xlrec->relcacheInitFileInval,
+                                                                               
 xlrec->dbId,
+                                                                               
 xlrec->tsId);
 }
 
 void
diff --git a/src/backend/access/rmgrdesc/heapdesc.c 
b/src/backend/access/rmgrdesc/heapdesc.c
index 5f5673e..f31cc3a 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -16,6 +16,7 @@
 
 #include "access/heapam_xlog.h"
 #include "access/rmgrdesc_utils.h"
+#include "storage/standbydefs.h"
 
 /*
  * NOTE: "keyname" argument cannot have trailing spaces or punctuation
@@ -253,6 +254,9 @@ heap_desc(StringInfo buf, XLogReaderState *record)
                xl_heap_inplace *xlrec = (xl_heap_inplace *) rec;
 
                appendStringInfo(buf, "off: %u", xlrec->offnum);
+               standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+                                                                  xlrec->dbId, 
xlrec->tsId,
+                                                                  
xlrec->relcacheInitFileInval);
        }
 }
 
diff --git a/src/backend/access/rmgrdesc/standbydesc.c 
b/src/backend/access/rmgrdesc/standbydesc.c
index 25f870b..32e509a 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -96,11 +96,7 @@ standby_identify(uint8 info)
        return id;
 }
 
-/*
- * This routine is used by both standby_desc and xact_desc, because
- * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
- * it seems pointless to duplicate the code.
- */
+/* also used by non-standby records having analogous invalidation fields */
 void
 standby_desc_invalidations(StringInfo buf,
                                                   int nmsgs, 
SharedInvalidationMessage *msgs,
diff --git a/src/backend/access/transam/xact.c 
b/src/backend/access/transam/xact.c
index dfc8cf2..7b6e0aa 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1368,14 +1368,23 @@ RecordTransactionCommit(void)
 
                /*
                 * Transactions without an assigned xid can contain invalidation
-                * messages (e.g. explicit relcache invalidations or catcache
-                * invalidations for inplace updates); standbys need to process 
those.
-                * We can't emit a commit record without an xid, and we don't 
want to
-                * force assigning an xid, because that'd be problematic for 
e.g.
-                * vacuum.  Hence we emit a bespoke record for the 
invalidations. We
-                * don't want to use that in case a commit record is emitted, 
so they
-                * happen synchronously with commits (besides not wanting to 
emit more
-                * WAL records).
+                * messages.  While inplace updates formerly did so, they now 
send
+                * immediate invalidations.  Extensions might still do so, and
+                * standbys may need to process those invals.  We can't emit a 
commit
+                * record without an xid, and we don't want to force assigning 
an xid,
+                * because that'd be problematic for e.g. vacuum.  Hence we 
emit a
+                * bespoke record for the invalidations. We don't want to use 
that in
+                * case a commit record is emitted, so they happen 
synchronously with
+                * commits (besides not wanting to emit more WAL records).
+                *
+                * XXX Every known use of this capability is a defect.  Since 
an XID
+                * isn't controlling visibility of the change that prompted 
invals,
+                * other sessions need the inval even if this transactions 
aborts.
+                *
+                * ON COMMIT DELETE ROWS does a nontransactional index_build(), 
which
+                * queues a relcache inval, including in transactions without 
an xid
+                * that had read the (empty) table.  Standbys don't need any ON 
COMMIT
+                * DELETE ROWS invals, but we've not done the work to withhold 
them.
                 */
                if (nmsgs != 0)
                {
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index e4608b9..c3ac8f3 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2890,12 +2890,19 @@ index_update_stats(Relation rel,
        if (dirty)
        {
                systable_inplace_update_finish(state, tuple);
-               /* the above sends a cache inval message */
+               /* the above sends transactional and immediate cache inval 
messages */
        }
        else
        {
                systable_inplace_update_cancel(state);
-               /* no need to change tuple, but force relcache inval anyway */
+
+               /*
+                * While we didn't change relhasindex, CREATE INDEX needs a
+                * transactional inval for when the new index's catalog rows 
become
+                * visible.  Other CREATE INDEX and REINDEX code happens to 
also queue
+                * this inval, but keep this in case rare callers rely on this 
part of
+                * our API contract.
+                */
                CacheInvalidateRelcacheByTuple(tuple);
        }
 
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 05a6de6..a586d24 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -975,11 +975,6 @@ EventTriggerOnLogin(void)
                                 * this instead of regular updates serves two 
purposes. First,
                                 * that avoids possible waiting on the 
row-level lock. Second,
                                 * that avoids dealing with TOAST.
-                                *
-                                * Changes made by inplace update may be lost 
due to
-                                * concurrent normal updates; see 
inplace-inval.spec. However,
-                                * we are OK with that.  The subsequent 
connections will still
-                                * have a chance to set "dathasloginevt" to 
false.
                                 */
                                systable_inplace_update_finish(state, tuple);
                        }
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index d687cee..e73576a 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -509,23 +509,19 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 
                        /*
                         * Inplace updates are only ever performed on catalog 
tuples and
-                        * can, per definition, not change tuple visibility.  
Since we
-                        * don't decode catalog tuples, we're not interested in 
the
-                        * record's contents.
+                        * can, per definition, not change tuple visibility.  
Inplace
+                        * updates don't affect storage or interpretation of 
table rows,
+                        * so they don't affect logicalrep_write_tuple() 
outcomes.  Hence,
+                        * we don't process invalidations from the original 
operation.  If
+                        * inplace updates did affect those things, 
invalidations wouldn't
+                        * make it work, since there are no snapshot-specific 
versions of
+                        * inplace-updated values.  Since we also don't decode 
catalog
+                        * tuples, we're not interested in the record's 
contents.
                         *
-                        * In-place updates can be used either by XID-bearing 
transactions
-                        * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
-                        * transactions (e.g.  VACUUM).  In the former case, 
the commit
-                        * record will include cache invalidations, so we mark 
the
-                        * transaction as catalog modifying here. Currently 
that's
-                        * redundant because the commit will do that as well, 
but once we
-                        * support decoding in-progress relations, this will be 
important.
+                        * WAL contains likely-unnecessary commit-time invals 
from the
+                        * CacheInvalidateHeapTuple() call in 
heap_inplace_update().
+                        * Excess invalidation is safe.
                         */
-                       if (!TransactionIdIsValid(xid))
-                               break;
-
-                       (void) SnapBuildProcessChange(builder, xid, 
buf->origptr);
-                       ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, 
buf->origptr);
                        break;
 
                case XLOG_HEAP_CONFIRM:
diff --git a/src/backend/utils/cache/catcache.c 
b/src/backend/utils/cache/catcache.c
index 492a033..f1e970b 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -2288,7 +2288,8 @@ void
 PrepareToInvalidateCacheTuple(Relation relation,
                                                          HeapTuple tuple,
                                                          HeapTuple newtuple,
-                                                         void (*function) 
(int, uint32, Oid))
+                                                         void (*function) 
(int, uint32, Oid, void *),
+                                                         void *context)
 {
        slist_iter      iter;
        Oid                     reloid;
@@ -2329,7 +2330,7 @@ PrepareToInvalidateCacheTuple(Relation relation,
                hashvalue = CatalogCacheComputeTupleHashValue(ccp, 
ccp->cc_nkeys, tuple);
                dbid = ccp->cc_relisshared ? (Oid) 0 : MyDatabaseId;
 
-               (*function) (ccp->id, hashvalue, dbid);
+               (*function) (ccp->id, hashvalue, dbid, context);
 
                if (newtuple)
                {
@@ -2338,7 +2339,7 @@ PrepareToInvalidateCacheTuple(Relation relation,
                        newhashvalue = CatalogCacheComputeTupleHashValue(ccp, 
ccp->cc_nkeys, newtuple);
 
                        if (newhashvalue != hashvalue)
-                               (*function) (ccp->id, newhashvalue, dbid);
+                               (*function) (ccp->id, newhashvalue, dbid, 
context);
                }
        }
 }
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 603aa41..b3d3adb 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -94,6 +94,10 @@
  *     worth trying to avoid sending such inval traffic in the future, if those
  *     problems can be overcome cheaply.
  *
+ *     When making a nontransactional change to a cacheable object, we must
+ *     likewise send the invalidation immediately, before ending the change's
+ *     critical section.  This includes inplace heap updates, relmap, and smgr.
+ *
  *     When wal_level=logical, write invalidations into WAL at each command 
end to
  *     support the decoding of the in-progress transactions.  See
  *     CommandEndInvalidationMessages.
@@ -130,13 +134,15 @@
 
 /*
  * Pending requests are stored as ready-to-send SharedInvalidationMessages.
- * We keep the messages themselves in arrays in TopTransactionContext
- * (there are separate arrays for catcache and relcache messages).  Control
- * information is kept in a chain of TransInvalidationInfo structs, also
- * allocated in TopTransactionContext.  (We could keep a subtransaction's
- * TransInvalidationInfo in its CurTransactionContext; but that's more
- * wasteful not less so, since in very many scenarios it'd be the only
- * allocation in the subtransaction's CurTransactionContext.)
+ * We keep the messages themselves in arrays in TopTransactionContext (there
+ * are separate arrays for catcache and relcache messages).  For transactional
+ * messages, control information is kept in a chain of TransInvalidationInfo
+ * structs, also allocated in TopTransactionContext.  (We could keep a
+ * subtransaction's TransInvalidationInfo in its CurTransactionContext; but
+ * that's more wasteful not less so, since in very many scenarios it'd be the
+ * only allocation in the subtransaction's CurTransactionContext.)  For
+ * inplace update messages, control information appears in an
+ * InvalidationInfo, allocated in CurrentMemoryContext.
  *
  * We can store the message arrays densely, and yet avoid moving data around
  * within an array, because within any one subtransaction we need only
@@ -147,7 +153,9 @@
  * struct.  Similarly, we need distinguish messages of prior subtransactions
  * from those of the current subtransaction only until the subtransaction
  * completes, after which we adjust the array indexes in the parent's
- * TransInvalidationInfo to include the subtransaction's messages.
+ * TransInvalidationInfo to include the subtransaction's messages.  Inplace
+ * invalidations don't need a concept of command or subtransaction boundaries,
+ * since we send them during the WAL insertion critical section.
  *
  * The ordering of the individual messages within a command's or
  * subtransaction's output is not considered significant, although this
@@ -200,7 +208,7 @@ typedef struct InvalidationMsgsGroup
 
 
 /*----------------
- * Invalidation messages are divided into two groups:
+ * Transactional invalidation messages are divided into two groups:
  *     1) events so far in current command, not yet reflected to caches.
  *     2) events in previous commands of current transaction; these have
  *        been reflected to local caches, and must be either broadcast to
@@ -216,26 +224,36 @@ typedef struct InvalidationMsgsGroup
  *----------------
  */
 
-typedef struct TransInvalidationInfo
+/* fields common to both transactional and inplace invalidation */
+typedef struct InvalidationInfo
 {
-       /* Back link to parent transaction's info */
-       struct TransInvalidationInfo *parent;
-
-       /* Subtransaction nesting depth */
-       int                     my_level;
-
        /* Events emitted by current command */
        InvalidationMsgsGroup CurrentCmdInvalidMsgs;
 
-       /* Events emitted by previous commands of this (sub)transaction */
-       InvalidationMsgsGroup PriorCmdInvalidMsgs;
-
        /* init file must be invalidated? */
        bool            RelcacheInitFileInval;
+} InvalidationInfo;
+
+/* subclass adding fields specific to transactional invalidation */
+typedef struct TransInvalidationInfo
+{
+       /* Base class */
+       struct InvalidationInfo ii;
+
+       /* Events emitted by previous commands of this (sub)transaction */
+       InvalidationMsgsGroup PriorCmdInvalidMsgs;
+
+       /* Back link to parent transaction's info */
+       struct TransInvalidationInfo *parent;
+
+       /* Subtransaction nesting depth */
+       int                     my_level;
 } TransInvalidationInfo;
 
 static TransInvalidationInfo *transInvalInfo = NULL;
 
+static InvalidationInfo *inplaceInvalInfo = NULL;
+
 /* GUC storage */
 int                    debug_discard_caches = 0;
 
@@ -543,9 +561,12 @@ ProcessInvalidationMessagesMulti(InvalidationMsgsGroup 
*group,
 static void
 RegisterCatcacheInvalidation(int cacheId,
                                                         uint32 hashValue,
-                                                        Oid dbId)
+                                                        Oid dbId,
+                                                        void *context)
 {
-       AddCatcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
+       InvalidationInfo *info = (InvalidationInfo *) context;
+
+       AddCatcacheInvalidationMessage(&info->CurrentCmdInvalidMsgs,
                                                                   cacheId, 
hashValue, dbId);
 }
 
@@ -555,10 +576,9 @@ RegisterCatcacheInvalidation(int cacheId,
  * Register an invalidation event for all catcache entries from a catalog.
  */
 static void
-RegisterCatalogInvalidation(Oid dbId, Oid catId)
+RegisterCatalogInvalidation(InvalidationInfo *info, Oid dbId, Oid catId)
 {
-       AddCatalogInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                                 dbId, catId);
+       AddCatalogInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, 
catId);
 }
 
 /*
@@ -567,10 +587,9 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId)
  * As above, but register a relcache invalidation event.
  */
 static void
-RegisterRelcacheInvalidation(Oid dbId, Oid relId)
+RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 {
-       AddRelcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                                  dbId, relId);
+       AddRelcacheInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, 
relId);
 
        /*
         * Most of the time, relcache invalidation is associated with system
@@ -587,7 +606,7 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
         * as well.  Also zap when we are invalidating whole relcache.
         */
        if (relId == InvalidOid || RelationIdIsInInitFile(relId))
-               transInvalInfo->RelcacheInitFileInval = true;
+               info->RelcacheInitFileInval = true;
 }
 
 /*
@@ -597,24 +616,27 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
  * Only needed for catalogs that don't have catcaches.
  */
 static void
-RegisterSnapshotInvalidation(Oid dbId, Oid relId)
+RegisterSnapshotInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 {
-       AddSnapshotInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                                  dbId, relId);
+       AddSnapshotInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, 
relId);
 }
 
 /*
  * PrepareInvalidationState
  *             Initialize inval data for the current (sub)transaction.
  */
-static void
+static InvalidationInfo *
 PrepareInvalidationState(void)
 {
        TransInvalidationInfo *myInfo;
 
+       Assert(IsTransactionState());
+       /* Can't queue transactional message while collecting inplace messages. 
*/
+       Assert(inplaceInvalInfo == NULL);
+
        if (transInvalInfo != NULL &&
                transInvalInfo->my_level == GetCurrentTransactionNestLevel())
-               return;
+               return (InvalidationInfo *) transInvalInfo;
 
        myInfo = (TransInvalidationInfo *)
                MemoryContextAllocZero(TopTransactionContext,
@@ -637,7 +659,7 @@ PrepareInvalidationState(void)
                 * counter.  This is a convenient place to check for that, as 
well as
                 * being important to keep management of the message arrays 
simple.
                 */
-               if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) 
!= 0)
+               if 
(NumMessagesInGroup(&transInvalInfo->ii.CurrentCmdInvalidMsgs) != 0)
                        elog(ERROR, "cannot start a subtransaction when there 
are unprocessed inval messages");
 
                /*
@@ -646,8 +668,8 @@ PrepareInvalidationState(void)
                 * to update them to follow whatever is already in the arrays.
                 */
                SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
-                                                
&transInvalInfo->CurrentCmdInvalidMsgs);
-               SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
+                                                
&transInvalInfo->ii.CurrentCmdInvalidMsgs);
+               SetGroupToFollow(&myInfo->ii.CurrentCmdInvalidMsgs,
                                                 &myInfo->PriorCmdInvalidMsgs);
        }
        else
@@ -663,6 +685,41 @@ PrepareInvalidationState(void)
        }
 
        transInvalInfo = myInfo;
+       return (InvalidationInfo *) myInfo;
+}
+
+/*
+ * PrepareInplaceInvalidationState
+ *             Initialize inval data for an inplace update.
+ *
+ * See previous function for more background.
+ */
+static InvalidationInfo *
+PrepareInplaceInvalidationState(void)
+{
+       InvalidationInfo *myInfo;
+
+       Assert(IsTransactionState());
+       /* limit of one inplace update under assembly */
+       Assert(inplaceInvalInfo == NULL);
+
+       /* gone after WAL insertion CritSection ends, so use current context */
+       myInfo = (InvalidationInfo *) palloc0(sizeof(InvalidationInfo));
+
+       /* Stash our messages past end of the transactional messages, if any. */
+       if (transInvalInfo != NULL)
+               SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
+                                                
&transInvalInfo->ii.CurrentCmdInvalidMsgs);
+       else
+       {
+               InvalMessageArrays[CatCacheMsgs].msgs = NULL;
+               InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
+               InvalMessageArrays[RelCacheMsgs].msgs = NULL;
+               InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
+       }
+
+       inplaceInvalInfo = myInfo;
+       return myInfo;
 }
 
 /* ----------------------------------------------------------------
@@ -902,7 +959,7 @@ 
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
         * after we send the SI messages.  However, we need not do anything 
unless
         * we committed.
         */
-       *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;
+       *RelcacheInitFileInval = transInvalInfo->ii.RelcacheInitFileInval;
 
        /*
         * Collect all the pending messages into a single contiguous array of
@@ -913,7 +970,7 @@ 
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
         * not new ones.
         */
        nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
-               NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
+               NumMessagesInGroup(&transInvalInfo->ii.CurrentCmdInvalidMsgs);
 
        *msgs = msgarray = (SharedInvalidationMessage *)
                MemoryContextAlloc(CurTransactionContext,
@@ -926,7 +983,7 @@ 
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                                                                
msgs,
                                                                                
n * sizeof(SharedInvalidationMessage)),
                                                                 nmsgs += n));
-       ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+       ProcessMessageSubGroupMulti(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
                                                                CatCacheMsgs,
                                                                
(memcpy(msgarray + nmsgs,
                                                                                
msgs,
@@ -938,7 +995,51 @@ 
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                                                                
msgs,
                                                                                
n * sizeof(SharedInvalidationMessage)),
                                                                 nmsgs += n));
-       ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+       ProcessMessageSubGroupMulti(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
+                                                               RelCacheMsgs,
+                                                               
(memcpy(msgarray + nmsgs,
+                                                                               
msgs,
+                                                                               
n * sizeof(SharedInvalidationMessage)),
+                                                                nmsgs += n));
+       Assert(nmsgs == nummsgs);
+
+       return nmsgs;
+}
+
+/*
+ * inplaceGetInvalidationMessages() is called by the inplace update to collect
+ * invalidation messages to add to its WAL record.  Like the previous
+ * function, we might still fail.
+ */
+int
+inplaceGetInvalidationMessages(SharedInvalidationMessage **msgs,
+                                                          bool 
*RelcacheInitFileInval)
+{
+       SharedInvalidationMessage *msgarray;
+       int                     nummsgs;
+       int                     nmsgs;
+
+       /* Quick exit if we haven't done anything with invalidation messages. */
+       if (inplaceInvalInfo == NULL)
+       {
+               *RelcacheInitFileInval = false;
+               *msgs = NULL;
+               return 0;
+       }
+
+       *RelcacheInitFileInval = inplaceInvalInfo->RelcacheInitFileInval;
+       nummsgs = NumMessagesInGroup(&inplaceInvalInfo->CurrentCmdInvalidMsgs);
+       *msgs = msgarray = (SharedInvalidationMessage *)
+               palloc(nummsgs * sizeof(SharedInvalidationMessage));
+
+       nmsgs = 0;
+       ProcessMessageSubGroupMulti(&inplaceInvalInfo->CurrentCmdInvalidMsgs,
+                                                               CatCacheMsgs,
+                                                               
(memcpy(msgarray + nmsgs,
+                                                                               
msgs,
+                                                                               
n * sizeof(SharedInvalidationMessage)),
+                                                                nmsgs += n));
+       ProcessMessageSubGroupMulti(&inplaceInvalInfo->CurrentCmdInvalidMsgs,
                                                                RelCacheMsgs,
                                                                
(memcpy(msgarray + nmsgs,
                                                                                
msgs,
@@ -1038,16 +1139,16 @@ AtEOXact_Inval(bool isCommit)
                 * after we send the SI messages.  However, we need not do 
anything
                 * unless we committed.
                 */
-               if (transInvalInfo->RelcacheInitFileInval)
+               if (transInvalInfo->ii.RelcacheInitFileInval)
                        RelationCacheInitFilePreInvalidate();
 
                AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                                                  
&transInvalInfo->CurrentCmdInvalidMsgs);
+                                                                  
&transInvalInfo->ii.CurrentCmdInvalidMsgs);
 
                
ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
                                                                                
 SendSharedInvalidMessages);
 
-               if (transInvalInfo->RelcacheInitFileInval)
+               if (transInvalInfo->ii.RelcacheInitFileInval)
                        RelationCacheInitFilePostInvalidate();
        }
        else
@@ -1058,6 +1159,45 @@ AtEOXact_Inval(bool isCommit)
 
        /* Need not free anything explicitly */
        transInvalInfo = NULL;
+       inplaceInvalInfo = NULL;
+}
+
+/*
+ * PreInplace_Inval
+ *             Process queued-up invalidation before inplace update critical 
section.
+ *
+ * Tasks belong here if they are safe even if the inplace update does not
+ * complete.  Currently, this just unlinks a cache file, which can fail.  The
+ * sum of this and AtInplace_Inval() mirrors AtEOXact_Inval(isCommit=true).
+ */
+void
+PreInplace_Inval(void)
+{
+       Assert(CritSectionCount == 0);
+
+       if (inplaceInvalInfo && inplaceInvalInfo->RelcacheInitFileInval)
+               RelationCacheInitFilePreInvalidate();
+}
+
+/*
+ * AtInplace_Inval
+ *             Process queued-up invalidations after inplace update buffer 
mutation.
+ */
+void
+AtInplace_Inval(void)
+{
+       Assert(CritSectionCount > 0);
+
+       if (inplaceInvalInfo == NULL)
+               return;
+
+       
ProcessInvalidationMessagesMulti(&inplaceInvalInfo->CurrentCmdInvalidMsgs,
+                                                                        
SendSharedInvalidMessages);
+
+       if (inplaceInvalInfo->RelcacheInitFileInval)
+               RelationCacheInitFilePostInvalidate();
+
+       inplaceInvalInfo = NULL;
 }
 
 /*
@@ -1125,18 +1265,21 @@ AtEOSubXact_Inval(bool isCommit)
                                                                   
&myInfo->PriorCmdInvalidMsgs);
 
                /* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
-               SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
+               SetGroupToFollow(&myInfo->parent->ii.CurrentCmdInvalidMsgs,
                                                 
&myInfo->parent->PriorCmdInvalidMsgs);
 
                /* Pending relcache inval becomes parent's problem too */
-               if (myInfo->RelcacheInitFileInval)
-                       myInfo->parent->RelcacheInitFileInval = true;
+               if (myInfo->ii.RelcacheInitFileInval)
+                       myInfo->parent->ii.RelcacheInitFileInval = true;
 
                /* Pop the transaction state stack */
                transInvalInfo = myInfo->parent;
 
                /* Need not free anything else explicitly */
                pfree(myInfo);
+
+               /* Successful inplace update must clear this. */
+               Assert(inplaceInvalInfo == NULL);
        }
        else
        {
@@ -1148,6 +1291,9 @@ AtEOSubXact_Inval(bool isCommit)
 
                /* Need not free anything else explicitly */
                pfree(myInfo);
+
+               /* Reset from aborted inplace update. */
+               inplaceInvalInfo = NULL;
        }
 }
 
@@ -1177,7 +1323,7 @@ CommandEndInvalidationMessages(void)
        if (transInvalInfo == NULL)
                return;
 
-       ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
+       ProcessInvalidationMessages(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
                                                                
LocalExecuteInvalidationMessage);
 
        /* WAL Log per-command invalidation messages for wal_level=logical */
@@ -1185,26 +1331,21 @@ CommandEndInvalidationMessages(void)
                LogLogicalInvalidations();
 
        AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                                          
&transInvalInfo->CurrentCmdInvalidMsgs);
+                                                          
&transInvalInfo->ii.CurrentCmdInvalidMsgs);
 }
 
 
 /*
- * CacheInvalidateHeapTuple
- *             Register the given tuple for invalidation at end of command
- *             (ie, current command is creating or outdating this tuple).
- *             Also, detect whether a relcache invalidation is implied.
- *
- * For an insert or delete, tuple is the target tuple and newtuple is NULL.
- * For an update, we are called just once, with tuple being the old tuple
- * version and newtuple the new version.  This allows avoidance of duplicate
- * effort during an update.
+ * CacheInvalidateHeapTupleCommon
+ *             Common logic for end-of-command and inplace variants.
  */
-void
-CacheInvalidateHeapTuple(Relation relation,
-                                                HeapTuple tuple,
-                                                HeapTuple newtuple)
+static void
+CacheInvalidateHeapTupleCommon(Relation relation,
+                                                          HeapTuple tuple,
+                                                          HeapTuple newtuple,
+                                                          InvalidationInfo 
*(*prepare_callback) (void))
 {
+       InvalidationInfo *info;
        Oid                     tupleRelId;
        Oid                     databaseId;
        Oid                     relationId;
@@ -1228,11 +1369,8 @@ CacheInvalidateHeapTuple(Relation relation,
        if (IsToastRelation(relation))
                return;
 
-       /*
-        * If we're not prepared to queue invalidation messages for this
-        * subtransaction level, get ready now.
-        */
-       PrepareInvalidationState();
+       /* Allocate any required resources. */
+       info = prepare_callback();
 
        /*
         * First let the catcache do its thing
@@ -1241,11 +1379,12 @@ CacheInvalidateHeapTuple(Relation relation,
        if (RelationInvalidatesSnapshotsOnly(tupleRelId))
        {
                databaseId = IsSharedRelation(tupleRelId) ? InvalidOid : 
MyDatabaseId;
-               RegisterSnapshotInvalidation(databaseId, tupleRelId);
+               RegisterSnapshotInvalidation(info, databaseId, tupleRelId);
        }
        else
                PrepareToInvalidateCacheTuple(relation, tuple, newtuple,
-                                                                         
RegisterCatcacheInvalidation);
+                                                                         
RegisterCatcacheInvalidation,
+                                                                         (void 
*) info);
 
        /*
         * Now, is this tuple one of the primary definers of a relcache entry? 
See
@@ -1318,7 +1457,44 @@ CacheInvalidateHeapTuple(Relation relation,
        /*
         * Yes.  We need to register a relcache invalidation event.
         */
-       RegisterRelcacheInvalidation(databaseId, relationId);
+       RegisterRelcacheInvalidation(info, databaseId, relationId);
+}
+
+/*
+ * CacheInvalidateHeapTuple
+ *             Register the given tuple for invalidation at end of command
+ *             (ie, current command is creating or outdating this tuple) and 
end of
+ *             transaction.  Also, detect whether a relcache invalidation is 
implied.
+ *
+ * For an insert or delete, tuple is the target tuple and newtuple is NULL.
+ * For an update, we are called just once, with tuple being the old tuple
+ * version and newtuple the new version.  This allows avoidance of duplicate
+ * effort during an update.
+ */
+void
+CacheInvalidateHeapTuple(Relation relation,
+                                                HeapTuple tuple,
+                                                HeapTuple newtuple)
+{
+       CacheInvalidateHeapTupleCommon(relation, tuple, newtuple,
+                                                                  
PrepareInvalidationState);
+}
+
+/*
+ * CacheInvalidateHeapTupleInplace
+ *             Register the given tuple for nontransactional invalidation 
pertaining
+ *             to an inplace update.  Also, detect whether a relcache 
invalidation is
+ *             implied.
+ *
+ * Like CacheInvalidateHeapTuple(), but for inplace updates.
+ */
+void
+CacheInvalidateHeapTupleInplace(Relation relation,
+                                                               HeapTuple tuple,
+                                                               HeapTuple 
newtuple)
+{
+       CacheInvalidateHeapTupleCommon(relation, tuple, newtuple,
+                                                                  
PrepareInplaceInvalidationState);
 }
 
 /*
@@ -1337,14 +1513,13 @@ CacheInvalidateCatalog(Oid catalogId)
 {
        Oid                     databaseId;
 
-       PrepareInvalidationState();
-
        if (IsSharedRelation(catalogId))
                databaseId = InvalidOid;
        else
                databaseId = MyDatabaseId;
 
-       RegisterCatalogInvalidation(databaseId, catalogId);
+       RegisterCatalogInvalidation(PrepareInvalidationState(),
+                                                               databaseId, 
catalogId);
 }
 
 /*
@@ -1362,15 +1537,14 @@ CacheInvalidateRelcache(Relation relation)
        Oid                     databaseId;
        Oid                     relationId;
 
-       PrepareInvalidationState();
-
        relationId = RelationGetRelid(relation);
        if (relation->rd_rel->relisshared)
                databaseId = InvalidOid;
        else
                databaseId = MyDatabaseId;
 
-       RegisterRelcacheInvalidation(databaseId, relationId);
+       RegisterRelcacheInvalidation(PrepareInvalidationState(),
+                                                                databaseId, 
relationId);
 }
 
 /*
@@ -1383,9 +1557,8 @@ CacheInvalidateRelcache(Relation relation)
 void
 CacheInvalidateRelcacheAll(void)
 {
-       PrepareInvalidationState();
-
-       RegisterRelcacheInvalidation(InvalidOid, InvalidOid);
+       RegisterRelcacheInvalidation(PrepareInvalidationState(),
+                                                                InvalidOid, 
InvalidOid);
 }
 
 /*
@@ -1399,14 +1572,13 @@ CacheInvalidateRelcacheByTuple(HeapTuple classTuple)
        Oid                     databaseId;
        Oid                     relationId;
 
-       PrepareInvalidationState();
-
        relationId = classtup->oid;
        if (classtup->relisshared)
                databaseId = InvalidOid;
        else
                databaseId = MyDatabaseId;
-       RegisterRelcacheInvalidation(databaseId, relationId);
+       RegisterRelcacheInvalidation(PrepareInvalidationState(),
+                                                                databaseId, 
relationId);
 }
 
 /*
@@ -1420,8 +1592,6 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 {
        HeapTuple       tup;
 
-       PrepareInvalidationState();
-
        tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relid);
@@ -1611,7 +1781,7 @@ LogLogicalInvalidations(void)
        if (transInvalInfo == NULL)
                return;
 
-       group = &transInvalInfo->CurrentCmdInvalidMsgs;
+       group = &transInvalInfo->ii.CurrentCmdInvalidMsgs;
        nmsgs = NumMessagesInGroup(group);
 
        if (nmsgs > 0)
diff --git a/src/backend/utils/cache/syscache.c 
b/src/backend/utils/cache/syscache.c
index 50c9440..f41b1c2 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -351,8 +351,7 @@ SearchSysCacheLocked1(int cacheId,
 
                /*
                 * If an inplace update just finished, ensure we process the 
syscache
-                * inval.  XXX this is insufficient: the inplace updater may 
not yet
-                * have reached AtEOXact_Inval().  See test at 
inplace-inval.spec.
+                * inval.
                 *
                 * If a heap_update() call just released its LOCKTAG_TUPLE, 
we'll
                 * probably find the old tuple and reach "tuple concurrently 
updated".
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 42736f3..4591e9a 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -20,6 +20,7 @@
 #include "storage/buf.h"
 #include "storage/bufpage.h"
 #include "storage/relfilelocator.h"
+#include "storage/sinval.h"
 #include "utils/relcache.h"
 
 
@@ -425,9 +426,14 @@ typedef struct xl_heap_confirm
 typedef struct xl_heap_inplace
 {
        OffsetNumber offnum;            /* updated tuple's offset on page */
+       Oid                     dbId;                   /* MyDatabaseId */
+       Oid                     tsId;                   /* MyDatabaseTableSpace 
*/
+       bool            relcacheInitFileInval;  /* invalidate relcache init 
files */
+       int                     nmsgs;                  /* number of shared 
inval msgs */
+       SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
 } xl_heap_inplace;
 
-#define SizeOfHeapInplace      (offsetof(xl_heap_inplace, offnum) + 
sizeof(OffsetNumber))
+#define MinSizeOfHeapInplace   (offsetof(xl_heap_inplace, nmsgs) + sizeof(int))
 
 /*
  * This is what we need to know about setting a visibility map bit
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 8f5744b..c812237 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -144,6 +144,8 @@ extern void ProcessCatchupInterrupt(void);
 
 extern int     xactGetCommittedInvalidationMessages(SharedInvalidationMessage 
**msgs,
                                                                                
                 bool *RelcacheInitFileInval);
+extern int     inplaceGetInvalidationMessages(SharedInvalidationMessage **msgs,
+                                                                               
   bool *RelcacheInitFileInval);
 extern void ProcessCommittedInvalidationMessages(SharedInvalidationMessage 
*msgs,
                                                                                
                 int nmsgs, bool RelcacheInitFileInval,
                                                                                
                 Oid dbid, Oid tsid);
diff --git a/src/include/utils/catcache.h b/src/include/utils/catcache.h
index 3fb9647..8f04bb8 100644
--- a/src/include/utils/catcache.h
+++ b/src/include/utils/catcache.h
@@ -225,6 +225,7 @@ extern void CatCacheInvalidate(CatCache *cache, uint32 
hashValue);
 extern void PrepareToInvalidateCacheTuple(Relation relation,
                                                                                
  HeapTuple tuple,
                                                                                
  HeapTuple newtuple,
-                                                                               
  void (*function) (int, uint32, Oid));
+                                                                               
  void (*function) (int, uint32, Oid, void *),
+                                                                               
  void *context);
 
 #endif                                                 /* CATCACHE_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 24695fa..3390e7a 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -28,6 +28,9 @@ extern void AcceptInvalidationMessages(void);
 
 extern void AtEOXact_Inval(bool isCommit);
 
+extern void PreInplace_Inval(void);
+extern void AtInplace_Inval(void);
+
 extern void AtEOSubXact_Inval(bool isCommit);
 
 extern void PostPrepare_Inval(void);
@@ -37,6 +40,9 @@ extern void CommandEndInvalidationMessages(void);
 extern void CacheInvalidateHeapTuple(Relation relation,
                                                                         
HeapTuple tuple,
                                                                         
HeapTuple newtuple);
+extern void CacheInvalidateHeapTupleInplace(Relation relation,
+                                                                               
        HeapTuple tuple,
+                                                                               
        HeapTuple newtuple);
 
 extern void CacheInvalidateCatalog(Oid catalogId);
 
diff --git a/src/test/isolation/expected/inplace-inval.out 
b/src/test/isolation/expected/inplace-inval.out
index e68eca5..c35895a 100644
--- a/src/test/isolation/expected/inplace-inval.out
+++ b/src/test/isolation/expected/inplace-inval.out
@@ -1,6 +1,6 @@
 Parsed test spec with 3 sessions
 
-starting permutation: cachefill3 cir1 cic2 ddl3
+starting permutation: cachefill3 cir1 cic2 ddl3 read1
 step cachefill3: TABLE newly_indexed;
 c
 -
@@ -9,6 +9,14 @@ c
 step cir1: BEGIN; CREATE INDEX i1 ON newly_indexed (c); ROLLBACK;
 step cic2: CREATE INDEX i2 ON newly_indexed (c);
 step ddl3: ALTER TABLE newly_indexed ADD extra int;
+step read1: 
+       SELECT relhasindex FROM pg_class WHERE oid = 'newly_indexed'::regclass;
+
+relhasindex
+-----------
+t          
+(1 row)
+
 
 starting permutation: cir1 cic2 ddl3 read1
 step cir1: BEGIN; CREATE INDEX i1 ON newly_indexed (c); ROLLBACK;
diff --git a/src/test/isolation/specs/inplace-inval.spec 
b/src/test/isolation/specs/inplace-inval.spec
index 96954fd..b99112d 100644
--- a/src/test/isolation/specs/inplace-inval.spec
+++ b/src/test/isolation/specs/inplace-inval.spec
@@ -1,7 +1,7 @@
-# If a heap_update() caller retrieves its oldtup from a cache, it's possible
-# for that cache entry to predate an inplace update, causing loss of that
-# inplace update.  This arises because the transaction may abort before
-# sending the inplace invalidation message to the shared queue.
+# An inplace update had been able to abort before sending the inplace
+# invalidation message to the shared queue.  If a heap_update() caller then
+# retrieved its oldtup from a cache, the heap_update() could revert the
+# inplace update.
 
 setup
 {
@@ -27,14 +27,12 @@ step cachefill3     { TABLE newly_indexed; }
 step ddl3              { ALTER TABLE newly_indexed ADD extra int; }
 
 
-# XXX shows an extant bug.  Adding step read1 at the end would usually print
-# relhasindex=f (not wanted).  This does not reach the unwanted behavior under
-# -DCATCACHE_FORCE_RELEASE and friends.
 permutation
        cachefill3      # populates the pg_class row in the catcache
        cir1    # sets relhasindex=true; rollback discards cache inval
        cic2    # sees relhasindex=true, skips changing it (so no inval)
        ddl3    # cached row as the oldtup of an update, losing relhasindex
+       read1   # observe damage
 
 # without cachefill3, no bug
 permutation cir1 cic2 ddl3 read1
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 6d424c8..13dd553 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1256,6 +1256,7 @@ Interval
 IntervalAggState
 IntoClause
 InvalMessageArray
+InvalidationInfo
 InvalidationMsgsGroup
 IpcMemoryId
 IpcMemoryKey

Reply via email to