>From 14c3d470bac298fc61518733b00ac5ba7d32de03 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Mon, 24 Apr 2017 07:17:42 +0200
Subject: [PATCH 3/3] Make walsender behave more like normal backend

The signal handling in walsender is now almost same as in standard
postgres handler.

Database connected walsender also allows all the protocol commands
nowmal backend does.

In passing fix the SQL query fallback in the replication command parser.
---
 src/backend/replication/repl_gram.y |  20 +------
 src/backend/replication/walsender.c |  75 +++++++++++++-------------
 src/backend/tcop/postgres.c         | 105 ++++++++++++++----------------------
 src/include/replication/walsender.h |   1 +
 4 files changed, 82 insertions(+), 119 deletions(-)

diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index ec047c8..6e6d696 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -25,8 +25,6 @@
 /* Result of the parsing is returned here */
 Node *replication_parse_result;
 
-static SQLCmd *make_sqlcmd(void);
-
 
 /*
  * Bison doesn't allocate anything that needs to live across parser calls,
@@ -385,24 +383,8 @@ plugin_opt_arg:
 		;
 
 sql_cmd:
-			IDENT							{ $$ = (Node *) make_sqlcmd(); }
+											{ replication_parse_result = (Node *) makeNode(SQLCmd); YYACCEPT; }
 		;
 %%
 
-static SQLCmd *
-make_sqlcmd(void)
-{
-	SQLCmd *cmd = makeNode(SQLCmd);
-	int tok;
-
-	/* Just move lexer to the end of command. */
-	for (;;)
-	{
-		tok = yylex();
-		if (tok == ';' || tok == 0)
-			break;
-	}
-	return cmd;
-}
-
 #include "repl_scanner.c"
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7f107b5..16026e9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -210,7 +210,6 @@ static struct
 } LagTracker;
 
 /* Signal handlers */
-static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
@@ -1373,7 +1372,45 @@ WalSndWaitForWal(XLogRecPtr loc)
 }
 
 /*
- * Execute an incoming replication command.
+ * Do special handling of commands when running inside walsender.
+ *
+ * 'firstchar' specifies what kind of a message was received, and is used to
+ * determine if the command is allowed, and if not, to construct the error
+ * message.
+ */
+void
+walsender_command_start(const char firstchar)
+{
+	if (am_walsender)
+	{
+		/*
+		 * CREATE_REPLICATION_SLOT ... LOGICAL might have exported a snapshot
+		 * until the next command arrives. Clean it up if that was the case.
+		 */
+		SnapBuildClearExportedSnapshot();
+
+		/* If the walsender is connected to the database, we are done. */
+		if (am_db_walsender)
+			return;
+
+		/*
+		 * This is walsender which is not connected in database, throw error
+		 * on anything that's not simple protocol query.
+		 */
+		if (firstchar == 'Q')
+			return;
+		else if (firstchar == 'F')
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("fastpath function calls not supported in a replication connection")));
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("extended query protocol not supported in a replication connection")));
+	}
+}
+
+/* * Execute an incoming replication command.
  *
  * Returns true if the cmd_string was recognized as WalSender command, false
  * if not.
@@ -1386,12 +1423,6 @@ exec_replication_command(const char *cmd_string)
 	MemoryContext cmd_context;
 	MemoryContext old_context;
 
-	/*
-	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
-	 * command arrives. Clean up the old stuff if there's anything.
-	 */
-	SnapBuildClearExportedSnapshot();
-
 	CHECK_FOR_INTERRUPTS();
 
 	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
@@ -1420,13 +1451,6 @@ exec_replication_command(const char *cmd_string)
 				(errmsg("received replication command: %s", cmd_string)));
 
 	/*
-	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
-	 * called outside of transaction the snapshot should be cleared here.
-	 */
-	if (!IsTransactionBlock())
-		SnapBuildClearExportedSnapshot();
-
-	/*
 	 * For aborted transactions, don't allow anything except pure SQL,
 	 * the exec_simple_query() will handle it correctly.
 	 */
@@ -2830,17 +2854,6 @@ WalSndRqstFileReload(void)
 	}
 }
 
-/* SIGUSR1: set flag to send WAL records */
-static void
-WalSndXLogSendHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	latch_sigusr1_handler();
-
-	errno = save_errno;
-}
-
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
@@ -2866,20 +2879,10 @@ WalSndLastCycleHandler(SIGNAL_ARGS)
 void
 WalSndSignals(void)
 {
-	/* Set up signal handlers */
-	pqsignal(SIGHUP, PostgresSigHupHandler);		/* set flag to read config
-													 * file */
-	pqsignal(SIGINT, SIG_IGN);	/* not used */
-	pqsignal(SIGTERM, die);		/* request shutdown */
-	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
-	InitializeTimeouts();		/* establishes SIGALRM handler */
-	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
 												 * shutdown */
 
 	/* Reset some signals that are accepted by postmaster but not here */
-	pqsignal(SIGCHLD, SIG_DFL);
 	pqsignal(SIGTTIN, SIG_DFL);
 	pqsignal(SIGTTOU, SIG_DFL);
 	pqsignal(SIGCONT, SIG_DFL);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 46fc567..5cecbaf 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -174,7 +174,6 @@ static int	InteractiveBackend(StringInfo inBuf);
 static int	interactive_getc(void);
 static int	SocketBackend(StringInfo inBuf);
 static int	ReadCommand(StringInfo inBuf);
-static void forbidden_in_wal_sender(char firstchar);
 static List *pg_rewrite_query(Query *query);
 static bool check_log_statement(List *stmt_list);
 static int	errdetail_execute(List *raw_parsetree_list);
@@ -3623,44 +3622,43 @@ PostgresMain(int argc, char *argv[],
 	 * an issue for signals that are locally generated, such as SIGALRM and
 	 * SIGPIPE.)
 	 */
-	if (am_walsender)
-		WalSndSignals();
+	pqsignal(SIGHUP, PostgresSigHupHandler);		/* set flag to read config
+											 * file */
+	pqsignal(SIGINT, StatementCancelHandler);		/* cancel current query */
+	pqsignal(SIGTERM, die); /* cancel current query and exit */
+
+	/*
+	 * In a standalone backend, SIGQUIT can be generated from the keyboard
+	 * easily, while SIGTERM cannot, so we make both signals do die()
+	 * rather than quickdie().
+	 */
+	if (IsUnderPostmaster)
+		pqsignal(SIGQUIT, quickdie);		/* hard crash time */
 	else
-	{
-		pqsignal(SIGHUP, PostgresSigHupHandler);		/* set flag to read config
-														 * file */
-		pqsignal(SIGINT, StatementCancelHandler);		/* cancel current query */
-		pqsignal(SIGTERM, die); /* cancel current query and exit */
+		pqsignal(SIGQUIT, die);		/* cancel current query and exit */
+	InitializeTimeouts();	/* establishes SIGALRM handler */
 
-		/*
-		 * In a standalone backend, SIGQUIT can be generated from the keyboard
-		 * easily, while SIGTERM cannot, so we make both signals do die()
-		 * rather than quickdie().
-		 */
-		if (IsUnderPostmaster)
-			pqsignal(SIGQUIT, quickdie);		/* hard crash time */
-		else
-			pqsignal(SIGQUIT, die);		/* cancel current query and exit */
-		InitializeTimeouts();	/* establishes SIGALRM handler */
+	/*
+	 * Ignore failure to write to frontend. Note: if frontend closes
+	 * connection, we will notice it and exit cleanly when control next
+	 * returns to outer loop.  This seems safer than forcing exit in the
+	 * midst of output during who-knows-what operation...
+	 */
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+	pqsignal(SIGFPE, FloatExceptionHandler);
 
-		/*
-		 * Ignore failure to write to frontend. Note: if frontend closes
-		 * connection, we will notice it and exit cleanly when control next
-		 * returns to outer loop.  This seems safer than forcing exit in the
-		 * midst of output during who-knows-what operation...
-		 */
-		pqsignal(SIGPIPE, SIG_IGN);
-		pqsignal(SIGUSR1, procsignal_sigusr1_handler);
-		pqsignal(SIGUSR2, SIG_IGN);
-		pqsignal(SIGFPE, FloatExceptionHandler);
+	/*
+	 * Reset some signals that are accepted by postmaster but not by
+	 * backend
+	 */
+	pqsignal(SIGCHLD, SIG_DFL);		/* system() requires this on some
+									 * platforms */
 
-		/*
-		 * Reset some signals that are accepted by postmaster but not by
-		 * backend
-		 */
-		pqsignal(SIGCHLD, SIG_DFL);		/* system() requires this on some
-										 * platforms */
-	}
+	/* Walsender needs extra signal setup. */
+	if (am_walsender)
+		WalSndSignals();
 
 	pqinitmask();
 
@@ -4060,6 +4058,8 @@ PostgresMain(int argc, char *argv[],
 				{
 					const char *query_string;
 
+					walsender_command_start(firstchar);
+
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
 
@@ -4085,7 +4085,7 @@ PostgresMain(int argc, char *argv[],
 					int			numParams;
 					Oid		   *paramTypes = NULL;
 
-					forbidden_in_wal_sender(firstchar);
+					walsender_command_start(firstchar);
 
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
@@ -4109,7 +4109,7 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'B':			/* bind */
-				forbidden_in_wal_sender(firstchar);
+				walsender_command_start(firstchar);
 
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();
@@ -4126,7 +4126,7 @@ PostgresMain(int argc, char *argv[],
 					const char *portal_name;
 					int			max_rows;
 
-					forbidden_in_wal_sender(firstchar);
+					walsender_command_start(firstchar);
 
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
@@ -4140,7 +4140,7 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'F':			/* fastpath function call */
-				forbidden_in_wal_sender(firstchar);
+				walsender_command_start(firstchar);
 
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();
@@ -4177,7 +4177,7 @@ PostgresMain(int argc, char *argv[],
 					int			close_type;
 					const char *close_target;
 
-					forbidden_in_wal_sender(firstchar);
+					walsender_command_start(firstchar);
 
 					close_type = pq_getmsgbyte(&input_message);
 					close_target = pq_getmsgstring(&input_message);
@@ -4221,7 +4221,7 @@ PostgresMain(int argc, char *argv[],
 					int			describe_type;
 					const char *describe_target;
 
-					forbidden_in_wal_sender(firstchar);
+					walsender_command_start(firstchar);
 
 					/* Set statement_timestamp() (needed for xact) */
 					SetCurrentStatementStartTimestamp();
@@ -4304,29 +4304,6 @@ PostgresMain(int argc, char *argv[],
 	}							/* end of input-reading loop */
 }
 
-/*
- * Throw an error if we're a WAL sender process.
- *
- * This is used to forbid anything else than simple query protocol messages
- * in a WAL sender process.  'firstchar' specifies what kind of a forbidden
- * message was received, and is used to construct the error message.
- */
-static void
-forbidden_in_wal_sender(char firstchar)
-{
-	if (am_walsender)
-	{
-		if (firstchar == 'F')
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("fastpath function calls not supported in a replication connection")));
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("extended query protocol not supported in a replication connection")));
-	}
-}
-
 
 /*
  * Obtain platform stack depth limit (in bytes)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2ca9038..e5c102c 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -38,6 +38,7 @@ extern int	wal_sender_timeout;
 extern bool log_replication_commands;
 
 extern void InitWalSender(void);
+extern void walsender_command_start(const char firstchar);
 extern bool exec_replication_command(const char *query_string);
 extern void WalSndErrorCleanup(void);
 extern void WalSndSignals(void);
-- 
2.7.4

