From 55367abdf4ce81e022211cb94e8d036e916069c9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 02:27:38 +0200
Subject: [PATCH 1/3] Fix signal handling in logical workers

---
 src/backend/replication/logical/launcher.c  | 22 +++++++-----
 src/backend/replication/logical/tablesync.c | 23 +++++++-----
 src/backend/replication/logical/worker.c    | 54 ++++++++++++++++++++---------
 src/backend/tcop/postgres.c                 |  5 +++
 src/include/replication/logicalworker.h     |  3 ++
 src/include/replication/worker_internal.h   | 10 ------
 6 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4e2c350..2e80b4c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
 /* Flags set by signal handlers */
-volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -614,12 +614,18 @@ logicalrep_launcher_onexit(int code, Datum arg)
 static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
+	/* Disconnect gracefully from the remote side. */
+	if (wrconn)
+		walrcv_disconnect(wrconn);
+
 	logicalrep_worker_detach();
+
+	ApplyLauncherWakeup();
 }
 
 /* SIGTERM: set flag to exit at next convenient time */
-void
-logicalrep_worker_sigterm(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sigterm(SIGNAL_ARGS)
 {
 	int			save_errno = errno;
 
@@ -632,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
 }
 
 /* SIGHUP: set flag to reload configuration at next convenient time */
-void
-logicalrep_worker_sighup(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sighup(SIGNAL_ARGS)
 {
 	int			save_errno = errno;
 
@@ -793,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
 	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
 
 	/* Establish signal handlers. */
-	pqsignal(SIGHUP, logicalrep_worker_sighup);
-	pqsignal(SIGTERM, logicalrep_worker_sigterm);
+	pqsignal(SIGHUP, logicalrep_launcher_sighup);
+	pqsignal(SIGTERM, logicalrep_launcher_sigterm);
 	BackgroundWorkerUnblockSignals();
 
 	/* Make it easy to identify our processes. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1e3753b..b92cc85 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -97,6 +97,7 @@
 
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 
@@ -137,7 +138,6 @@ finish_sync_worker(void)
 			(errmsg("logical replication synchronization worker finished processing")));
 
 	/* Stop gracefully */
-	walrcv_disconnect(wrconn);
 	proc_exit(0);
 }
 
@@ -152,10 +152,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
 	int			rc;
 	char		state = origstate;
 
-	while (!got_SIGTERM)
+	for (;;)
 	{
 		LogicalRepWorker *worker;
 
+		CHECK_FOR_INTERRUPTS();
+
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
 										relid, false);
@@ -476,7 +478,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		process_syncing_tables_for_sync(current_lsn);
 	else
 		process_syncing_tables_for_apply(current_lsn);
@@ -533,7 +535,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		bytesread += avail;
 	}
 
-	while (!got_SIGTERM && maxread > 0 && bytesread < minread)
+	while (maxread > 0 && bytesread < minread)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
@@ -587,10 +589,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		ResetLatch(&MyProc->procLatch);
 	}
 
-	/* Check for exit condition. */
-	if (got_SIGTERM)
-		proc_exit(0);
-
 	return bytesread;
 }
 
@@ -910,3 +908,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	return slotname;
 }
+
+/*
+ * Is current process a tablesync worker?
+ */
+bool
+IsTablesyncWorker(void)
+{
+	return MyLogicalRepWorker != NULL && OidIsValid(MyLogicalRepWorker->relid);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7d1787d..971f76b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -72,6 +72,8 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 
+#include "tcop/tcopprot.h"
+
 #include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void reread_subscription(void);
 
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -134,7 +139,7 @@ static void reread_subscription(void);
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		return MyLogicalRepWorker->relid == rel->localreloid;
 	else
 		return (rel->state == SUBREL_STATE_READY ||
@@ -444,7 +449,7 @@ apply_handle_commit(StringInfo s)
 	Assert(commit_data.commit_lsn == remote_final_lsn);
 
 	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState() && !IsTablesyncWorker())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
@@ -480,7 +485,7 @@ apply_handle_origin(StringInfo s)
 	 * actual writes.
 	 */
 	if (!in_remote_transaction ||
-		(IsTransactionState() && !am_tablesync_worker()))
+		(IsTransactionState() && !IsTablesyncWorker()))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg("ORIGIN message sent out of order")));
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
-	while (!got_SIGTERM)
+	for (;;)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 		bool		ping_sent = false;
 
+		CHECK_FOR_INTERRUPTS();
+
 		MemoryContextSwitchTo(ApplyMessageContext);
 
 		len = walrcv_receive(wrconn, &buf, &fd);
@@ -1129,7 +1136,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			TimeLineID	tli;
 
 			walrcv_endstreaming(wrconn, &tli);
-			break;
+			proc_exit(0);
 		}
 
 		/*
@@ -1329,7 +1336,6 @@ reread_subscription(void)
 				   "stop because the subscription was removed",
 				   MySubscription->name)));
 
-		walrcv_disconnect(wrconn);
 		proc_exit(0);
 	}
 
@@ -1344,7 +1350,6 @@ reread_subscription(void)
 				   "stop because the subscription was disabled",
 				   MySubscription->name)));
 
-		walrcv_disconnect(wrconn);
 		proc_exit(0);
 	}
 
@@ -1359,7 +1364,6 @@ reread_subscription(void)
 				   "restart because the connection information was changed",
 				   MySubscription->name)));
 
-		walrcv_disconnect(wrconn);
 		proc_exit(0);
 	}
 
@@ -1374,7 +1378,6 @@ reread_subscription(void)
 				   "restart because subscription was renamed",
 				   MySubscription->name)));
 
-		walrcv_disconnect(wrconn);
 		proc_exit(0);
 	}
 
@@ -1392,7 +1395,6 @@ reread_subscription(void)
 				   "restart because the replication slot name was changed",
 				   MySubscription->name)));
 
-		walrcv_disconnect(wrconn);
 		proc_exit(0);
 	}
 
@@ -1407,7 +1409,6 @@ reread_subscription(void)
 				   "restart because subscription's publications were changed",
 				   MySubscription->name)));
 
-		walrcv_disconnect(wrconn);
 		proc_exit(0);
 	}
 
@@ -1443,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
 	MySubscriptionValid = false;
 }
 
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	got_SIGHUP = true;
+
+	/* Waken anything waiting on the process latch */
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
 
 /* Logical Replication Apply worker entry point */
 void
@@ -1460,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
 
 	/* Setup signal handling */
 	pqsignal(SIGHUP, logicalrep_worker_sighup);
-	pqsignal(SIGTERM, logicalrep_worker_sigterm);
+	pqsignal(SIGTERM, die);
 	BackgroundWorkerUnblockSignals();
 
 	/* Initialise stats to a sanish value */
@@ -1515,7 +1529,7 @@ ApplyWorkerMain(Datum main_arg)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		elog(LOG, "logical replication sync for subscription %s, table %s started",
 			 MySubscription->name, get_rel_name(MyLogicalRepWorker->relid));
 	else
@@ -1528,7 +1542,7 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 	{
 		char	   *syncslotname;
 
@@ -1608,8 +1622,14 @@ ApplyWorkerMain(Datum main_arg)
 	/* Run the main loop. */
 	LogicalRepApplyLoop(origin_startpos);
 
-	walrcv_disconnect(wrconn);
+	/* Not reached. */
+}
 
-	/* We should only get here if we received SIGTERM */
-	proc_exit(0);
+/*
+ * Is current process an apply worker?
+ */
+bool
+IsApplyWorker(void)
+{
+	return MyLogicalRepWorker != NULL;
 }
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a..1b1134c8 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -55,6 +55,7 @@
 #include "pg_getopt.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("terminating autovacuum process due to administrator command")));
+		else if (IsApplyWorker())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating logical replication worker due to administrator command")));
 		else if (RecoveryConflictPending && RecoveryConflictRetryable)
 		{
 			pgstat_report_recovery_conflict(RecoveryConflictReason);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 3e0affa..6c71343 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,4 +14,7 @@
 
 extern void ApplyWorkerMain(Datum main_arg);
 
+extern bool IsApplyWorker(void);
+extern bool IsTablesyncWorker(void);
+
 #endif   /* LOGICALWORKER_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 0654461..c310ca5 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
 extern LogicalRepWorker *MyLogicalRepWorker;
 
 extern bool in_remote_transaction;
-extern volatile sig_atomic_t got_SIGHUP;
-extern volatile sig_atomic_t got_SIGTERM;
 
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,17 +79,9 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
 
-extern void logicalrep_worker_sighup(SIGNAL_ARGS);
-extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void		process_syncing_tables(XLogRecPtr current_lsn);
 void invalidate_syncing_table_states(Datum arg, int cacheid,
 								uint32 hashvalue);
 
-static inline bool
-am_tablesync_worker(void)
-{
-	return OidIsValid(MyLogicalRepWorker->relid);
-}
-
 #endif   /* WORKER_INTERNAL_H */
-- 
2.7.4

