From d0089cf006f4d3afbccf1923761a85afee713b91 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 12 May 2017 16:53:08 +0900
Subject: [PATCH 2/2] Wait for table sync worker to finish when apply worker
 exits.

---
 src/backend/replication/logical/launcher.c | 52 +++++++++++++++++++++++++++++-
 src/include/replication/worker_internal.h  |  1 +
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index dfce49d..9888758 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -403,8 +403,54 @@ retry:
 }
 
 /*
+ * Stop all table sync workers associated with given subid.
+ *
+ * This function is called by apply worker. Since table sync
+ * worker associated with same subscription is launched by
+ * only the apply worker. We don't need to acquire
+ * LogicalRepLauncherLock here.
+ */
+void
+logicalrep_sync_workers_stop(Oid subid)
+{
+	List *relid_list = NIL;
+	ListCell *cell;
+	int	i;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+	/*
+	 * Walks the workers array and get relid list that matches
+	 * given subscription id.
+	 */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid &&
+			OidIsValid(w->relid))
+			relid_list = lappend_oid(relid_list, w->relid);
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+
+	/* Return if there is no table sync worker associated with myself */
+	if (relid_list == NIL)
+		return;
+
+	foreach (cell, relid_list)
+	{
+		Oid	relid = lfirst_oid(cell);
+
+		logicalrep_worker_stop(subid, relid);
+	}
+}
+
+/*
  * Stop the logical replication worker and wait until it detaches from the
- * slot.
+ * slot. This function can be called by both logical replication launcher
+ * and apply worker to stop apply worker and table sync worker.
+ *
  *
  * The caller must hold LogicalRepLauncherLock to ensure that new workers are
  * not being started during this function call.
@@ -573,6 +619,10 @@ logicalrep_worker_attach(int slot)
 static void
 logicalrep_worker_detach(void)
 {
+	/* Stop all sync workers associated if apply worker */
+	if (!am_tablesync_worker())
+		logicalrep_sync_workers_stop(MyLogicalRepWorker->subid);
+
 	/* Block concurrent access. */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 26788fe..2fec0b0 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -78,6 +78,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_sync_workers_stop(Oid subid);
 
 extern int logicalrep_sync_worker_count(Oid subid);
 
-- 
2.8.1

