On Wed, Jul 03, 2024 at 04:09:54PM -0700, Noah Misch wrote:
> On Wed, Jul 03, 2024 at 06:00:00AM +0300, Alexander Lakhin wrote:
> > 29.06.2024 05:42, Noah Misch wrote:
> > > Good point, any effort on (2) would be wasted once the fixes get 
> > > certified.  I
> > > pushed (1).  I'm attaching the rebased fix patches.
> > 
> > Please look at a new anomaly, introduced by inplace110-successors-v5.patch:
> > CREATE TABLE t (i int) PARTITION BY LIST(i);
> > CREATE TABLE p1 (i int);
> > ALTER TABLE t ATTACH PARTITION p1 FOR VALUES IN (1);
> > ALTER TABLE t DETACH PARTITION p1;
> > ANALYZE t;
> > 
> > triggers unexpected
> > ERROR:  tuple to be updated was already modified by an operation triggered 
> > by the current command
> 
> Thanks.  Today, it's okay to issue heap_inplace_update() after heap_update()
> without an intervening CommandCounterIncrement().

Correction: it's not okay today.  If code does that, heap_inplace_update()
mutates a tuple that is going to become invisible at CCI.  The lack of CCI
yields a minor live bug in v14+.  Its consequences seem to be limited to
failing to update reltuples for a partitioned table having zero partitions.

> The patch makes the CCI
> required.  The ANALYZE in your example reaches this with a heap_update to set
> relhassubclass=f.  I've fixed this by just adding a CCI (and adding to the
> tests in vacuum.sql).

That's still the right fix, but I've separated it into its own patch and
expanded the test.  All the non-comment changes between v5 and v6 are now part
of the separate patch.

> The alternative would be to allow inplace updates on TM_SelfModified tuples.
> I can't think of a specific problem with allowing that, but I feel that would
> make system state interactions harder to reason about.  It might be optimal to
> allow that in back branches only, to reduce the chance of releasing a bug like
> the one you found.

Allowing a mutation of a TM_SelfModified tuple is bad, since that tuple is
going to become dead soon.  Mutating its successor could be okay.  Since we'd
expect such code to be unreachable, I'm not keen carry such code.  For that
scenario, I'd rather keep the error you encountered.  Other opinions?
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Don't lose partitioned table reltuples=0 after relhassubclass=f.
    
    ANALYZE sets relhassubclass=f when a partitioned table no longer has
    partitions.  An ANALYZE doing that proceeded to apply the inplace update
    of pg_class.reltuples to the old pg_class tuple instead of the new
    tuple, losing that reltuples=0 change if the ANALYZE committed.
    Non-partitioning inheritance trees were unaffected.  Back-patch to v14,
    where commit 375aed36ad83f0e021e9bdd3a0034c0c992c66dc introduced
    maintenance of partitioned table pg_class.reltuples.
    
    Reviewed by FIXME.  Reported by Alexander Lakhin.
    
    Discussion: 
https://postgr.es/m/a295b499-dcab-6a99-c06e-01cf60593...@gmail.com

diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 7d2cd24..c590a2a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -629,7 +629,11 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
                else
                        relallvisible = 0;
 
-               /* Update pg_class for table relation */
+               /*
+                * Update pg_class for table relation.  CCI first, in case 
acquirefunc
+                * updated pg_class.
+                */
+               CommandCounterIncrement();
                vac_update_relstats(onerel,
                                                        relpages,
                                                        totalrows,
@@ -664,6 +668,7 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
                 * Partitioned tables don't have storage, so we don't set any 
fields
                 * in their pg_class entries except for reltuples and 
relhasindex.
                 */
+               CommandCounterIncrement();
                vac_update_relstats(onerel, -1, totalrows,
                                                        0, hasindex, 
InvalidTransactionId,
                                                        InvalidMultiXactId,
diff --git a/src/test/regress/expected/vacuum.out 
b/src/test/regress/expected/vacuum.out
index 330fcd8..2eba712 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -83,6 +83,53 @@ INSERT INTO vactst SELECT generate_series(301, 400);
 DELETE FROM vactst WHERE i % 5 <> 0; -- delete a few rows inside
 ANALYZE vactst;
 COMMIT;
+-- Test ANALYZE setting relhassubclass=f for non-partitioning inheritance
+BEGIN;
+CREATE TABLE past_inh_parent ();
+CREATE TABLE past_inh_child () INHERITS (past_inh_parent);
+INSERT INTO past_inh_child DEFAULT VALUES;
+INSERT INTO past_inh_child DEFAULT VALUES;
+ANALYZE past_inh_parent;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_inh_parent'::regclass;
+ reltuples | relhassubclass 
+-----------+----------------
+         0 | t
+(1 row)
+
+DROP TABLE past_inh_child;
+ANALYZE past_inh_parent;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_inh_parent'::regclass;
+ reltuples | relhassubclass 
+-----------+----------------
+         0 | f
+(1 row)
+
+COMMIT;
+-- Test ANALYZE setting relhassubclass=f for partitioning
+BEGIN;
+CREATE TABLE past_parted (i int) PARTITION BY LIST(i);
+CREATE TABLE past_part PARTITION OF past_parted FOR VALUES IN (1);
+INSERT INTO past_parted VALUES (1),(1);
+ANALYZE past_parted;
+DROP TABLE past_part;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_parted'::regclass;
+ reltuples | relhassubclass 
+-----------+----------------
+         2 | t
+(1 row)
+
+ANALYZE past_parted;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_parted'::regclass;
+ reltuples | relhassubclass 
+-----------+----------------
+         0 | f
+(1 row)
+
+COMMIT;
 VACUUM FULL pg_am;
 VACUUM FULL pg_class;
 VACUUM FULL pg_database;
diff --git a/src/test/regress/sql/vacuum.sql b/src/test/regress/sql/vacuum.sql
index 0b63ef8..548cd7a 100644
--- a/src/test/regress/sql/vacuum.sql
+++ b/src/test/regress/sql/vacuum.sql
@@ -67,6 +67,35 @@ DELETE FROM vactst WHERE i % 5 <> 0; -- delete a few rows 
inside
 ANALYZE vactst;
 COMMIT;
 
+-- Test ANALYZE setting relhassubclass=f for non-partitioning inheritance
+BEGIN;
+CREATE TABLE past_inh_parent ();
+CREATE TABLE past_inh_child () INHERITS (past_inh_parent);
+INSERT INTO past_inh_child DEFAULT VALUES;
+INSERT INTO past_inh_child DEFAULT VALUES;
+ANALYZE past_inh_parent;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_inh_parent'::regclass;
+DROP TABLE past_inh_child;
+ANALYZE past_inh_parent;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_inh_parent'::regclass;
+COMMIT;
+
+-- Test ANALYZE setting relhassubclass=f for partitioning
+BEGIN;
+CREATE TABLE past_parted (i int) PARTITION BY LIST(i);
+CREATE TABLE past_part PARTITION OF past_parted FOR VALUES IN (1);
+INSERT INTO past_parted VALUES (1),(1);
+ANALYZE past_parted;
+DROP TABLE past_part;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_parted'::regclass;
+ANALYZE past_parted;
+SELECT reltuples, relhassubclass
+  FROM pg_class WHERE oid = 'past_parted'::regclass;
+COMMIT;
+
 VACUUM FULL pg_am;
 VACUUM FULL pg_class;
 VACUUM FULL pg_database;
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Warn if LOCKTAG_TUPLE is held at commit, under debug_assertions.
    
    The current use always releases this locktag.  A planned use will
    continue that intent.  It will involve more areas of code, making unlock
    omissions easier.  Warn under debug_assertions, like we do for various
    resource leaks.  Back-patch to v12 (all supported versions), the plan
    for the commit of the new use.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20240512232923.aa.nmi...@google.com

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 0400a50..461d925 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -2256,6 +2256,11 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
                                locallock->numLockOwners = 0;
                }
 
+#ifdef USE_ASSERT_CHECKING
+               if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_TUPLE && !allLocks)
+                       elog(WARNING, "tuple lock held at commit");
+#endif
+
                /*
                 * If the lock or proclock pointers are NULL, this lock was 
taken via
                 * the relation fast-path (and is not known to have been 
transferred).
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Fix data loss at inplace update after heap_update().
    
    As previously-added tests demonstrated, heap_inplace_update() could
    instead update an unrelated tuple of the same catalog.  It could lose
    the update.  Losing relhasindex=t was a source of index corruption.
    Inplace-updating commands like VACUUM will now wait for heap_update()
    commands like GRANT TABLE and GRANT DATABASE.  That isn't ideal, but a
    long-running GRANT already hurts VACUUM progress more just by keeping an
    XID running.  The VACUUM will behave like a DELETE or UPDATE waiting for
    the uncommitted change.
    
    For implementation details, start at the heap_inplace_update_scan()
    header comment and README.tuplock.  Back-patch to v12 (all supported
    versions).  In back branches, retain a deprecated heap_inplace_update(),
    for extensions.
    
    Reviewed by FIXME and Alexander Lakhin.
    
    Discussion: 
https://postgr.es/m/CAMp+ueZQz3yDk7qg42hk6-9gxniYbp-=bg2mgqecerqr5gg...@mail.gmail.com

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index 6441e8b..dbfa2b7 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -153,3 +153,56 @@ The following infomask bits are applicable:
 
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
+
+Locking to write inplace-updated tables
+---------------------------------------
+
+[This is the plan, but LOCKTAG_TUPLE acquisition is not yet here.]
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives heap_inplace_update_scan() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class heap_inplace_update_scan() callers: before the call, acquire
+  LOCKTAG_RELATION in mode ShareLock (CREATE INDEX), ShareUpdateExclusiveLock
+  (VACUUM), or a mode with strictly more conflicts.  If the update targets a
+  row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX), that lock must be
+  on the table.  Locking the index rel is optional.  (This allows VACUUM to
+  overwrite per-index pg_class while holding a lock on the table alone.)  We
+  could allow weaker locks, in which case the next paragraph would simply call
+  for stronger locks for its class of commands.  heap_inplace_update_scan()
+  acquires and releases LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for
+  ExclusiveLock, on each tuple it overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock that conflicts with at least one of those from the preceding paragraph.
+  SearchSysCacheLocked1() is one convenient way to acquire LOCKTAG_TUPLE.
+  After heap_update(), release any LOCKTAG_TUPLE.  Most of these callers opt
+  to acquire just the LOCKTAG_RELATION.
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
+Reading inplace-updated columns
+-------------------------------
+
+Inplace updates create an exception to the rule that tuple data won't change
+under a reader holding a pin.  A reader of a heap_fetch() result tuple may
+witness a torn read.  Current inplace-updated fields are aligned and are no
+wider than four bytes, and current readers don't need consistency across
+fields.  Hence, they get by with just fetching each field once.  XXX such a
+caller may also read a value that has not reached WAL; see
+heap_inplace_update_finish().
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b2014..faec28a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -76,6 +76,9 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer 
oldbuf,
                                                                  Buffer 
newbuf, HeapTuple oldtup,
                                                                  HeapTuple 
newtup, HeapTuple old_key_tuple,
                                                                  bool 
all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                                                                
   Bitmapset *interesting_cols,
                                                                                
   Bitmapset *external_cols,
@@ -97,6 +100,7 @@ static void compute_new_xmax_infomask(TransactionId xmax, 
uint16 old_infomask,
 static TM_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
                                                                                
 ItemPointer ctid, TransactionId xid,
                                                                                
 LockTupleMode mode);
+static bool inplace_xmax_lock(SysScanDesc scan);
 static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
                                                                   uint16 
*new_infomask2);
 static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
@@ -4072,6 +4076,45 @@ l2:
        return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+       Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+       Oid                     relid = classForm->oid;
+       Oid                     dbid;
+       LOCKTAG         tag;
+
+       if (IsSharedRelation(relid))
+               dbid = InvalidOid;
+       else
+               dbid = MyDatabaseId;
+
+       if (classForm->relkind == RELKIND_INDEX)
+       {
+               Relation        irel = index_open(relid, AccessShareLock);
+
+               SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+               index_close(irel, AccessShareLock);
+       }
+       else
+               SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+       if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
+               elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) 
@ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(&oldtup->t_self),
+                        ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6041,34 +6084,45 @@ heap_abort_speculative(Relation relation, ItemPointer 
tid)
 }
 
 /*
- * heap_inplace_update - update a tuple "in place" (ie, overwrite it)
+ * heap_inplace_update_scan - update a row "in place" (ie, overwrite it)
  *
- * Overwriting violates both MVCC and transactional safety, so the uses
- * of this function in Postgres are extremely limited.  Nonetheless we
- * find some places to use it.
+ * Overwriting violates both MVCC and transactional safety, so the uses of
+ * this function in Postgres are extremely limited.  Nonetheless we find some
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
- * The tuple cannot change size, and therefore it's reasonable to assume
- * that its null bitmap (if any) doesn't change either.  So we just
- * overwrite the data portion of the tuple without touching the null
- * bitmap or any of the header fields.
+ * ... [any slow preparation not requiring oldtup] ...
+ * heap_inplace_update_scan([...], &tup, &inplace_state);
+ * if (!HeapTupleIsValid(tup))
+ *     elog(ERROR, [...]);
+ * ... [buffer is exclusive-locked; mutate "tup"] ...
+ * if (dirty)
+ *     heap_inplace_update_finish(inplace_state, tup);
+ * else
+ *     heap_inplace_update_cancel(inplace_state);
  *
- * tuple is an in-memory tuple structure containing the data to be written
- * over the target tuple.  Also, tuple->t_self identifies the target tuple.
+ * Since this is intended for system catalogs and SERIALIZABLE doesn't cover
+ * DDL, this skips some predicate locks.
  *
- * Note that the tuple updated here had better not come directly from the
- * syscache if the relation has a toast relation as this tuple could
- * include toast values that have been expanded, causing a failure here.
+ * The first several params duplicate the systable_beginscan() param list.
+ * "oldtupcopy" is an output parameter, assigned NULL if the key ceases to
+ * find a live tuple.  (In PROC_IN_VACUUM, that is a low-probability transient
+ * condition.)  If "oldtupcopy" gets non-NULL, you must pass output parameter
+ * "state" to heap_inplace_update_finish() or heap_inplace_update_cancel().
  */
 void
-heap_inplace_update(Relation relation, HeapTuple tuple)
+heap_inplace_update_scan(Relation relation,
+                                                Oid indexId,
+                                                bool indexOK,
+                                                Snapshot snapshot,
+                                                int nkeys, const ScanKeyData 
*key,
+                                                HeapTuple *oldtupcopy, void 
**state)
 {
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-       uint32          oldlen;
-       uint32          newlen;
+       ScanKey         mutable_key = palloc(sizeof(ScanKeyData) * nkeys);
+       int                     retries = 0;
+       SysScanDesc scan;
+       HeapTuple       oldtup;
 
        /*
         * For now, we don't allow parallel updates.  Unlike a regular update,
@@ -6081,21 +6135,70 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel 
operation")));
 
-       INJECTION_POINT("inplace-before-pin");
-       buffer = ReadBuffer(relation, 
ItemPointerGetBlockNumber(&(tuple->t_self)));
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-       page = (Page) BufferGetPage(buffer);
+       /*
+        * Accept a snapshot argument, for symmetry, but this function advances
+        * its snapshot as needed to reach the tail of the updated tuple chain.
+        */
+       Assert(snapshot == NULL);
 
-       offnum = ItemPointerGetOffsetNumber(&(tuple->t_self));
-       if (PageGetMaxOffsetNumber(page) >= offnum)
-               lp = PageGetItemId(page, offnum);
+       Assert(IsInplaceUpdateRelation(relation) || 
!IsSystemRelation(relation));
 
-       if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-               elog(ERROR, "invalid lp");
+       /* Loop for an exclusive-locked buffer of a non-updated tuple. */
+       do
+       {
+               CHECK_FOR_INTERRUPTS();
 
-       htup = (HeapTupleHeader) PageGetItem(page, lp);
+               /*
+                * Processes issuing heap_update (e.g. GRANT) at maximum speed 
could
+                * drive us to this error.  A hostile table owner has stronger 
ways to
+                * damage their own table, so that's minor.
+                */
+               if (retries++ > 10000)
+                       elog(ERROR, "giving up after too many tries to 
overwrite row");
 
-       oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+               memcpy(mutable_key, key, sizeof(ScanKeyData) * nkeys);
+               INJECTION_POINT("inplace-before-pin");
+               scan = systable_beginscan(relation, indexId, indexOK, snapshot,
+                                                                 nkeys, 
mutable_key);
+               oldtup = systable_getnext(scan);
+               if (!HeapTupleIsValid(oldtup))
+               {
+                       systable_endscan(scan);
+                       *oldtupcopy = NULL;
+                       return;
+               }
+
+#ifdef USE_ASSERT_CHECKING
+               if (RelationGetRelid(relation) == RelationRelationId)
+                       check_inplace_rel_lock(oldtup);
+#endif
+       } while (!inplace_xmax_lock(scan));
+
+       *oldtupcopy = heap_copytuple(oldtup);
+       *state = scan;
+}
+
+/*
+ * heap_inplace_update_finish - second phase of heap_inplace_update_scan()
+ *
+ * The tuple cannot change size, and therefore its header fields and null
+ * bitmap (if any) don't change either.
+ */
+void
+heap_inplace_update_finish(void *state, HeapTuple tuple)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
+       HeapTupleHeader htup = oldtup->t_data;
+       Buffer          buffer = bslot->buffer;
+       Relation        relation = scan->heap_rel;
+       uint32          oldlen;
+       uint32          newlen;
+
+       Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
+       oldlen = oldtup->t_len - htup->t_hoff;
        newlen = tuple->t_len - tuple->t_data->t_hoff;
        if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
                elog(ERROR, "wrong tuple length");
@@ -6107,6 +6210,19 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
                   (char *) tuple->t_data + tuple->t_data->t_hoff,
                   newlen);
 
+       /*----------
+        * XXX A crash here can allow datfrozenxid() to get ahead of 
relfrozenxid:
+        *
+        * ["D" is a VACUUM (ONLY_DATABASE_STATS)]
+        * ["R" is a VACUUM tbl]
+        * D: vac_update_datfrozenid() -> systable_beginscan(pg_class)
+        * D: systable_getnext() returns pg_class tuple of tbl
+        * R: memcpy() into pg_class tuple of tbl
+        * D: raise pg_database.datfrozenxid, XLogInsert(), finish
+        * [crash]
+        * [recovery restores datfrozenxid w/o relfrozenxid]
+        */
+
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
@@ -6127,23 +6243,191 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
 
-               PageSetLSN(page, recptr);
+               PageSetLSN(BufferGetPage(buffer), recptr);
        }
 
        END_CRIT_SECTION();
 
-       UnlockReleaseBuffer(buffer);
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       systable_endscan(scan);
 
        /*
         * Send out shared cache inval if necessary.  Note that because we only
         * pass the new version of the tuple, this mustn't be used for any
         * operations that could change catcache lookup keys.  But we aren't
         * bothering with index updates either, so that's true a fortiori.
+        *
+        * XXX ROLLBACK discards the invalidation.  See test inplace-inval.spec.
         */
        if (!IsBootstrapProcessingMode())
                CacheInvalidateHeapTuple(relation, tuple, NULL);
 }
 
+/*
+ * heap_inplace_update_cancel - abandon a heap_inplace_update_scan()
+ *
+ * This is an alternative to making a no-op update.
+ */
+void
+heap_inplace_update_cancel(void *state)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       Buffer          buffer = bslot->buffer;
+
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       systable_endscan(scan);
+}
+
+/*
+ * inplace_xmax_lock - protect inplace update from concurrent heap_update()
+ *
+ * This operates on the last tuple that systable_getnext() returned.  Evaluate
+ * whether the tuple's state is compatible with a no-key update.  Current
+ * transaction rowmarks are fine, as is KEY SHARE from any transaction.  If
+ * compatible, return true with the buffer exclusive-locked.  Otherwise,
+ * return false after blocking transactions, if any, have ended.
+ *
+ * One could modify this to return true for tuples with delete in progress,
+ * All inplace updaters take a lock that conflicts with DROP.  If explicit
+ * "DELETE FROM pg_class" is in progress, we'll wait for it like we would an
+ * update.
+ *
+ * Readers of inplace-updated fields expect changes to those fields are
+ * durable.  For example, vac_truncate_clog() reads datfrozenxid from
+ * pg_database tuples via catalog snapshots.  A future snapshot must not
+ * return a lower datfrozenxid for the same database OID (lower in the
+ * FullTransactionIdPrecedes() sense).  We achieve that since no update of a
+ * tuple can start while we hold a lock on its buffer.  In cases like
+ * BEGIN;GRANT;CREATE INDEX;COMMIT we're inplace-updating a tuple visible only
+ * to this transaction.  ROLLBACK then is one case where it's okay to lose
+ * inplace updates.  (Restoring relhasindex=false on ROLLBACK is fine, since
+ * any concurrent CREATE INDEX would have blocked, then inplace-updated the
+ * committed tuple.)
+ *
+ * In principle, we could avoid waiting by overwriting every tuple in the
+ * updated tuple chain.  Reader expectations permit updating a tuple only if
+ * it's aborted, is the tail of the chain, or we already updated the tuple
+ * referenced in its t_ctid.  Hence, we would need to overwrite the tuples in
+ * order from tail to head.  That would tolerate either (a) mutating all
+ * tuples in one critical section or (b) accepting a chance of partial
+ * completion.  Partial completion of a relfrozenxid update would have the
+ * weird consequence that the table's next VACUUM could see the table's
+ * relfrozenxid move forward between vacuum_get_cutoffs() and finishing.
+ */
+static bool
+inplace_xmax_lock(SysScanDesc scan)
+{
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTupleData oldtup = *bslot->base.tuple;
+       Buffer          buffer = bslot->buffer;
+       TM_Result       result;
+       bool            ret;
+
+       Assert(TTS_IS_BUFFERTUPLE(slot));
+       Assert(BufferIsValid(buffer));
+
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       /*----------
+        * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
+        *
+        * - wait unconditionally
+        * - no tuple locks
+        * - don't recheck header after wait: simpler to defer to next iteration
+        * - don't try to continue even if the updater aborts: likewise
+        * - no crosscheck
+        */
+       result = HeapTupleSatisfiesUpdate(&oldtup, GetCurrentCommandId(false),
+                                                                         
buffer);
+
+       if (result == TM_Invisible)
+       {
+               /* no known way this can happen */
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg_internal("attempted to overwrite 
invisible tuple")));
+       }
+       else if (result == TM_SelfModified)
+       {
+               /*
+                * CREATE INDEX might reach this if an expression is silly 
enough to
+                * call e.g. SELECT ... FROM pg_class FOR SHARE.  C code of 
other SQL
+                * statements might get here after a heap_update() of the same 
row, in
+                * the absence of an intervening CommandCounterIncrement().
+                */
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("tuple to be updated was already 
modified by an operation triggered by the current command")));
+       }
+       else if (result == TM_BeingModified)
+       {
+               TransactionId xwait;
+               uint16          infomask;
+               Relation        relation;
+
+               xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
+               infomask = oldtup.t_data->t_infomask;
+               relation = scan->heap_rel;
+
+               if (infomask & HEAP_XMAX_IS_MULTI)
+               {
+                       LockTupleMode lockmode = LockTupleNoKeyExclusive;
+                       MultiXactStatus mxact_status = 
MultiXactStatusNoKeyUpdate;
+                       int                     remain;
+                       bool            current_is_member;
+
+                       if (DoesMultiXactIdConflict((MultiXactId) xwait, 
infomask,
+                                                                               
lockmode, &current_is_member))
+                       {
+                               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                               systable_endscan(scan);
+                               ret = false;
+                               MultiXactIdWait((MultiXactId) xwait, 
mxact_status, infomask,
+                                                               relation, 
&oldtup.t_self, XLTW_Update,
+                                                               &remain);
+                       }
+                       else
+                               ret = true;
+               }
+               else if (TransactionIdIsCurrentTransactionId(xwait))
+                       ret = true;
+               else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
+                       ret = true;
+               else
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       systable_endscan(scan);
+                       ret = false;
+                       XactLockTableWait(xwait, relation, &oldtup.t_self,
+                                                         XLTW_Update);
+               }
+       }
+       else
+       {
+               ret = (result == TM_Ok);
+               if (!ret)
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       systable_endscan(scan);
+               }
+       }
+
+       /*
+        * GetCatalogSnapshot() relies on invalidation messages to know when to
+        * take a new snapshot.  COMMIT of xwait is responsible for sending the
+        * invalidation.  We're not acquiring heavyweight locks sufficient to
+        * block if not yet sent, so we must take a new snapshot to avoid 
spinning
+        * that ends with a "too many tries" error.  While we don't need this if
+        * xwait aborted, don't bother optimizing that.
+        */
+       if (!ret)
+               InvalidateCatalogSnapshot();
+       return ret;
+}
+
 #define                FRM_NOOP                                0x0001
 #define                FRM_INVALIDATE_XMAX             0x0002
 #define                FRM_RETURN_IS_XID               0x0004
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index a819b41..b4b68b1 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2784,7 +2784,9 @@ index_update_stats(Relation rel,
 {
        Oid                     relid = RelationGetRelid(rel);
        Relation        pg_class;
+       ScanKeyData key[1];
        HeapTuple       tuple;
+       void       *state;
        Form_pg_class rd_rel;
        bool            dirty;
 
@@ -2818,33 +2820,12 @@ index_update_stats(Relation rel,
 
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       /*
-        * Make a copy of the tuple to update.  Normally we use the syscache, 
but
-        * we can't rely on that during bootstrap or while reindexing pg_class
-        * itself.
-        */
-       if (IsBootstrapProcessingMode() ||
-               ReindexIsProcessingHeap(RelationRelationId))
-       {
-               /* don't assume syscache will work */
-               TableScanDesc pg_class_scan;
-               ScanKeyData key[1];
-
-               ScanKeyInit(&key[0],
-                                       Anum_pg_class_oid,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(relid));
-
-               pg_class_scan = table_beginscan_catalog(pg_class, 1, key);
-               tuple = heap_getnext(pg_class_scan, ForwardScanDirection);
-               tuple = heap_copytuple(tuple);
-               table_endscan(pg_class_scan);
-       }
-       else
-       {
-               /* normal case, use syscache */
-               tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
-       }
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       heap_inplace_update_scan(pg_class, ClassOidIndexId, true, NULL, 1, key,
+                                                        &tuple, &state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u", relid);
@@ -2907,11 +2888,12 @@ index_update_stats(Relation rel,
         */
        if (dirty)
        {
-               heap_inplace_update(pg_class, tuple);
+               heap_inplace_update_finish(state, tuple);
                /* the above sends a cache inval message */
        }
        else
        {
+               heap_inplace_update_cancel(state);
                /* no need to change tuple, but force relcache inval anyway */
                CacheInvalidateRelcacheByTuple(tuple);
        }
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 738bc46..c882f3c 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -29,6 +29,7 @@
 #include "catalog/toasting.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
@@ -333,21 +334,36 @@ create_toast_table(Relation rel, Oid toastOid, Oid 
toastIndexOid,
         */
        class_rel = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
-       if (!HeapTupleIsValid(reltup))
-               elog(ERROR, "cache lookup failed for relation %u", relOid);
-
-       ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
-
        if (!IsBootstrapProcessingMode())
        {
                /* normal case, use a transactional update */
+               reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", 
relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = 
toast_relid;
+
                CatalogTupleUpdate(class_rel, &reltup->t_self, reltup);
        }
        else
        {
                /* While bootstrapping, we cannot UPDATE, so overwrite in-place 
*/
-               heap_inplace_update(class_rel, reltup);
+
+               ScanKeyData key[1];
+               void       *state;
+
+               ScanKeyInit(&key[0],
+                                       Anum_pg_class_oid,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(relOid));
+               heap_inplace_update_scan(class_rel, ClassOidIndexId, true,
+                                                                NULL, 1, key, 
&reltup, &state);
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", 
relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = 
toast_relid;
+
+               heap_inplace_update_finish(state, reltup);
        }
 
        heap_freetuple(reltup);
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index be629ea..da4d2b7 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1637,6 +1637,8 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        bool            db_istemplate;
        Relation        pgdbrel;
        HeapTuple       tup;
+       ScanKeyData key[1];
+       void       *inplace_state;
        Form_pg_database datform;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1774,11 +1776,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         */
        pgstat_drop_database(db_id);
 
-       tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
-       if (!HeapTupleIsValid(tup))
-               elog(ERROR, "cache lookup failed for database %u", db_id);
-       datform = (Form_pg_database) GETSTRUCT(tup);
-
        /*
         * Except for the deletion of the catalog row, subsequent actions are 
not
         * transactional (consider DropDatabaseBuffers() discarding modified
@@ -1790,8 +1787,17 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * modification is durable before performing irreversible filesystem
         * operations.
         */
+       ScanKeyInit(&key[0],
+                               Anum_pg_database_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(db_id));
+       heap_inplace_update_scan(pgdbrel, DatabaseOidIndexId, true,
+                                                        NULL, 1, key, &tup, 
&inplace_state);
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
+       datform = (Form_pg_database) GETSTRUCT(tup);
        datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
-       heap_inplace_update(pgdbrel, tup);
+       heap_inplace_update_finish(inplace_state, tup);
        XLogFlush(XactLastRecEnd);
 
        /*
@@ -1799,6 +1805,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * the row will be gone, but if we fail, dropdb() can be invoked again.
         */
        CatalogTupleDelete(pgdbrel, &tup->t_self);
+       heap_freetuple(tup);
 
        /*
         * Drop db-specific replication slots.
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 7a5ed6b..22d0ce7 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -946,25 +946,18 @@ EventTriggerOnLogin(void)
                {
                        Relation        pg_db = table_open(DatabaseRelationId, 
RowExclusiveLock);
                        HeapTuple       tuple;
+                       void       *state;
                        Form_pg_database db;
                        ScanKeyData key[1];
-                       SysScanDesc scan;
 
-                       /*
-                        * Get the pg_database tuple to scribble on.  Note that 
this does
-                        * not directly rely on the syscache to avoid issues 
with
-                        * flattened toast values for the in-place update.
-                        */
+                       /* Fetch a copy of the tuple to scribble on */
                        ScanKeyInit(&key[0],
                                                Anum_pg_database_oid,
                                                BTEqualStrategyNumber, F_OIDEQ,
                                                ObjectIdGetDatum(MyDatabaseId));
 
-                       scan = systable_beginscan(pg_db, DatabaseOidIndexId, 
true,
-                                                                         NULL, 
1, key);
-                       tuple = systable_getnext(scan);
-                       tuple = heap_copytuple(tuple);
-                       systable_endscan(scan);
+                       heap_inplace_update_scan(pg_db, DatabaseOidIndexId, 
true,
+                                                                        NULL, 
1, key, &tuple, &state);
 
                        if (!HeapTupleIsValid(tuple))
                                elog(ERROR, "could not find tuple for database 
%u", MyDatabaseId);
@@ -980,13 +973,15 @@ EventTriggerOnLogin(void)
                                 * that avoids possible waiting on the 
row-level lock. Second,
                                 * that avoids dealing with TOAST.
                                 *
-                                * It's known that changes made by 
heap_inplace_update() may
-                                * be lost due to concurrent normal updates.  
However, we are
-                                * OK with that.  The subsequent connections 
will still have a
-                                * chance to set "dathasloginevt" to false.
+                                * Changes made by inplace update may be lost 
due to
+                                * concurrent normal updates; see 
inplace-inval.spec. However,
+                                * we are OK with that.  The subsequent 
connections will still
+                                * have a chance to set "dathasloginevt" to 
false.
                                 */
-                               heap_inplace_update(pg_db, tuple);
+                               heap_inplace_update_finish(state, tuple);
                        }
+                       else
+                               heap_inplace_update_cancel(state);
                        table_close(pg_db, RowExclusiveLock);
                        heap_freetuple(tuple);
                }
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 48f8eab..d299a25 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1405,7 +1405,9 @@ vac_update_relstats(Relation relation,
 {
        Oid                     relid = RelationGetRelid(relation);
        Relation        rd;
+       ScanKeyData key[1];
        HeapTuple       ctup;
+       void       *inplace_state;
        Form_pg_class pgcform;
        bool            dirty,
                                futurexid,
@@ -1416,7 +1418,12 @@ vac_update_relstats(Relation relation,
        rd = table_open(RelationRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
-       ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       heap_inplace_update_scan(rd, ClassOidIndexId, true,
+                                                        NULL, 1, key, &ctup, 
&inplace_state);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during 
vacuuming",
                         relid);
@@ -1524,7 +1531,9 @@ vac_update_relstats(Relation relation,
 
        /* If anything changed, write out the tuple. */
        if (dirty)
-               heap_inplace_update(rd, ctup);
+               heap_inplace_update_finish(inplace_state, ctup);
+       else
+               heap_inplace_update_cancel(inplace_state);
 
        table_close(rd, RowExclusiveLock);
 
@@ -1576,6 +1585,7 @@ vac_update_datfrozenxid(void)
        bool            bogus = false;
        bool            dirty = false;
        ScanKeyData key[1];
+       void       *inplace_state;
 
        /*
         * Restrict this task to one backend per database.  This avoids race
@@ -1699,20 +1709,18 @@ vac_update_datfrozenxid(void)
        relation = table_open(DatabaseRelationId, RowExclusiveLock);
 
        /*
-        * Get the pg_database tuple to scribble on.  Note that this does not
-        * directly rely on the syscache to avoid issues with flattened toast
-        * values for the in-place update.
+        * Fetch a copy of the tuple to scribble on.  We could check the 
syscache
+        * tuple first.  If that concluded !dirty, we'd avoid waiting on
+        * concurrent heap_update() and would avoid exclusive-locking the 
buffer.
+        * For now, don't optimize that.
         */
        ScanKeyInit(&key[0],
                                Anum_pg_database_oid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(MyDatabaseId));
 
-       scan = systable_beginscan(relation, DatabaseOidIndexId, true,
-                                                         NULL, 1, key);
-       tuple = systable_getnext(scan);
-       tuple = heap_copytuple(tuple);
-       systable_endscan(scan);
+       heap_inplace_update_scan(relation, DatabaseOidIndexId, true,
+                                                        NULL, 1, key, &tuple, 
&inplace_state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for database %u", 
MyDatabaseId);
@@ -1746,7 +1754,9 @@ vac_update_datfrozenxid(void)
                newMinMulti = dbform->datminmxid;
 
        if (dirty)
-               heap_inplace_update(relation, tuple);
+               heap_inplace_update_finish(inplace_state, tuple);
+       else
+               heap_inplace_update_cancel(inplace_state);
 
        heap_freetuple(tuple);
        table_close(relation, RowExclusiveLock);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec8..2e13fb9 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -336,7 +336,14 @@ extern TM_Result heap_lock_tuple(Relation relation, 
HeapTuple tuple,
                                                                 bool 
follow_updates,
                                                                 Buffer 
*buffer, struct TM_FailureData *tmfd);
 
-extern void heap_inplace_update(Relation relation, HeapTuple tuple);
+extern void heap_inplace_update_scan(Relation relation,
+                                                                        Oid 
indexId,
+                                                                        bool 
indexOK,
+                                                                        
Snapshot snapshot,
+                                                                        int 
nkeys, const ScanKeyData *key,
+                                                                        
HeapTuple *oldtupcopy, void **state);
+extern void heap_inplace_update_finish(void *state, HeapTuple tuple);
+extern void heap_inplace_update_cancel(void *state);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
                                                                          const 
struct VacuumCutoffs *cutoffs,
                                                                          
HeapPageFreeze *pagefrz,
diff --git a/src/test/isolation/expected/intra-grant-inplace-db.out 
b/src/test/isolation/expected/intra-grant-inplace-db.out
index 432ece5..a91402c 100644
--- a/src/test/isolation/expected/intra-grant-inplace-db.out
+++ b/src/test/isolation/expected/intra-grant-inplace-db.out
@@ -9,20 +9,20 @@ step b1: BEGIN;
 step grant1: 
        GRANT TEMP ON DATABASE isolation_regression TO regress_temp_grantee;
 
-step vac2: VACUUM (FREEZE);
+step vac2: VACUUM (FREEZE); <waiting ...>
 step snap3: 
        INSERT INTO frozen_witness
        SELECT datfrozenxid FROM pg_database WHERE datname = current_catalog;
 
 step c1: COMMIT;
+step vac2: <... completed>
 step cmp3: 
        SELECT 'datfrozenxid retreated'
        FROM pg_database
        WHERE datname = current_catalog
                AND age(datfrozenxid) > (SELECT min(age(x)) FROM 
frozen_witness);
 
-?column?              
-----------------------
-datfrozenxid retreated
-(1 row)
+?column?
+--------
+(0 rows)
 
diff --git a/src/test/isolation/expected/intra-grant-inplace.out 
b/src/test/isolation/expected/intra-grant-inplace.out
index cc1e47a..c2a9841 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -14,15 +14,16 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
@@ -58,8 +59,9 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
+step addk2: <... completed>
 
 starting permutation: b2 sfnku2 addk2 c2
 step b2: BEGIN;
@@ -122,17 +124,18 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
diff --git a/src/test/isolation/specs/intra-grant-inplace-db.spec 
b/src/test/isolation/specs/intra-grant-inplace-db.spec
index bbecd5d..9de40ec 100644
--- a/src/test/isolation/specs/intra-grant-inplace-db.spec
+++ b/src/test/isolation/specs/intra-grant-inplace-db.spec
@@ -42,5 +42,4 @@ step cmp3     {
 }
 
 
-# XXX extant bug
 permutation snap3 b1 grant1 vac2(c1) snap3 c1 cmp3
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec 
b/src/test/isolation/specs/intra-grant-inplace.spec
index 3cd696b..eed0b52 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -73,7 +73,7 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned post-bugfix behavior
+# XXX extant bugs: permutation comments refer to planned future LockTuple()
 
 permutation
        b1
diff --git a/src/test/modules/injection_points/expected/inplace.out 
b/src/test/modules/injection_points/expected/inplace.out
index 123f45a..db7dab6 100644
--- a/src/test/modules/injection_points/expected/inplace.out
+++ b/src/test/modules/injection_points/expected/inplace.out
@@ -40,4 +40,301 @@ step read1:
        SELECT reltuples = -1 AS reltuples_unknown
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 
-ERROR:  could not create unique index "pg_class_oid_index"
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 grant2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: vac1 begin2 grant2 revoke2 mkrels3 c2 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step c2: COMMIT;
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 grant2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/inplace.spec 
b/src/test/modules/injection_points/specs/inplace.spec
index e957713..86539a5 100644
--- a/src/test/modules/injection_points/specs/inplace.spec
+++ b/src/test/modules/injection_points/specs/inplace.spec
@@ -32,12 +32,9 @@ setup
        CREATE TABLE vactest.orig50 ();
        SELECT vactest.mkrels('orig', 51, 100);
 }
-
-# XXX DROP causes an assertion failure; adopt DROP once fixed
 teardown
 {
-       --DROP SCHEMA vactest CASCADE;
-       DO $$BEGIN EXECUTE 'ALTER SCHEMA vactest RENAME TO schema' || oid FROM 
pg_namespace where nspname = 'vactest'; END$$;
+       DROP SCHEMA vactest CASCADE;
        DROP EXTENSION injection_points;
 }
 
@@ -56,11 +53,13 @@ step read1  {
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 }
 
-
 # Transactional updates of the tuple vac1 is waiting to inplace-update.
 session s2
 step grant2            { GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC; }
-
+step revoke2   { REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC; }
+step begin2            { BEGIN; }
+step c2                        { COMMIT; }
+step r2                        { ROLLBACK; }
 
 # Non-blocking actions.
 session s3
@@ -74,10 +73,69 @@ step mkrels3        {
 }
 
 
-# XXX extant bug
+# target gains a successor at the last moment
 permutation
        vac1(mkrels3)   # reads pg_class tuple T0 for vactest.orig50, xmax 
invalid
        grant2                  # T0 becomes eligible for pruning, T1 is 
successor
        vac3                    # T0 becomes LP_UNUSED
-       mkrels3                 # T0 reused; vac1 wakes and overwrites the 
reused T0
+       mkrels3                 # vac1 wakes, scans to T1
        read1
+
+# target already has a successor, which commits
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       c2                              # T0 becomes eligible for pruning
+       vac3                    # T0 becomes LP_UNUSED
+       mkrels3                 # vac1 wakes, scans to T1
+       read1
+
+# target already has a successor, which becomes LP_UNUSED at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       r2                              # T1 becomes eligible for pruning
+       vac3                    # T1 becomes LP_UNUSED
+       mkrels3                 # reuse T1; vac1 scans to T0
+       read1
+
+# target already has a successor, which becomes LP_REDIRECT at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       c2
+       revoke2                 # HOT update to T2
+       grant2                  # HOT update to T3
+       vac3                    # T1 becomes LP_REDIRECT
+       mkrels3                 # reuse T2; vac1 scans to T3
+       read1
+
+# waiting for updater to end
+permutation
+       vac1(c2)                # reads pg_class tuple T0 for vactest.orig50, 
xmax invalid
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       revoke2                 # HOT update to T2
+       mkrels3                 # vac1 awakes briefly, then waits for s2
+       c2
+       read1
+
+# Another LP_UNUSED.  This time, do change the live tuple.  Final live tuple
+# body is identical to original, at a different TID.
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       r2                              # T1 becomes eligible for pruning
+       grant2                  # T0.t_ctid = T2; T0 becomes eligible for 
pruning
+       revoke2                 # T2.t_ctid = T3; T2 becomes eligible for 
pruning
+       vac3                    # T0, T1 & T2 become LP_UNUSED
+       mkrels3                 # reuse T0, T1 & T2; vac1 scans to T3
+       read1
+
+# Another LP_REDIRECT.  Compared to the earlier test, omit the last grant2.
+# Hence, final live tuple body is identical to original, at a different TID.
+permutation begin2 grant2 vac1(mkrels3) c2 revoke2 vac3 mkrels3 read1
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Make heap_update() callers wait for inplace update.
    
    The previous commit fixed some ways of losing an inplace update.  It
    remained possible to lose one when a backend working toward a
    heap_update() copied a tuple into memory just before inplace update of
    that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
    to govern admission to the steps of copying an old tuple, modifying it,
    and issuing heap_update().  This includes UPDATE and MERGE commands.  To
    avoid changing most of the pg_class DDL, don't require LOCKTAG_TUPLE
    when holding a relation lock sufficient to exclude inplace updaters.
    Back-patch to v12 (all supported versions).
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20231027214946.79.nmi...@google.com

diff --git a/src/backend/access/heap/README.tuplock 
b/src/backend/access/heap/README.tuplock
index dbfa2b7..fb06ff2 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -157,8 +157,6 @@ is set.
 Locking to write inplace-updated tables
 ---------------------------------------
 
-[This is the plan, but LOCKTAG_TUPLE acquisition is not yet here.]
-
 If IsInplaceUpdateRelation() returns true for a table, the table is a system
 catalog that receives heap_inplace_update_scan() calls.  Preparing a
 heap_update() of these tables follows additional locking rules, to ensure we
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index faec28a..051aa10 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -51,6 +51,8 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -77,6 +79,9 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer 
oldbuf,
                                                                  HeapTuple 
newtup, HeapTuple old_key_tuple,
                                                                  bool 
all_visible_cleared, bool new_all_visible_cleared);
 #ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                               
                 ItemPointer otid,
+                                                                               
                 HeapTuple newtup);
 static void check_inplace_rel_lock(HeapTuple oldtup);
 #endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
@@ -126,6 +131,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, 
HeapTuple tp, bool ke
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3212,6 +3219,10 @@ heap_update(Relation relation, ItemPointer otid, 
HeapTuple newtup,
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel 
operation")));
 
+#ifdef USE_ASSERT_CHECKING
+       check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
        /*
         * Fetch the list of attributes to be checked for various operations.
         *
@@ -4078,6 +4089,89 @@ l2:
 
 #ifdef USE_ASSERT_CHECKING
 /*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                        
ItemPointer otid,
+                                                                        
HeapTuple newtup)
+{
+       /* LOCKTAG_TUPLE acceptable for any catalog */
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+               case DatabaseRelationId:
+                       {
+                               LOCKTAG         tuptag;
+
+                               SET_LOCKTAG_TUPLE(tuptag,
+                                                                 
relation->rd_lockInfo.lockRelId.dbId,
+                                                                 
relation->rd_lockInfo.lockRelId.relId,
+                                                                 
ItemPointerGetBlockNumber(otid),
+                                                                 
ItemPointerGetOffsetNumber(otid));
+                               if (LockHeldByMe(&tuptag, 
InplaceUpdateTupleLock, false))
+                                       return;
+                       }
+                       break;
+               default:
+                       Assert(!IsInplaceUpdateRelation(relation));
+                       return;
+       }
+
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+                       {
+                               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+                               Form_pg_class classForm = (Form_pg_class) 
GETSTRUCT(newtup);
+                               Oid                     relid = classForm->oid;
+                               Oid                     dbid;
+                               LOCKTAG         tag;
+
+                               if (IsSharedRelation(relid))
+                                       dbid = InvalidOid;
+                               else
+                                       dbid = MyDatabaseId;
+
+                               if (classForm->relkind == RELKIND_INDEX)
+                               {
+                                       Relation        irel = 
index_open(relid, AccessShareLock);
+
+                                       SET_LOCKTAG_RELATION(tag, dbid, 
irel->rd_index->indrelid);
+                                       index_close(irel, AccessShareLock);
+                               }
+                               else
+                                       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+                               if (!LockHeldByMe(&tag, 
ShareUpdateExclusiveLock, false) &&
+                                       !LockHeldByMe(&tag, 
ShareRowExclusiveLock, true))
+                                       elog(WARNING,
+                                                "missing lock for relation 
\"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                                                NameStr(classForm->relname),
+                                                relid,
+                                                classForm->relkind,
+                                                
ItemPointerGetBlockNumber(otid),
+                                                
ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+               case DatabaseRelationId:
+                       {
+                               /* LOCKTAG_TUPLE required */
+                               Form_pg_database dbForm = (Form_pg_database) 
GETSTRUCT(newtup);
+
+                               elog(WARNING,
+                                        "missing lock on database \"%s\" (OID 
%u) @ TID (%u,%u)",
+                                        NameStr(dbForm->datname),
+                                        dbForm->oid,
+                                        ItemPointerGetBlockNumber(otid),
+                                        ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+       }
+}
+
+/*
  * Confirm adequate relation lock held, per rules from README.tuplock section
  * "Locking to write inplace-updated tables".
  */
@@ -6123,6 +6217,7 @@ heap_inplace_update_scan(Relation relation,
        int                     retries = 0;
        SysScanDesc scan;
        HeapTuple       oldtup;
+       ItemPointerData locked;
 
        /*
         * For now, we don't allow parallel updates.  Unlike a regular update,
@@ -6144,6 +6239,7 @@ heap_inplace_update_scan(Relation relation,
        Assert(IsInplaceUpdateRelation(relation) || 
!IsSystemRelation(relation));
 
        /* Loop for an exclusive-locked buffer of a non-updated tuple. */
+       ItemPointerSetInvalid(&locked);
        do
        {
                CHECK_FOR_INTERRUPTS();
@@ -6163,6 +6259,8 @@ heap_inplace_update_scan(Relation relation,
                oldtup = systable_getnext(scan);
                if (!HeapTupleIsValid(oldtup))
                {
+                       if (ItemPointerIsValid(&locked))
+                               UnlockTuple(relation, &locked, 
InplaceUpdateTupleLock);
                        systable_endscan(scan);
                        *oldtupcopy = NULL;
                        return;
@@ -6172,6 +6270,15 @@ heap_inplace_update_scan(Relation relation,
                if (RelationGetRelid(relation) == RelationRelationId)
                        check_inplace_rel_lock(oldtup);
 #endif
+
+               if (!(ItemPointerIsValid(&locked) &&
+                         ItemPointerEquals(&locked, &oldtup->t_self)))
+               {
+                       if (ItemPointerIsValid(&locked))
+                               UnlockTuple(relation, &locked, 
InplaceUpdateTupleLock);
+                       LockTuple(relation, &oldtup->t_self, 
InplaceUpdateTupleLock);
+               }
+               locked = oldtup->t_self;
        } while (!inplace_xmax_lock(scan));
 
        *oldtupcopy = heap_copytuple(oldtup);
@@ -6183,6 +6290,8 @@ heap_inplace_update_scan(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update_finish(void *state, HeapTuple tuple)
@@ -6249,6 +6358,7 @@ heap_inplace_update_finish(void *state, HeapTuple tuple)
        END_CRIT_SECTION();
 
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
        systable_endscan(scan);
 
        /*
@@ -6274,9 +6384,12 @@ heap_inplace_update_cancel(void *state)
        SysScanDesc scan = (SysScanDesc) state;
        TupleTableSlot *slot = scan->slot;
        BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
        Buffer          buffer = bslot->buffer;
+       Relation        relation = scan->heap_rel;
 
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
        systable_endscan(scan);
 }
 
@@ -6335,7 +6448,7 @@ inplace_xmax_lock(SysScanDesc scan)
         * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
         *
         * - wait unconditionally
-        * - no tuple locks
+        * - caller handles tuple lock, since inplace needs it unconditionally
         * - don't recheck header after wait: simpler to defer to next iteration
         * - don't try to continue even if the updater aborts: likewise
         * - no crosscheck
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index a44ccee..bc0e259 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -75,6 +75,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1848,7 +1849,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                HeapTuple       tuple;
                ListCell   *cell_colprivs;
 
-               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+               tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", 
relOid);
                pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2060,6 +2061,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                                                                
 values, nulls, replaces);
 
                        CatalogTupleUpdate(relation, &newtuple->t_self, 
newtuple);
+                       UnlockTuple(relation, &tuple->t_self, 
InplaceUpdateTupleLock);
 
                        /* Update initial privileges for extensions */
                        recordExtensionInitPriv(relOid, RelationRelationId, 0, 
new_acl);
@@ -2072,6 +2074,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
                        pfree(new_acl);
                }
+               else
+                       UnlockTuple(relation, &tuple->t_self, 
InplaceUpdateTupleLock);
 
                /*
                 * Handle column-level privileges, if any were specified or 
implied.
@@ -2185,7 +2189,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, 
AclMode default_privs,
                Oid                *oldmembers;
                Oid                *newmembers;
 
-               tuple = SearchSysCache1(cacheid, ObjectIdGetDatum(objectid));
+               tuple = SearchSysCacheLocked1(cacheid, 
ObjectIdGetDatum(objectid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for %s %u", 
get_object_class_descr(classid), objectid);
 
@@ -2261,6 +2265,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, 
AclMode default_privs,
                                                                         nulls, 
replaces);
 
                CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+               UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /* Update initial privileges for extensions */
                recordExtensionInitPriv(objectid, classid, 0, new_acl);
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 6c39434..8aefbcd 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -138,6 +138,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *             True iff core code performs inplace updates on the relation.
+ *
+ *             This is used for assertions and for making the executor follow 
the
+ *             locking protocol described at README.tuplock section "Locking 
to write
+ *             inplace-updated tables".  Extensions may inplace-update other 
heap
+ *             tables, but concurrent SQL UPDATE on the same table may 
overwrite
+ *             those modifications.
+ *
+ *             The executor can assume these are not partitions or partitioned 
and
+ *             have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index da4d2b7..fd48022 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1864,6 +1864,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
        Oid                     db_id;
        HeapTuple       newtup;
+       ItemPointerData otid;
        Relation        rel;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1935,11 +1936,13 @@ RenameDatabase(const char *oldname, const char *newname)
                                 errdetail_busy_db(notherbackends, 
npreparedxacts)));
 
        /* rename */
-       newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+       newtup = SearchSysCacheLockedCopy1(DATABASEOID, 
ObjectIdGetDatum(db_id));
        if (!HeapTupleIsValid(newtup))
                elog(ERROR, "cache lookup failed for database %u", db_id);
+       otid = newtup->t_self;
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-       CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+       CatalogTupleUpdate(rel, &otid, newtup);
+       UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2188,6 +2191,7 @@ movedb(const char *dbname, const char *tblspcname)
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                                         errmsg("database \"%s\" does not 
exist", dbname)));
+               LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                new_record[Anum_pg_database_dattablespace - 1] = 
ObjectIdGetDatum(dst_tblspcoid);
                new_record_repl[Anum_pg_database_dattablespace - 1] = true;
@@ -2196,6 +2200,7 @@ movedb(const char *dbname, const char *tblspcname)
                                                                         
new_record,
                                                                         
new_record_nulls, new_record_repl);
                CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+               UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2426,6 +2431,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt 
*stmt, bool isTopLevel)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", 
stmt->dbname)));
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datform = (Form_pg_database) GETSTRUCT(tuple);
        dboid = datform->oid;
@@ -2475,6 +2481,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt 
*stmt, bool isTopLevel)
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                                                 
new_record_nulls, new_record_repl);
        CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2524,6 +2531,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
        if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                                           stmt->dbname);
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datum = heap_getattr(tuple, Anum_pg_database_datcollversion, 
RelationGetDescr(rel), &isnull);
        oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2552,6 +2560,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
                bool            nulls[Natts_pg_database] = {0};
                bool            replaces[Natts_pg_database] = {0};
                Datum           values[Natts_pg_database] = {0};
+               HeapTuple       newtuple;
 
                ereport(NOTICE,
                                (errmsg("changing version from %s to %s",
@@ -2560,14 +2569,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt 
*stmt)
                values[Anum_pg_database_datcollversion - 1] = 
CStringGetTextDatum(newversion);
                replaces[Anum_pg_database_datcollversion - 1] = true;
 
-               tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                                                 values, 
nulls, replaces);
-               CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-               heap_freetuple(tuple);
+               newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                                                        
values, nulls, replaces);
+               CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+               heap_freetuple(newtuple);
        }
        else
                ereport(NOTICE,
                                (errmsg("version has not changed")));
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2679,6 +2689,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                                        
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to change 
owner of database")));
 
+               LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
                repl_repl[Anum_pg_database_datdba - 1] = true;
                repl_val[Anum_pg_database_datdba - 1] = 
ObjectIdGetDatum(newOwnerId);
 
@@ -2700,6 +2712,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
                newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), 
repl_val, repl_null, repl_repl);
                CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+               UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
                heap_freetuple(newtuple);
 
diff --git a/src/backend/commands/event_trigger.c 
b/src/backend/commands/event_trigger.c
index 22d0ce7..36d82bd 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -388,6 +388,7 @@ SetDatabaseHasLoginEventTriggers(void)
        /* Set dathasloginevt flag in pg_database */
        Form_pg_database db;
        Relation        pg_db = table_open(DatabaseRelationId, 
RowExclusiveLock);
+       ItemPointerData otid;
        HeapTuple       tuple;
 
        /*
@@ -399,16 +400,18 @@ SetDatabaseHasLoginEventTriggers(void)
         */
        LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, 
AccessExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
+       tuple = SearchSysCacheLockedCopy1(DATABASEOID, 
ObjectIdGetDatum(MyDatabaseId));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for database %u", 
MyDatabaseId);
+       otid = tuple->t_self;
        db = (Form_pg_database) GETSTRUCT(tuple);
        if (!db->dathasloginevt)
        {
                db->dathasloginevt = true;
-               CatalogTupleUpdate(pg_db, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_db, &otid, tuple);
                CommandCounterIncrement();
        }
+       UnlockTuple(pg_db, &otid, InplaceUpdateTupleLock);
        table_close(pg_db, RowExclusiveLock);
        heap_freetuple(tuple);
 }
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 2caab88..8d04ca0 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -4409,14 +4409,17 @@ update_relispartition(Oid relationId, bool newval)
 {
        HeapTuple       tup;
        Relation        classRel;
+       ItemPointerData otid;
 
        classRel = table_open(RelationRelationId, RowExclusiveLock);
-       tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+       tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relationId);
+       otid = tup->t_self;
        Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
        ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-       CatalogTupleUpdate(classRel, &tup->t_self, tup);
+       CatalogTupleUpdate(classRel, &otid, tup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tup);
        table_close(classRel, RowExclusiveLock);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 8fcb188..7fa80a5 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -3609,6 +3609,7 @@ SetRelationTableSpace(Relation rel,
 {
        Relation        pg_class;
        HeapTuple       tuple;
+       ItemPointerData otid;
        Form_pg_class rd_rel;
        Oid                     reloid = RelationGetRelid(rel);
 
@@ -3617,9 +3618,10 @@ SetRelationTableSpace(Relation rel,
        /* Get a modifiable copy of the relation's pg_class row. */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+       tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", reloid);
+       otid = tuple->t_self;
        rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
        /* Update the pg_class row. */
@@ -3627,7 +3629,8 @@ SetRelationTableSpace(Relation rel,
                InvalidOid : newTableSpaceId;
        if (RelFileNumberIsValid(newRelFilenumber))
                rd_rel->relfilenode = newRelFilenumber;
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
        /*
         * Record dependency on tablespace.  This is only required for relations
@@ -4121,6 +4124,7 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
 {
        Relation        targetrelation;
        Relation        relrelation;    /* for RELATION relation */
+       ItemPointerData otid;
        HeapTuple       reltup;
        Form_pg_class relform;
        Oid                     namespaceId;
@@ -4143,7 +4147,8 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
         */
        relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       otid = reltup->t_self;
        if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
        relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -4170,7 +4175,8 @@ RenameRelationInternal(Oid myrelid, const char 
*newrelname, bool is_internal, bo
         */
        namestrcpy(&(relform->relname), newrelname);
 
-       CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+       CatalogTupleUpdate(relrelation, &otid, reltup);
+       UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                                                 InvalidOid, 
is_internal);
@@ -14917,7 +14923,7 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
 
        /* Fetch heap tuple */
        relid = RelationGetRelid(rel);
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -15021,6 +15027,7 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
                                                                 repl_val, 
repl_null, repl_repl);
 
        CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+       UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -17170,7 +17177,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
        ObjectAddress thisobj;
        bool            already_done = false;
 
-       classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+       /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+       classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(classTup))
                elog(ERROR, "cache lookup failed for relation %u", relOid);
        classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -17189,6 +17197,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
        already_done = object_address_present(&thisobj, objsMoved);
        if (!already_done && oldNspOid != newNspOid)
        {
+               ItemPointerData otid = classTup->t_self;
+
                /* check for duplicate name (more friendly than unique-index 
failure) */
                if (get_relname_relid(NameStr(classForm->relname),
                                                          newNspOid) != 
InvalidOid)
@@ -17201,7 +17211,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
                /* classTup is a copy, so OK to scribble on */
                classForm->relnamespace = newNspOid;
 
-               CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+               CatalogTupleUpdate(classRel, &otid, classTup);
+               UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
                /* Update dependency on schema if caller said so */
                if (hasDependEntry &&
@@ -17213,6 +17225,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid 
relOid,
                        elog(ERROR, "could not change schema dependency for 
relation \"%s\"",
                                 NameStr(classForm->relname));
        }
+       else
+               UnlockTuple(classRel, &classTup->t_self, 
InplaceUpdateTupleLock);
        if (!already_done)
        {
                add_exact_object_address(&thisobj, objsMoved);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4d7c92d..321ad47 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1209,6 +1209,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_NumIndices = 0;
        resultRelInfo->ri_IndexRelationDescs = NULL;
        resultRelInfo->ri_IndexRelationInfo = NULL;
+       resultRelInfo->ri_needLockTagTuple =
+               IsInplaceUpdateRelation(resultRelationDesc);
        /* make a copy so as not to depend on relcache info not changing... */
        resultRelInfo->ri_TrigDesc = 
CopyTriggerDesc(resultRelationDesc->trigdesc);
        if (resultRelInfo->ri_TrigDesc)
diff --git a/src/backend/executor/execReplication.c 
b/src/backend/executor/execReplication.c
index d0a89cd..f18efdb 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -559,8 +559,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
        Relation        rel = resultRelInfo->ri_RelationDesc;
        ItemPointer tid = &(searchslot->tts_tid);
 
-       /* For now we support only tables. */
+       /*
+        * We support only non-system tables, with
+        * check_publication_add_relation() accountable.
+        */
        Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+       Assert(!IsCatalogRelation(rel));
 
        CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
diff --git a/src/backend/executor/nodeModifyTable.c 
b/src/backend/executor/nodeModifyTable.c
index a2442b7..b70d2f6 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -2320,6 +2320,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo 
*resultRelInfo,
        }
        else
        {
+               ItemPointerData lockedtid;
+
                /*
                 * If we generate a new candidate tuple after EvalPlanQual 
testing, we
                 * must loop back here to try again.  (We don't need to redo 
triggers,
@@ -2328,6 +2330,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo 
*resultRelInfo,
                 * to do them again.)
                 */
 redo_act:
+               lockedtid = *tupleid;
                result = ExecUpdateAct(context, resultRelInfo, tupleid, 
oldtuple, slot,
                                                           canSetTag, 
&updateCxt);
 
@@ -2421,6 +2424,14 @@ redo_act:
                                                                
ExecInitUpdateProjection(context->mtstate,
                                                                                
                                 resultRelInfo);
 
+                                                       if 
(resultRelInfo->ri_needLockTagTuple)
+                                                       {
+                                                               
UnlockTuple(resultRelationDesc,
+                                                                               
        &lockedtid, InplaceUpdateTupleLock);
+                                                               
LockTuple(resultRelationDesc,
+                                                                               
  tupleid, InplaceUpdateTupleLock);
+                                                       }
+
                                                        /* Fetch the most 
recent version of old tuple. */
                                                        oldSlot = 
resultRelInfo->ri_oldTupleSlot;
                                                        if 
(!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2525,6 +2536,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
        TransactionId xmin;
        bool            isnull;
 
+       /*
+        * Parse analysis should have blocked ON CONFLICT for all system
+        * relations, which includes these.  There's no fundamental obstacle to
+        * supporting this; we'd just need to handle LOCKTAG_TUPLE like the 
other
+        * ExecUpdate() caller.
+        */
+       Assert(!resultRelInfo->ri_needLockTagTuple);
+
        /* Determine lock mode to use */
        lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2850,6 +2869,7 @@ ExecMergeMatched(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo,
 {
        ModifyTableState *mtstate = context->mtstate;
        List      **mergeActions = resultRelInfo->ri_MergeActions;
+       ItemPointerData lockedtid;
        List       *actionStates;
        TupleTableSlot *newslot = NULL;
        TupleTableSlot *rslot = NULL;
@@ -2886,17 +2906,33 @@ ExecMergeMatched(ModifyTableContext *context, 
ResultRelInfo *resultRelInfo,
         * target wholerow junk attr.
         */
        Assert(tupleid != NULL || oldtuple != NULL);
+       ItemPointerSetInvalid(&lockedtid);
        if (oldtuple != NULL)
        {
                Assert(resultRelInfo->ri_TrigDesc);
+               Assert(!resultRelInfo->ri_needLockTagTuple);
                ExecForceStoreHeapTuple(oldtuple, 
resultRelInfo->ri_oldTupleSlot,
                                                                false);
        }
-       else if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
-                                                                               
        tupleid,
-                                                                               
        SnapshotAny,
-                                                                               
        resultRelInfo->ri_oldTupleSlot))
-               elog(ERROR, "failed to fetch the target tuple");
+       else
+       {
+               if (resultRelInfo->ri_needLockTagTuple)
+               {
+                       /*
+                        * This locks even tuples that don't match 
mas_whenqual, which
+                        * isn't ideal.  MERGE on system catalogs is a minor 
use case, so
+                        * don't bother doing better.
+                        */
+                       LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                         InplaceUpdateTupleLock);
+                       lockedtid = *tupleid;
+               }
+               if 
(!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+                                                                               
   tupleid,
+                                                                               
   SnapshotAny,
+                                                                               
   resultRelInfo->ri_oldTupleSlot))
+                       elog(ERROR, "failed to fetch the target tuple");
+       }
 
        /*
         * Test the join condition.  If it's satisfied, perform a MATCHED 
action.
@@ -2968,7 +3004,7 @@ lmerge_matched:
                                                                                
tupleid, NULL, newslot, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
 
                                        break;          /* concurrent 
update/delete */
                                }
@@ -2979,7 +3015,7 @@ lmerge_matched:
                                {
                                        if (!ExecIRUpdateTriggers(estate, 
resultRelInfo,
                                                                                
          oldtuple, newslot))
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
                                }
                                else
                                {
@@ -2999,7 +3035,8 @@ lmerge_matched:
                                        if (updateCxt.crossPartUpdate)
                                        {
                                                mtstate->mt_merge_updated += 1;
-                                               return 
context->cpUpdateReturningSlot;
+                                               rslot = 
context->cpUpdateReturningSlot;
+                                               goto out;
                                        }
                                }
 
@@ -3017,7 +3054,7 @@ lmerge_matched:
                                                                                
NULL, NULL, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
 
                                        break;          /* concurrent 
update/delete */
                                }
@@ -3028,7 +3065,7 @@ lmerge_matched:
                                {
                                        if (!ExecIRDeleteTriggers(estate, 
resultRelInfo,
                                                                                
          oldtuple))
-                                               return NULL;    /* "do nothing" 
*/
+                                               goto out;       /* "do nothing" 
*/
                                }
                                else
                                        result = ExecDeleteAct(context, 
resultRelInfo, tupleid,
@@ -3109,7 +3146,7 @@ lmerge_matched:
                                 * let caller handle it under NOT MATCHED [BY 
TARGET] clauses.
                                 */
                                *matched = false;
-                               return NULL;
+                               goto out;
 
                        case TM_Updated:
                                {
@@ -3183,7 +3220,7 @@ lmerge_matched:
                                                                 * more to do.
                                                                 */
                                                                if 
(TupIsNull(epqslot))
-                                                                       return 
NULL;
+                                                                       goto 
out;
 
                                                                /*
                                                                 * If we got a 
NULL ctid from the subplan, the
@@ -3201,6 +3238,15 @@ lmerge_matched:
                                                                 * we need to 
switch to the NOT MATCHED BY
                                                                 * SOURCE case.
                                                                 */
+                                                               if 
(resultRelInfo->ri_needLockTagTuple)
+                                                               {
+                                                                       if 
(ItemPointerIsValid(&lockedtid))
+                                                                               
UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                                                               
                        InplaceUpdateTupleLock);
+                                                                       
LockTuple(resultRelInfo->ri_RelationDesc, &context->tmfd.ctid,
+                                                                               
          InplaceUpdateTupleLock);
+                                                                       
lockedtid = context->tmfd.ctid;
+                                                               }
                                                                if 
(!table_tuple_fetch_row_version(resultRelationDesc,
                                                                                
                                                   &context->tmfd.ctid,
                                                                                
                                                   SnapshotAny,
@@ -3229,7 +3275,7 @@ lmerge_matched:
                                                         * MATCHED [BY TARGET] 
actions
                                                         */
                                                        *matched = false;
-                                                       return NULL;
+                                                       goto out;
 
                                                case TM_SelfModified:
 
@@ -3257,13 +3303,13 @@ lmerge_matched:
 
                                                        /* This shouldn't 
happen */
                                                        elog(ERROR, "attempted 
to update or delete invisible tuple");
-                                                       return NULL;
+                                                       goto out;
 
                                                default:
                                                        /* see table_tuple_lock 
call in ExecDelete() */
                                                        elog(ERROR, "unexpected 
table_tuple_lock status: %u",
                                                                 result);
-                                                       return NULL;
+                                                       goto out;
                                        }
                                }
 
@@ -3310,6 +3356,10 @@ lmerge_matched:
        /*
         * Successfully executed an action or no qualifying action was found.
         */
+out:
+       if (ItemPointerIsValid(&lockedtid))
+               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                       InplaceUpdateTupleLock);
        return rslot;
 }
 
@@ -3761,6 +3811,7 @@ ExecModifyTable(PlanState *pstate)
        HeapTupleData oldtupdata;
        HeapTuple       oldtuple;
        ItemPointer tupleid;
+       bool            tuplock;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -4073,6 +4124,8 @@ ExecModifyTable(PlanState *pstate)
                                break;
 
                        case CMD_UPDATE:
+                               tuplock = false;
+
                                /* Initialize projection info if first time for 
this table */
                                if 
(unlikely(!resultRelInfo->ri_projectNewInfoValid))
                                        ExecInitUpdateProjection(node, 
resultRelInfo);
@@ -4084,6 +4137,7 @@ ExecModifyTable(PlanState *pstate)
                                oldSlot = resultRelInfo->ri_oldTupleSlot;
                                if (oldtuple != NULL)
                                {
+                                       
Assert(!resultRelInfo->ri_needLockTagTuple);
                                        /* Use the wholerow junk attr as the 
old tuple. */
                                        ExecForceStoreHeapTuple(oldtuple, 
oldSlot, false);
                                }
@@ -4092,6 +4146,11 @@ ExecModifyTable(PlanState *pstate)
                                        /* Fetch the most recent version of old 
tuple. */
                                        Relation        relation = 
resultRelInfo->ri_RelationDesc;
 
+                                       if (resultRelInfo->ri_needLockTagTuple)
+                                       {
+                                               LockTuple(relation, tupleid, 
InplaceUpdateTupleLock);
+                                               tuplock = true;
+                                       }
                                        if 
(!table_tuple_fetch_row_version(relation, tupleid,
                                                                                
                           SnapshotAny,
                                                                                
                           oldSlot))
@@ -4103,6 +4162,9 @@ ExecModifyTable(PlanState *pstate)
                                /* Now apply the update. */
                                slot = ExecUpdate(&context, resultRelInfo, 
tupleid, oldtuple,
                                                                  slot, 
node->canSetTag);
+                               if (tuplock)
+                                       
UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                                               
InplaceUpdateTupleLock);
                                break;
 
                        case CMD_DELETE:
diff --git a/src/backend/utils/cache/relcache.c 
b/src/backend/utils/cache/relcache.c
index 930cc03..3f1e8ce 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3770,6 +3770,7 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
 {
        RelFileNumber newrelfilenumber;
        Relation        pg_class;
+       ItemPointerData otid;
        HeapTuple       tuple;
        Form_pg_class classform;
        MultiXactId minmulti = InvalidMultiXactId;
@@ -3812,11 +3813,12 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
         */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID,
-                                                               
ObjectIdGetDatum(RelationGetRelid(relation)));
+       tuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                         
ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u",
                         RelationGetRelid(relation));
+       otid = tuple->t_self;
        classform = (Form_pg_class) GETSTRUCT(tuple);
 
        /*
@@ -3936,9 +3938,10 @@ RelationSetNewRelfilenumber(Relation relation, char 
persistence)
                classform->relminmxid = minmulti;
                classform->relpersistence = persistence;
 
-               CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_class, &otid, tuple);
        }
 
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tuple);
 
        table_close(pg_class, RowExclusiveLock);
diff --git a/src/backend/utils/cache/syscache.c 
b/src/backend/utils/cache/syscache.c
index 3e03dfc..50c9440 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -30,7 +30,10 @@
 #include "catalog/pg_shseclabel_d.h"
 #include "common/int.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -269,6 +272,98 @@ ReleaseSysCache(HeapTuple tuple)
 }
 
 /*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                                         Datum key1)
+{
+       ItemPointerData tid;
+       LOCKTAG         tag;
+       Oid                     dboid =
+               SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+       Oid                     reloid = cacheinfo[cacheId].reloid;
+
+       /*----------
+        * Since inplace updates may happen just before our LockTuple(), we must
+        * return content acquired after LockTuple() of the TID we return.  If 
we
+        * just fetched twice instead of looping, the following sequence would
+        * defeat our locking:
+        *
+        * GRANT:   SearchSysCache1() = TID (1,5)
+        * GRANT:   LockTuple(pg_class, (1,5))
+        * [no more inplace update of (1,5) until we release the lock]
+        * CLUSTER: SearchSysCache1() = TID (1,5)
+        * CLUSTER: heap_update() = TID (1,8)
+        * CLUSTER: COMMIT
+        * GRANT:   SearchSysCache1() = TID (1,8)
+        * GRANT:   return (1,8) from SearchSysCacheLocked1()
+        * VACUUM:  SearchSysCache1() = TID (1,8)
+        * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one 
rel
+        * VACUUM:  inplace update
+        * GRANT:   heap_update() = (1,9)  # lose inplace update
+        *
+        * In the happy case, this takes two fetches, one to determine the TID 
to
+        * lock and another to get the content and confirm the TID didn't 
change.
+        *
+        * This is valid even if the row gets updated to a new TID, the old TID
+        * becomes LP_UNUSED, and the row gets updated back to its old TID.  
We'd
+        * still hold the right LOCKTAG_TUPLE and a copy of the row captured 
after
+        * the LOCKTAG_TUPLE.
+        */
+       ItemPointerSetInvalid(&tid);
+       for (;;)
+       {
+               HeapTuple       tuple;
+               LOCKMODE        lockmode = InplaceUpdateTupleLock;
+
+               tuple = SearchSysCache1(cacheId, key1);
+               if (ItemPointerIsValid(&tid))
+               {
+                       if (!HeapTupleIsValid(tuple))
+                       {
+                               LockRelease(&tag, lockmode, false);
+                               return tuple;
+                       }
+                       if (ItemPointerEquals(&tid, &tuple->t_self))
+                               return tuple;
+                       LockRelease(&tag, lockmode, false);
+               }
+               else if (!HeapTupleIsValid(tuple))
+                       return tuple;
+
+               tid = tuple->t_self;
+               ReleaseSysCache(tuple);
+               /* like: LockTuple(rel, &tid, lockmode) */
+               SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                                                 
ItemPointerGetBlockNumber(&tid),
+                                                 
ItemPointerGetOffsetNumber(&tid));
+               (void) LockAcquire(&tag, lockmode, false, false);
+
+               /*
+                * If an inplace update just finished, ensure we process the 
syscache
+                * inval.  XXX this is insufficient: the inplace updater may 
not yet
+                * have reached AtEOXact_Inval().  See test at 
inplace-inval.spec.
+                *
+                * If a heap_update() call just released its LOCKTAG_TUPLE, 
we'll
+                * probably find the old tuple and reach "tuple concurrently 
updated".
+                * If that heap_update() aborts, our LOCKTAG_TUPLE blocks 
inplace
+                * updates while our caller works.
+                */
+               AcceptInvalidationMessages();
+       }
+}
+
+/*
  * SearchSysCacheCopy
  *
  * A convenience routine that does SearchSysCache and (if successful)
@@ -295,6 +390,28 @@ SearchSysCacheCopy(int cacheId,
 }
 
 /*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLockedCopy1 with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                                                 Datum key1)
+{
+       HeapTuple       tuple,
+                               newtuple;
+
+       tuple = SearchSysCacheLocked1(cacheId, key1);
+       if (!HeapTupleIsValid(tuple))
+               return tuple;
+       newtuple = heap_copytuple(tuple);
+       ReleaseSysCache(tuple);
+       return newtuple;
+}
+
+/*
  * SearchSysCacheExists
  *
  * A convenience routine that just probes to see if a tuple can be found.
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b62c96f..eab0add 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -482,6 +482,9 @@ typedef struct ResultRelInfo
        /* Have the projection and the slots above been initialized? */
        bool            ri_projectNewInfoValid;
 
+       /* updates do LockTuple() before oldtup read; see README.tuplock */
+       bool            ri_needLockTagTuple;
+
        /* triggers to be fired, if any */
        TriggerDesc *ri_TrigDesc;
 
diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h
index 934ba84..810b297 100644
--- a/src/include/storage/lockdefs.h
+++ b/src/include/storage/lockdefs.h
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                            8       /* highest standard 
lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 03a27dd..b541911 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -43,9 +43,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                                                          
Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                                                        Datum 
key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                                                               
   Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                                                 Datum key1, 
Datum key2, Datum key3, Datum key4);
 extern Oid     GetSysCacheOid(int cacheId, AttrNumber oidcol,
diff --git a/src/test/isolation/expected/intra-grant-inplace.out 
b/src/test/isolation/expected/intra-grant-inplace.out
index c2a9841..b5fe8b0 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -154,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  <waiting ...>
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
        SELECT relhasindex FROM pg_class
@@ -194,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -223,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
diff --git a/src/test/isolation/specs/eval-plan-qual.spec 
b/src/test/isolation/specs/eval-plan-qual.spec
index 3a74406..07307e6 100644
--- a/src/test/isolation/specs/eval-plan-qual.spec
+++ b/src/test/isolation/specs/eval-plan-qual.spec
@@ -194,7 +194,7 @@ step simplepartupdate_noroute {
        update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1      {
        UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec 
b/src/test/isolation/specs/intra-grant-inplace.spec
index eed0b52..2992c85 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1        { BEGIN; }
 step grant1    {
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1       { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
@@ -73,8 +75,6 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
        b1
        grant1
@@ -126,8 +126,8 @@ permutation
        b2
        sfnku2
        b1
-       grant1(c2)              # acquire LockTuple(), await sfnku2 xmax
-       addk2                   # block in LockTuple() behind grant1 = deadlock
+       grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+       addk2(*)                # block in LockTuple() behind grant1 = deadlock
        c2
        c1
        read2
@@ -138,7 +138,7 @@ permutation
        grant1
        b3
        sfu3(c1)        # acquire LockTuple(), await grant1 xmax
-       revoke4(sfu3)   # block in LockTuple() behind sfu3
+       revoke4(r3)     # block in LockTuple() behind sfu3
        c1
        r3                      # revoke4 unlocks old tuple and finds new
 

Reply via email to