From 4a488d4bb9a7d0b97e14edde7854fdf0246de249 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 15 Aug 2025 12:47:52 +0800
Subject: [PATCH vApproach2] delete the unused hash entry on invalidation

---
 src/backend/replication/pgoutput/pgoutput.c | 209 +++++++++++++-------
 1 file changed, 139 insertions(+), 70 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..feb07529bf1 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -131,6 +131,8 @@ typedef struct RelationSyncEntry
 
 	bool		schema_sent;
 
+	int			in_use;
+
 	/*
 	 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
 	 * columns and the 'publish_generated_columns' parameter is set to
@@ -223,6 +225,9 @@ static void init_rel_sync_cache(MemoryContext cachectx);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 											 Relation relation);
+static void close_rel_sync_entry(RelationSyncEntry *entry);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void delete_rel_sync_entry(RelationSyncEntry *entry);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
 									RelationSyncEntry *relentry);
@@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				close_rel_sync_entry(relentry);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				close_rel_sync_entry(relentry);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				close_rel_sync_entry(relentry);
 				return;
+			}
 
 			/*
 			 * This is only possible if deletes are allowed even when replica
@@ -1621,6 +1635,8 @@ cleanup:
 			ExecDropSingleTupleTableSlot(new_slot);
 	}
 
+	close_rel_sync_entry(relentry);
+
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 }
@@ -1658,7 +1674,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		relentry = get_rel_sync_entry(data, relation);
 
 		if (!relentry->pubactions.pubtruncate)
+		{
+			close_rel_sync_entry(relentry);
 			continue;
+		}
 
 		/*
 		 * Don't send partitions if the publication wants to send only the
@@ -1666,7 +1685,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		 */
 		if (relation->rd_rel->relispartition &&
 			relentry->publish_as_relid != relid)
+		{
+			close_rel_sync_entry(relentry);
 			continue;
+		}
 
 		relids[nrelids++] = relid;
 
@@ -1675,6 +1697,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			pgoutput_send_begin(ctx, txn);
 
 		maybe_send_schema(ctx, change, relation, relentry);
+
+		close_rel_sync_entry(relentry);
 	}
 
 	if (nrelids > 0)
@@ -2048,6 +2072,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	{
 		entry->replicate_valid = false;
 		entry->schema_sent = false;
+		entry->in_use = false;
 		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
 		entry->streamed_txns = NIL;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
@@ -2061,6 +2086,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	Assert(!entry->in_use);
+
+	entry->in_use = true;
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2091,71 +2120,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
-		/*
-		 * Reset schema_sent status as the relation definition may have
-		 * changed.  Also reset pubactions to empty in case rel was dropped
-		 * from a publication.  Also free any objects that depended on the
-		 * earlier definition.
-		 */
-		entry->schema_sent = false;
-		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
-		list_free(entry->streamed_txns);
-		entry->streamed_txns = NIL;
-		bms_free(entry->columns);
-		entry->columns = NULL;
-		entry->pubactions.pubinsert = false;
-		entry->pubactions.pubupdate = false;
-		entry->pubactions.pubdelete = false;
-		entry->pubactions.pubtruncate = false;
-
-		/*
-		 * Tuple slots cleanups. (Will be rebuilt later if needed).
-		 */
-		if (entry->old_slot)
-		{
-			TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
-
-			Assert(desc->tdrefcount == -1);
-
-			ExecDropSingleTupleTableSlot(entry->old_slot);
-
-			/*
-			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-			 * do it now to avoid any leaks.
-			 */
-			FreeTupleDesc(desc);
-		}
-		if (entry->new_slot)
-		{
-			TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
-
-			Assert(desc->tdrefcount == -1);
-
-			ExecDropSingleTupleTableSlot(entry->new_slot);
-
-			/*
-			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
-			 * do it now to avoid any leaks.
-			 */
-			FreeTupleDesc(desc);
-		}
-
-		entry->old_slot = NULL;
-		entry->new_slot = NULL;
-
-		if (entry->attrmap)
-			free_attrmap(entry->attrmap);
-		entry->attrmap = NULL;
-
-		/*
-		 * Row filter cache cleanups.
-		 */
-		if (entry->entry_cxt)
-			MemoryContextDelete(entry->entry_cxt);
-
-		entry->entry_cxt = NULL;
-		entry->estate = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		/* Cleanup existing data */
+		cleanup_rel_sync_entry(entry);
 
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2277,101 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	return entry;
 }
 
+/*
+ * Mark the given entry as unused.
+ */
+static void
+close_rel_sync_entry(RelationSyncEntry *relentry)
+{
+	Assert(relentry->in_use);
+	relentry->in_use = false;
+}
+
+/*
+ * Cleanup attributes in the given entry.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+	/*
+	 * Reset schema_sent status as the relation definition may have changed.
+	 * Also reset pubactions to empty in case rel was dropped from a
+	 * publication.  Also free any objects that depended on the earlier
+	 * definition.
+	 */
+	entry->schema_sent = false;
+	entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+	list_free(entry->streamed_txns);
+	entry->streamed_txns = NIL;
+	bms_free(entry->columns);
+	entry->columns = NULL;
+	entry->pubactions.pubinsert = false;
+	entry->pubactions.pubupdate = false;
+	entry->pubactions.pubdelete = false;
+	entry->pubactions.pubtruncate = false;
+
+	/*
+	 * Tuple slots cleanups. (Will be rebuilt later if needed).
+	 */
+	if (entry->old_slot)
+	{
+		TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;
+
+		Assert(desc->tdrefcount == -1);
+
+		ExecDropSingleTupleTableSlot(entry->old_slot);
+
+		/*
+		 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+		 * it now to avoid any leaks.
+		 */
+		FreeTupleDesc(desc);
+	}
+	if (entry->new_slot)
+	{
+		TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;
+
+		Assert(desc->tdrefcount == -1);
+
+		ExecDropSingleTupleTableSlot(entry->new_slot);
+
+		/*
+		 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+		 * it now to avoid any leaks.
+		 */
+		FreeTupleDesc(desc);
+	}
+
+	entry->old_slot = NULL;
+	entry->new_slot = NULL;
+
+	if (entry->attrmap)
+		free_attrmap(entry->attrmap);
+	entry->attrmap = NULL;
+
+	/*
+	 * Row filter cache cleanups.
+	 */
+	if (entry->entry_cxt)
+		MemoryContextDelete(entry->entry_cxt);
+
+	entry->entry_cxt = NULL;
+	entry->estate = NULL;
+	memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+static void
+delete_rel_sync_entry(RelationSyncEntry *entry)
+{
+	Assert(!entry->in_use);
+
+	cleanup_rel_sync_entry(entry);
+
+	/* Remove the etnry from the cache */
+	if (hash_search(RelationSyncCache, &entry->relid, HASH_REMOVE, NULL) == NULL)
+		elog(ERROR, "hash table corrupted");
+}
+
 /*
  * Cleanup list of streamed transactions and update the schema_sent flag.
  *
@@ -2374,9 +2435,9 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	 * Nobody keeps pointers to entries in this hash table around outside
 	 * logical decoding callback calls - but invalidation events can come in
 	 * *during* a callback if we do any syscache access in the callback.
-	 * Because of that we must mark the cache entry as invalid but not damage
-	 * any of its substructure here.  The next get_rel_sync_entry() call will
-	 * rebuild it all.
+	 * Because of that, if the hash entry is being used, we must mark the cache
+	 * entry as invalid but not damage any of its substructure here.  The next
+	 * get_rel_sync_entry() call will rebuild it all.
 	 */
 	if (OidIsValid(relid))
 	{
@@ -2387,7 +2448,12 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
 												  HASH_FIND, NULL);
 		if (entry != NULL)
-			entry->replicate_valid = false;
+		{
+			if (entry->in_use)
+				entry->replicate_valid = false;
+			else
+				delete_rel_sync_entry(entry);
+		}
 	}
 	else
 	{
@@ -2397,7 +2463,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		hash_seq_init(&status, RelationSyncCache);
 		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 		{
-			entry->replicate_valid = false;
+			if (entry->in_use)
+				entry->replicate_valid = false;
+			else
+				delete_rel_sync_entry(entry);
 		}
 	}
 }
-- 
2.50.1.windows.1

