I wrote:
> As far as 4) goes, I think the code in ExtractReplicaIdentity is pretty
> duff anyway, because it doesn't bother to check for the defined failure
> return for RelationIdGetRelation.  But if we're concerned about the
> cost of recalculating this stuff per-row, couldn't we cache it a little
> better?  It should be safe to assume the set of index columns isn't
> changing intra-query.
> ... in fact, isn't all the infrastructure for that present already?
> Why is this code looking directly at the index at all, rather than
> using the relcache's rd_idattr bitmap?

Here's a proposed patch along those lines.  It fixes Hadi's original
crash case and passes check-world.

I'm a bit suspicious of the exclusion for idattrs being empty, but
if I remove that, some of the contrib/test_decoding test results
change.  Anybody want to comment on that?  If that's actually an
expected situation, why is there an elog(DEBUG) in that path?

                        regards, tom lane

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index cb811d3..8a3c7dc 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7594,18 +7594,20 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
  *
  * Returns NULL if there's no need to log an identity or if there's no suitable
  * key in the Relation relation.
+ *
+ * *copy is set to true if the returned tuple is a modified copy rather than
+ * the same tuple that was passed in.
  */
 static HeapTuple
-ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
+ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed,
+					   bool *copy)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
-	Oid			replidindex;
-	Relation	idx_rel;
 	char		replident = relation->rd_rel->relreplident;
-	HeapTuple	key_tuple = NULL;
+	Bitmapset  *idattrs;
+	HeapTuple	key_tuple;
 	bool		nulls[MaxHeapAttributeNumber];
 	Datum		values[MaxHeapAttributeNumber];
-	int			natt;
 
 	*copy = false;
 
@@ -7624,7 +7626,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
 		if (HeapTupleHasExternal(tp))
 		{
 			*copy = true;
-			tp = toast_flatten_tuple(tp, RelationGetDescr(relation));
+			tp = toast_flatten_tuple(tp, desc);
 		}
 		return tp;
 	}
@@ -7633,41 +7635,37 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
 	if (!key_changed)
 		return NULL;
 
-	/* find the replica identity index */
-	replidindex = RelationGetReplicaIndex(relation);
-	if (!OidIsValid(replidindex))
+	/* find out the replica identity columns */
+	idattrs = RelationGetIndexAttrBitmap(relation,
+										 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	if (bms_is_empty(idattrs))
 	{
 		elog(DEBUG4, "could not find configured replica identity for table \"%s\"",
 			 RelationGetRelationName(relation));
 		return NULL;
 	}
 
-	idx_rel = RelationIdGetRelation(replidindex);
-
-	Assert(CheckRelationLockedByMe(idx_rel, AccessShareLock, true));
-
-	/* deform tuple, so we have fast access to columns */
-	heap_deform_tuple(tp, desc, values, nulls);
-
-	/* set all columns to NULL, regardless of whether they actually are */
-	memset(nulls, 1, sizeof(nulls));
-
 	/*
-	 * Now set all columns contained in the index to NOT NULL, they cannot
-	 * currently be NULL.
+	 * Construct a new tuple containing only the replica identity columns,
+	 * with nulls elsewhere.  While we're at it, assert that the replica
+	 * identity columns aren't null.
 	 */
-	for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
-	{
-		int			attno = idx_rel->rd_index->indkey.values[natt];
+	heap_deform_tuple(tp, desc, values, nulls);
 
-		if (attno < 0)
-			elog(ERROR, "system column in index");
-		nulls[attno - 1] = false;
+	for (int i = 0; i < desc->natts; i++)
+	{
+		if (bms_is_member(i + 1 - FirstLowInvalidHeapAttributeNumber,
+						  idattrs))
+			Assert(!nulls[i]);
+		else
+			nulls[i] = true;
 	}
 
 	key_tuple = heap_form_tuple(desc, values, nulls);
 	*copy = true;
-	RelationClose(idx_rel);
+
+	bms_free(idattrs);
 
 	/*
 	 * If the tuple, which by here only contains indexed columns, still has
@@ -7680,7 +7678,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
 	{
 		HeapTuple	oldtup = key_tuple;
 
-		key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation));
+		key_tuple = toast_flatten_tuple(oldtup, desc);
 		heap_freetuple(oldtup);
 	}
 

Reply via email to