From 504ab6ec63c2c9667decfc4a000e80ff83cf0183 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Mon, 16 Jul 2018 11:31:23 +0530
Subject: [PATCH 1/2] Handle subscriptions workers with subtransactions.

AtEOXact_ApplyLauncher() did not handle the case where a subscription
REFRESH operation in a subtransaction could be aborted. The function
basically did not consider subtransaction.

Fix this by having a stack of stop worker lists, each one belonging
to corresponding subtransaction depth.
---
 src/backend/access/transam/xact.c          |   2 +
 src/backend/replication/logical/launcher.c | 115 +++++++++++++++++++++++++++--
 src/include/replication/logicallauncher.h  |   1 +
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1da1f13..9aa63c8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
+	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
+		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6ef333b..32ab193 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
 	Oid			relid;
 } LogicalRepWorkerId;
 
-static List *on_commit_stop_workers = NIL;
+typedef struct StopWorkersData
+{
+	int			nestDepth;				/* Sub-transaction nest level */
+	List	   *workers;				/* List of LogicalRepWorkerId */
+	struct StopWorkersData *parent;		/* This need not be an immediate
+										 * subtransaction parent */
+} StopWorkersData;
+
+/*
+ * Stack of StopWorkersData elements. Each stack element contains the workers
+ * to be stopped for that subtransaction.
+ */
+static StopWorkersData *on_commit_stop_workers = NULL;
 
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +571,40 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 void
 logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
 {
+	int			nestDepth = GetCurrentTransactionNestLevel();
 	LogicalRepWorkerId *wid;
 	MemoryContext oldctx;
 
 	/* Make sure we store the info in context that survives until commit. */
 	oldctx = MemoryContextSwitchTo(TopTransactionContext);
 
+	/* Check that previous transactions were properly cleaned up. */
+	Assert(on_commit_stop_workers == NULL ||
+		   nestDepth >= on_commit_stop_workers->nestDepth);
+
+	/*
+	 * Push a new stack element if we don't already have one for the current
+	 * nestDepth.
+	 */
+	if (on_commit_stop_workers == NULL ||
+		nestDepth > on_commit_stop_workers->nestDepth)
+	{
+		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
+		newdata->nestDepth = nestDepth;
+		newdata->workers = NIL;
+		newdata->parent = on_commit_stop_workers;
+		on_commit_stop_workers = newdata;
+	}
+
+	/*
+	 * Finally add a new worker into the worker list of the current
+	 * subtransaction.
+	 */
 	wid = palloc(sizeof(LogicalRepWorkerId));
 	wid->subid = subid;
 	wid->relid = relid;
-
-	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+	on_commit_stop_workers->workers =
+		lappend(on_commit_stop_workers->workers, wid);
 
 	MemoryContextSwitchTo(oldctx);
 }
@@ -823,7 +858,7 @@ ApplyLauncherShmemInit(void)
 bool
 XactManipulatesLogicalReplicationWorkers(void)
 {
-	return (on_commit_stop_workers != NIL);
+	return (on_commit_stop_workers != NULL);
 }
 
 /*
@@ -832,15 +867,25 @@ XactManipulatesLogicalReplicationWorkers(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+
+	Assert(on_commit_stop_workers == NULL ||
+		   (on_commit_stop_workers->nestDepth == 1 &&
+			on_commit_stop_workers->parent == NULL));
+
 	if (isCommit)
 	{
 		ListCell   *lc;
 
-		foreach(lc, on_commit_stop_workers)
+		if (on_commit_stop_workers != NULL)
 		{
-			LogicalRepWorkerId *wid = lfirst(lc);
+			List	*workers = on_commit_stop_workers->workers;
 
-			logicalrep_worker_stop(wid->subid, wid->relid);
+			foreach(lc, workers)
+			{
+				LogicalRepWorkerId *wid = lfirst(lc);
+
+				logicalrep_worker_stop(wid->subid, wid->relid);
+			}
 		}
 
 		if (on_commit_launcher_wakeup)
@@ -851,11 +896,65 @@ AtEOXact_ApplyLauncher(bool isCommit)
 	 * No need to pfree on_commit_stop_workers.  It was allocated in
 	 * transaction memory context, which is going to be cleaned soon.
 	 */
-	on_commit_stop_workers = NIL;
+	on_commit_stop_workers = NULL;
 	on_commit_launcher_wakeup = false;
 }
 
 /*
+ * On commit, merge the current on_commit_stop_workers list into the
+ * immediate parent, if present.
+ * On rollback, discard the current on_commit_stop_workers list.
+ * Pop out the stack.
+ */
+void
+AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
+{
+	StopWorkersData *parent;
+
+	/* Exit immediately if there's no work to do at this level. */
+	if (on_commit_stop_workers == NULL ||
+		on_commit_stop_workers->nestDepth < nestDepth)
+		return;
+
+	Assert(on_commit_stop_workers->nestDepth == nestDepth);
+
+	parent = on_commit_stop_workers->parent;
+
+	if (isCommit)
+	{
+		/*
+		 * If the upper stack element is not an immediate parent
+		 * subtransaction, just decrement the notional nesting depth without
+		 * doing any real work.  Else, we need to merge the current workers
+		 * list into the parent.
+		 */
+		if (!parent || parent->nestDepth < nestDepth - 1)
+		{
+			on_commit_stop_workers->nestDepth--;
+			return;
+		}
+
+		parent->workers =
+			list_concat(parent->workers, on_commit_stop_workers->workers);
+	}
+	else
+	{
+		/*
+		 * Abandon everything that was done at this nesting level.  Explicitly
+		 * free memory to avoid a transaction-lifespan leak.
+		 */
+		list_free_deep(on_commit_stop_workers->workers);
+	}
+
+	/*
+	 * We have taken care of the current subtransaction workers list for both
+	 * abort or commit. So we are ready to pop the stack.
+	 */
+	pfree(on_commit_stop_workers);
+	on_commit_stop_workers = parent;
+}
+
+/*
  * Request wakeup of the launcher on commit of the transaction.
  *
  * This is used to send launcher signal to stop sleeping and process the
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ef02512..9f840b7 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 extern void ApplyLauncherWakeupAtCommit(void);
 extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
-- 
2.1.4

