From ea17efb14d856995f4e7c3e87b78631c7cb13ced Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Mon, 16 Jul 2018 12:00:31 +0530
Subject: [PATCH 2/2] Fix issue with subscriptions when altered twice in same
 transaction.

We keep track of workers to be stopped by adding them into an existing
list. But if a subscription is altered/refreshed twice, then this does
not work. For instance:

Suppose publication pubx is set for tables tx1 and tx2.
Publication puby is for tables ty1 and ty2.
And publication pubz is for tables tx1, tx2, tx3.

Subscription mysub is initially set to synchronise tables tx1 and tx2 :
CREATE SUBSCRIPTION mysub ... PUBLICATION pubx;
And then it is altered like this :
begin;
ALTER SUBSCRIPTION mysub set publication puby;
ALTER SUBSCRIPTION mysub set publication pubz;
commit;

Here, at the 2nd ALTER command, while deriving the relations that no
longer belong to mysub, we consider the pg_subscription_rel that was
updated by the 1st ALTER command. This just leads to addition of more
workers to the workers derived for the 1st ALTER command. Effectively,
at commit above, the stop-workers would contain tx1, tx2, ty1, ty2,
when it should contain nothing, because before the transaction, mysub
contained tx1 and tx2, and all of those are also present in mysub at
transaction end since pubz contains those tables.

Fix this, by keeping track of what tables were present at the
transaction start.
---
 src/backend/access/transam/xact.c          |  3 +
 src/backend/catalog/pg_subscription.c      | 43 +++++++++----
 src/backend/commands/subscriptioncmds.c    | 96 +++++++++++++++++++++++++-----
 src/backend/replication/logical/launcher.c | 82 +++++++++++++++----------
 src/include/catalog/pg_subscription_rel.h  |  9 +++
 src/include/commands/subscriptioncmds.h    |  1 +
 src/include/replication/worker_internal.h  |  2 +-
 7 files changed, 179 insertions(+), 57 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9aa63c8..aefdba2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -33,6 +33,7 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/subscriptioncmds.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -2128,6 +2129,7 @@ CommitTransaction(void)
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true, false);
+	AtEOXact_Subscription();
 	AtEOXact_ApplyLauncher(true);
 	pgstat_report_xact_timestamp(0);
 
@@ -2607,6 +2609,7 @@ AbortTransaction(void)
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_Subscription();
 		AtEOXact_ApplyLauncher(false);
 		pgstat_report_xact_timestamp(0);
 	}
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8705d8b..3d4695a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -33,6 +33,7 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -429,12 +430,12 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 
 
 /*
- * Get all relations for subscription.
+ * Get reloids of all relations for subscription.
  *
- * Returned list is palloc'ed in current memory context.
+ * Returned list is palloc'ed in the specified 'memcxt'.
  */
 List *
-GetSubscriptionRelations(Oid subid)
+GetSubscriptionRelids(Oid subid, MemoryContext memcxt)
 {
 	List	   *res = NIL;
 	Relation	rel;
@@ -442,6 +443,7 @@ GetSubscriptionRelations(Oid subid)
 	int			nkeys = 0;
 	ScanKeyData skey[2];
 	SysScanDesc scan;
+	MemoryContext old_context;
 
 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
 
@@ -453,20 +455,15 @@ GetSubscriptionRelations(Oid subid)
 	scan = systable_beginscan(rel, InvalidOid, false,
 							  NULL, nkeys, skey);
 
+	old_context = MemoryContextSwitchTo(memcxt);
 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
 	{
 		Form_pg_subscription_rel subrel;
-		SubscriptionRelState *relstate;
 
 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
-
-		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
-
-		res = lappend(res, relstate);
+		res = lappend_oid(res, subrel->srrelid);
 	}
+	MemoryContextSwitchTo(old_context);
 
 	/* Cleanup */
 	systable_endscan(scan);
@@ -526,3 +523,27 @@ GetSubscriptionNotReadyRelations(Oid subid)
 
 	return res;
 }
+
+/*
+ * Create a hash table, hashed by subid. Each entry will contain a subset of
+ * relations belonging to the given subscription.
+ */
+HTAB *
+CreateSubscriptionRelHash(void)
+{
+	HASHCTL		ctl;
+
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(SubscriptionRelEntry);
+
+	/*
+	 * All current users require the allocations to be valid until transaction
+	 * end.
+	 */
+	ctl.hcxt = TopTransactionContext;
+
+	return hash_create("SubscriptionRels",
+					   2,		/* typically, this will be small */
+					   &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f138e61..e377ff0 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,6 +51,18 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+
+/*
+ * Hash table of subscriptions.  Each entry has the relids for a given
+ * subscription that were updated on the last COMMIT.  For a subid, there
+ * exists an entry in this hash table only when the subscription relations are
+ * altered.  Once the transaction ends, the hash table is destroyed and reset
+ * to NULL.  This is done so that during commit, we know exactly which workers
+ * to stop: the relations for the last altered subscription should be compared
+ * with the relations for the last committed subscription changes.
+ */
+static HTAB *committed_subrels_table = NULL;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 
 /*
@@ -504,9 +516,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 {
 	char	   *err;
 	List	   *pubrel_names;
-	List	   *subrel_states;
+	List	   *subrelids;
+	SubscriptionRelEntry *committed_subrels_entry;
+	bool		sub_found;
 	Oid		   *subrel_local_oids;
 	Oid		   *pubrel_local_oids;
+	List	   *stop_relids = NIL;
 	ListCell   *lc;
 	int			off;
 
@@ -525,24 +540,34 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	/* We are done with the remote side, close connection. */
 	walrcv_disconnect(wrconn);
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
+	/* Get the committed subrels for the given subscription */
+	if (committed_subrels_table == NULL)
+		committed_subrels_table = CreateSubscriptionRelHash();
+	committed_subrels_entry =
+		(SubscriptionRelEntry *) hash_search(committed_subrels_table,
+										 &sub->oid, HASH_ENTER, &sub_found);
+
+	/*
+	 * Get local table list. If we are creating the committed subrel list for
+	 * the first time for this subscription in this transaction, add them into
+	 * the committed_subrels_table, and also make sure the list is maintained
+	 * until transaction end.
+	 */
+	subrelids = GetSubscriptionRelids(sub->oid,
+					sub_found ?  CurrentMemoryContext : TopTransactionContext);
+	if (!sub_found)
+		committed_subrels_entry->relids = subrelids;
 
 	/*
 	 * Build qsorted array of local table oids for faster lookup. This can
 	 * potentially contain all tables in the database so speed of lookup is
 	 * important.
 	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+	subrel_local_oids = palloc(list_length(subrelids) * sizeof(Oid));
 	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
-
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+	foreach(lc, subrelids)
+		subrel_local_oids[off++] = lfirst_oid(lc);
+	qsort(subrel_local_oids, list_length(subrelids), sizeof(Oid), oid_cmp);
 
 	/*
 	 * Walk over the remote tables and try to match them to locally known
@@ -567,7 +592,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		pubrel_local_oids[off++] = relid;
 
 		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+					 list_length(subrelids), sizeof(Oid), oid_cmp))
 		{
 			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -585,7 +610,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	qsort(pubrel_local_oids, list_length(pubrel_names),
 		  sizeof(Oid), oid_cmp);
 
-	for (off = 0; off < list_length(subrel_states); off++)
+	for (off = 0; off < list_length(subrelids); off++)
 	{
 		Oid			relid = subrel_local_oids[off];
 
@@ -594,7 +619,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+			/*
+			 * If we found an entry in committed_subrels for this subid, that
+			 * means subrelids represents a modified version of the
+			 * committed_subrels_entry->relids. If we didn't find an entry, it
+			 * means this is the first time we are altering the sub, so they
+			 * both hold the same committed list; in that case we collect the
+			 * workers to stop right here and skip the extra pass further below.
+			 */
+			if (!sub_found)
+				stop_relids = lappend_oid(stop_relids, relid);
 
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
@@ -603,6 +637,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 							sub->name)));
 		}
 	}
+
+	/*
+	 * Now derive the workers to be stopped using the committed reloids. At
+	 * commit time, we will terminate them.
+	 */
+	if (sub_found)
+	{
+		foreach(lc, committed_subrels_entry->relids)
+		{
+			Oid			relid = lfirst_oid(lc);
+
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+				stop_relids = lappend_oid(stop_relids, relid);
+		}
+	}
+
+	logicalrep_worker_stop_at_commit(sub->oid, stop_relids);
 }
 
 /*
@@ -1172,3 +1224,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Cleanup function for objects maintained during the transaction by
+ * subscription-refresh operation.
+ */
+void
+AtEOXact_Subscription(void)
+{
+	/*
+	 * No explicit hash_destroy() is needed: the table was allocated in
+	 * TopTransactionContext and is released along with it.  Just forget it.
+	 */
+	committed_subrels_table = NULL;
+}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 32ab193..e3059d6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,16 +73,10 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
-typedef struct LogicalRepWorkerId
-{
-	Oid			subid;
-	Oid			relid;
-} LogicalRepWorkerId;
-
 typedef struct StopWorkersData
 {
 	int			nestDepth;				/* Sub-transaction nest level */
-	List	   *workers;				/* List of LogicalRepWorkerId */
+	HTAB	   *workers;		/* Workers to be stopped. Hashed by subid */
 	struct StopWorkersData *parent;		/* This need not be an immediate
 										 * subtransaction parent */
 } StopWorkersData;
@@ -566,14 +560,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
- * Request worker for specified sub/rel to be stopped on commit.
+ * Request workers for the specified relids of a subscription to be stopped on
+ * commit. This replaces the earlier saved reloids of a given subscription.
  */
 void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+logicalrep_worker_stop_at_commit(Oid subid, List *relids)
 {
 	int			nestDepth = GetCurrentTransactionNestLevel();
-	LogicalRepWorkerId *wid;
 	MemoryContext oldctx;
+	SubscriptionRelEntry *subrel_entry;
+	bool		sub_found;
 
 	/* Make sure we store the info in context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
@@ -591,20 +587,22 @@ logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 	{
 		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
 		newdata->nestDepth = nestDepth;
-		newdata->workers = NIL;
+		newdata->workers = CreateSubscriptionRelHash();
 		newdata->parent = on_commit_stop_workers;
 		on_commit_stop_workers = newdata;
 	}
 
 	/*
-	 * Finally add a new worker into the worker list of the current
-	 * subtransaction.
+	 * If there's an existing entry, it means the same subscription was already
+	 * refreshed earlier in the current subtransaction. In that case, replace
+	 * the existing relations with the new ones.
 	 */
-	wid = palloc(sizeof(LogicalRepWorkerId));
-	wid->subid = subid;
-	wid->relid = relid;
-	on_commit_stop_workers->workers =
-		lappend(on_commit_stop_workers->workers, wid);
+	subrel_entry =
+		(SubscriptionRelEntry *) hash_search(on_commit_stop_workers->workers,
+											 &subid, HASH_ENTER, &sub_found);
+	if (sub_found)
+		list_free(subrel_entry->relids);
+	subrel_entry->relids = list_copy(relids);
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -874,17 +872,17 @@ AtEOXact_ApplyLauncher(bool isCommit)
 
 	if (isCommit)
 	{
-		ListCell   *lc;
-
 		if (on_commit_stop_workers != NULL)
 		{
-			List	*workers = on_commit_stop_workers->workers;
+			SubscriptionRelEntry *entry;
+			HASH_SEQ_STATUS hash_seq;
+			ListCell   *lc;
 
-			foreach(lc, workers)
+			hash_seq_init(&hash_seq, on_commit_stop_workers->workers);
+			while ((entry = (SubscriptionRelEntry *) hash_seq_search(&hash_seq)) != NULL)
 			{
-				LogicalRepWorkerId *wid = lfirst(lc);
-
-				logicalrep_worker_stop(wid->subid, wid->relid);
+				foreach(lc, entry->relids)
+					logicalrep_worker_stop(entry->subid, lfirst_oid(lc));
 			}
 		}
 
@@ -910,6 +908,10 @@ void
 AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 {
 	StopWorkersData *parent;
+	HASH_SEQ_STATUS hash_seq;
+	HTAB *workers;
+	SubscriptionRelEntry *entry;
+
 
 	/* Exit immediately if there's no work to do at this level. */
 	if (on_commit_stop_workers == NULL ||
@@ -919,14 +921,14 @@ AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 	Assert(on_commit_stop_workers->nestDepth == nestDepth);
 
 	parent = on_commit_stop_workers->parent;
+	workers = on_commit_stop_workers->workers;
 
 	if (isCommit)
 	{
 		/*
 		 * If the upper stack element is not an immediate parent
 		 * subtransaction, just decrement the notional nesting depth without
-		 * doing any real work.  Else, we need to merge the current workers
-		 * list into the parent.
+		 * doing any real work.
 		 */
 		if (!parent || parent->nestDepth < nestDepth - 1)
 		{
@@ -934,8 +936,24 @@ AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 			return;
 		}
 
-		parent->workers =
-			list_concat(parent->workers, on_commit_stop_workers->workers);
+		/* Else, we need to merge the current workers list into the parent. */
+		hash_seq_init(&hash_seq, workers);
+		while ((entry = (SubscriptionRelEntry *) hash_seq_search(&hash_seq)) != NULL)
+		{
+			bool		sub_found;
+			SubscriptionRelEntry *parent_entry;
+
+			parent_entry =
+				(SubscriptionRelEntry *) hash_search(parent->workers,
+										 &entry->subid, HASH_ENTER, &sub_found);
+			/*
+			 * Replace the parent's workers with the current subtransaction's
+			 * workers.
+			 */
+			if (sub_found)
+				list_free(parent_entry->relids);
+			parent_entry->relids = entry->relids;
+		}
 	}
 	else
 	{
@@ -943,7 +961,11 @@ AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
 		 * Abandon everything that was done at this nesting level.  Explicitly
 		 * free memory to avoid a transaction-lifespan leak.
 		 */
-		list_free_deep(on_commit_stop_workers->workers);
+		hash_seq_init(&hash_seq, workers);
+		while ((entry = (SubscriptionRelEntry *) hash_seq_search(&hash_seq)) != NULL)
+			list_free(entry->relids);
+
+		hash_destroy(workers);
 	}
 
 	/*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 556cb94..acc6905 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -23,6 +23,7 @@
 
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
+#include "utils/hsearch.h"
 
 /* ----------------
  *		pg_subscription_rel definition. cpp turns this into
@@ -67,6 +68,12 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
+typedef struct SubscriptionRelEntry
+{
+	Oid			subid;		/* hash key - must be first */
+	List	   *relids;		/* Subset of relations in the subscription. */
+} SubscriptionRelEntry;
+
 extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
 						XLogRecPtr sublsn);
 extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
@@ -76,6 +83,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern List *GetSubscriptionRelations(Oid subid);
+extern List *GetSubscriptionRelids(Oid subid, MemoryContext memcxt);
 extern List *GetSubscriptionNotReadyRelations(Oid subid);
+extern HTAB *CreateSubscriptionRelHash(void);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 6d70ad7..e14b91e 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -25,5 +25,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
+extern void AtEOXact_Subscription(void);
 
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1ce3b6b..1c48a33 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -75,7 +75,7 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, List *relids);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-- 
2.1.4

