I have taken the libpqwalreceiver refactoring patch and split it into
two: one for the latch change, one for the API change.  I have done some
mild editing.

In my opinion, these two patches are now ready to commit.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From dc95826ff5018be1f001bead888b16ea3effe099 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Wed, 30 Nov 2016 12:00:00 -0500
Subject: [PATCH 1/2] Use latch instead of select() in walreceiver

Replace use of poll()/select() with WaitLatchOrSocket(), which is more
portable and flexible.

Also change walreceiver to use its procLatch instead of a custom latch.

From: Petr Jelinek <p...@2ndquadrant.com>
---
 src/backend/postmaster/pgstat.c                    |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c            | 101 +++++----------------
 src/backend/replication/walreceiver.c              |  18 ++--
 src/backend/replication/walreceiverfuncs.c         |   6 +-
 src/include/pgstat.h                               |   1 +
 src/include/replication/walreceiver.h              |   3 +-
 6 files changed, 43 insertions(+), 89 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a392197..c7584cb 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 			event_name = "WalReceiverWaitStart";
 			break;
+		case WAIT_EVENT_LIBPQWALRECEIVER_READ:
+			event_name = "LibPQWalReceiverRead";
+			break;
 		case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
 			event_name = "WalSenderWaitForWAL";
 			break;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f1c843e..6c01e7b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,19 +23,11 @@
 #include "pqexpbuffer.h"
 #include "access/xlog.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/walreceiver.h"
+#include "storage/proc.h"
 #include "utils/builtins.h"
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
 PG_MODULE_MAGIC;
 
 void		_PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
-static bool libpq_select(int timeout_ms);
 static PGresult *libpqrcv_PQexec(const char *query);
 
 /*
@@ -367,67 +358,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
 }
 
 /*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-libpq_select(int timeout_ms)
-{
-	int			ret;
-
-	Assert(streamConn != NULL);
-	if (PQsocket(streamConn) < 0)
-		ereport(ERROR,
-				(errcode_for_socket_access(),
-				 errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
-
-	/* We use poll(2) if available, otherwise select(2) */
-	{
-#ifdef HAVE_POLL
-		struct pollfd input_fd;
-
-		input_fd.fd = PQsocket(streamConn);
-		input_fd.events = POLLIN | POLLERR;
-		input_fd.revents = 0;
-
-		ret = poll(&input_fd, 1, timeout_ms);
-#else							/* !HAVE_POLL */
-
-		fd_set		input_mask;
-		struct timeval timeout;
-		struct timeval *ptr_timeout;
-
-		FD_ZERO(&input_mask);
-		FD_SET(PQsocket(streamConn), &input_mask);
-
-		if (timeout_ms < 0)
-			ptr_timeout = NULL;
-		else
-		{
-			timeout.tv_sec = timeout_ms / 1000;
-			timeout.tv_usec = (timeout_ms % 1000) * 1000;
-			ptr_timeout = &timeout;
-		}
-
-		ret = select(PQsocket(streamConn) + 1, &input_mask,
-					 NULL, NULL, ptr_timeout);
-#endif   /* HAVE_POLL */
-	}
-
-	if (ret == 0 || (ret < 0 && errno == EINTR))
-		return false;
-	if (ret < 0)
-		ereport(ERROR,
-				(errcode_for_socket_access(),
-				 errmsg("select() failed: %m")));
-	return true;
-}
-
-/*
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and the backend version of select().
  *
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
 		 */
 		while (PQisBusy(streamConn))
 		{
+			int			rc;
+
 			/*
 			 * We don't need to break down the sleep into smaller increments,
-			 * and check for interrupts after each nap, since we can just
-			 * elog(FATAL) within SIGTERM signal handler if the signal arrives
-			 * in the middle of establishment of replication connection.
+			 * since we'll get interrupted by signals and can either handle
+			 * interrupts here or elog(FATAL) within SIGTERM signal handler if
+			 * the signal arrives in the middle of establishment of
+			 * replication connection.
 			 */
-			if (!libpq_select(-1))
-				continue;		/* interrupted */
+			ResetLatch(&MyProc->procLatch);
+			rc = WaitLatchOrSocket(&MyProc->procLatch,
+								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+								   WL_LATCH_SET,
+								   PQsocket(streamConn),
+								   0,
+								   WAIT_EVENT_LIBPQWALRECEIVER_READ);
+			if (rc & WL_POSTMASTER_DEATH)
+				exit(1);
+
+			/* interrupted */
+			if (rc & WL_LATCH_SET)
+			{
+				CHECK_FOR_INTERRUPTS();
+				continue;
+			}
 			if (PQconsumeInput(streamConn) == 0)
 				return NULL;	/* trouble */
 		}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce..8bfb041 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -261,7 +261,7 @@ WalReceiverMain(void)
 	/* Arrange to clean up at walreceiver exit */
 	on_shmem_exit(WalRcvDie, 0);
 
-	OwnLatch(&walrcv->latch);
+	walrcv->latch = &MyProc->procLatch;
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, WalRcvSigHupHandler);		/* set flag to read config
@@ -483,7 +483,7 @@ WalReceiverMain(void)
 				 * avoiding some system calls.
 				 */
 				Assert(wait_fd != PGINVALID_SOCKET);
-				rc = WaitLatchOrSocket(&walrcv->latch,
+				rc = WaitLatchOrSocket(walrcv->latch,
 								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
 									   WL_TIMEOUT | WL_LATCH_SET,
 									   wait_fd,
@@ -491,7 +491,7 @@ WalReceiverMain(void)
 									   WAIT_EVENT_WAL_RECEIVER_MAIN);
 				if (rc & WL_LATCH_SET)
 				{
-					ResetLatch(&walrcv->latch);
+					ResetLatch(walrcv->latch);
 					if (walrcv->force_reply)
 					{
 						/*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	WakeupRecovery();
 	for (;;)
 	{
-		ResetLatch(&walrcv->latch);
+		ResetLatch(walrcv->latch);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 		}
 		SpinLockRelease(&walrcv->mutex);
 
-		WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
+		WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
 				  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
 	}
 
@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
 	/* Ensure that all WAL records received are flushed to disk */
 	XLogWalRcvFlush(true);
 
-	DisownLatch(&walrcv->latch);
+	walrcv->latch = NULL;
 
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
 
 	got_SIGTERM = true;
 
-	SetLatch(&WalRcv->latch);
+	if (WalRcv->latch)
+		SetLatch(WalRcv->latch);
 
 	/* Don't joggle the elbow of proc_exit */
 	if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
 WalRcvForceReply(void)
 {
 	WalRcv->force_reply = true;
-	SetLatch(&WalRcv->latch);
+	if (WalRcv->latch)
+		SetLatch(WalRcv->latch);
 }
 
 /*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 5f6e423..01111a4 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
 		SpinLockInit(&WalRcv->mutex);
-		InitSharedLatch(&WalRcv->latch);
+		WalRcv->latch = NULL;
 	}
 }
 
@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	if (launch)
 		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-	else
-		SetLatch(&walrcv->latch);
+	else if (walrcv->latch)
+		SetLatch(walrcv->latch);
 }
 
 /*
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0b85b7a..152ff06 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -763,6 +763,7 @@ typedef enum
 	WAIT_EVENT_CLIENT_WRITE,
 	WAIT_EVENT_SSL_OPEN_SERVER,
 	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+	WAIT_EVENT_LIBPQWALRECEIVER_READ,
 	WAIT_EVENT_WAL_SENDER_WAIT_WAL,
 	WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cd787c9..afbb8d8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -127,8 +127,9 @@ typedef struct
 	 * where to start streaming (after setting receiveStart and
 	 * receiveStartTLI), and also to tell it to send apply feedback to the
 	 * primary whenever specially marked commit records are applied.
+	 * This is normally mapped to procLatch when walreceiver is running.
 	 */
-	Latch		latch;
+	Latch	   *latch;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
-- 
2.10.2

>From 32546a13cc136905a378f59ca0b09c25e2668e8f Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Wed, 30 Nov 2016 12:00:00 -0500
Subject: [PATCH 2/2] Refactor libpqwalreceiver

The whole walreceiver API is now wrapped into a struct, like most of our
other loadable module APIs.  The libpq connection is no longer a global
variable in libpqwalreceiver.  Instead, it is encapsulated into a struct
that is passed around the functions.  This allows multiple walreceivers
to run at the same time.

Add some rudimentary support for logical replication connections to
libpqwalreceiver.

These changes are mostly cosmetic and are going to be useful for the
future logical replication patches.

From: Petr Jelinek <p...@2ndquadrant.com>
---
 .../libpqwalreceiver/libpqwalreceiver.c            | 266 ++++++++++++---------
 src/backend/replication/walreceiver.c              |  59 +++--
 src/include/replication/walreceiver.h              |  83 +++++--
 3 files changed, 235 insertions(+), 173 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6c01e7b..c3b0bf5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -32,59 +32,72 @@ PG_MODULE_MAGIC;
 
 void		_PG_init(void);
 
-/* Current connection to the primary, if any */
-static PGconn *streamConn = NULL;
-
-/* Buffer for currently read records */
-static char *recvBuf = NULL;
+struct WalReceiverConn
+{
+	/* Current connection to the primary, if any */
+	PGconn *streamConn;
+	/* Used to remember if the connection is logical or physical */
+	bool	logical;
+	/* Buffer for currently read records */
+	char   *recvBuf;
+};
 
 /* Prototypes for interface functions */
-static void libpqrcv_connect(char *conninfo);
-static char *libpqrcv_get_conninfo(void);
-static void libpqrcv_identify_system(TimeLineID *primary_tli);
-static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
-static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
-						char *slotname);
-static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int	libpqrcv_receive(char **buffer, pgsocket *wait_fd);
-static void libpqrcv_send(const char *buffer, int nbytes);
-static void libpqrcv_disconnect(void);
+static WalReceiverConn *libpqrcv_connect(const char *conninfo,
+										 bool logical, const char *appname);
+static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
+static char *libpqrcv_identify_system(WalReceiverConn *conn,
+									  TimeLineID *primary_tli);
+static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
+								 TimeLineID tli, char **filename,
+								 char **content, int *len);
+static bool libpqrcv_startstreaming(WalReceiverConn *conn,
+						TimeLineID tli, XLogRecPtr startpoint,
+						const char *slotname);
+static void libpqrcv_endstreaming(WalReceiverConn *conn,
+								  TimeLineID *next_tli);
+static int	libpqrcv_receive(WalReceiverConn *conn, char **buffer,
+							 pgsocket *wait_fd);
+static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
+						  int nbytes);
+static void libpqrcv_disconnect(WalReceiverConn *conn);
+
+static WalReceiverFunctionsType PQWalReceiverFunctions = {
+	libpqrcv_connect,
+	libpqrcv_get_conninfo,
+	libpqrcv_identify_system,
+	libpqrcv_readtimelinehistoryfile,
+	libpqrcv_startstreaming,
+	libpqrcv_endstreaming,
+	libpqrcv_receive,
+	libpqrcv_send,
+	libpqrcv_disconnect
+};
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(const char *query);
+static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
 
 /*
- * Module load callback
+ * Module initialization function
  */
 void
 _PG_init(void)
 {
-	/* Tell walreceiver how to reach us */
-	if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
-		walrcv_readtimelinehistoryfile != NULL ||
-		walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
-		walrcv_receive != NULL || walrcv_send != NULL ||
-		walrcv_disconnect != NULL)
+	if (WalReceiverFunctions != NULL)
 		elog(ERROR, "libpqwalreceiver already loaded");
-	walrcv_connect = libpqrcv_connect;
-	walrcv_get_conninfo = libpqrcv_get_conninfo;
-	walrcv_identify_system = libpqrcv_identify_system;
-	walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
-	walrcv_startstreaming = libpqrcv_startstreaming;
-	walrcv_endstreaming = libpqrcv_endstreaming;
-	walrcv_receive = libpqrcv_receive;
-	walrcv_send = libpqrcv_send;
-	walrcv_disconnect = libpqrcv_disconnect;
+	WalReceiverFunctions = &PQWalReceiverFunctions;
 }
 
 /*
  * Establish the connection to the primary server for XLOG streaming
  */
-static void
-libpqrcv_connect(char *conninfo)
+static WalReceiverConn *
+libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
 {
+	WalReceiverConn *conn;
 	const char *keys[5];
 	const char *vals[5];
+	int			i = 0;
 
 	/*
 	 * We use the expand_dbname parameter to process the connection string (or
@@ -93,22 +106,29 @@ libpqrcv_connect(char *conninfo)
 	 * database name is ignored by the server in replication mode, but specify
 	 * "replication" for .pgpass lookup.
 	 */
-	keys[0] = "dbname";
-	vals[0] = conninfo;
-	keys[1] = "replication";
-	vals[1] = "true";
-	keys[2] = "dbname";
-	vals[2] = "replication";
-	keys[3] = "fallback_application_name";
-	vals[3] = "walreceiver";
-	keys[4] = NULL;
-	vals[4] = NULL;
-
-	streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
-	if (PQstatus(streamConn) != CONNECTION_OK)
+	keys[i] = "dbname";
+	vals[i] = conninfo;
+	keys[++i] = "replication";
+	vals[i] = logical ? "database" : "true";
+	if (!logical)
+	{
+		keys[++i] = "dbname";
+		vals[i] = "replication";
+	}
+	keys[++i] = "fallback_application_name";
+	vals[i] = appname;
+	keys[++i] = NULL;
+	vals[i] = NULL;
+
+	conn = palloc0(sizeof(WalReceiverConn));
+	conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
+	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		ereport(ERROR,
 				(errmsg("could not connect to the primary server: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
+	conn->logical = logical;
+
+	return conn;
 }
 
 /*
@@ -116,17 +136,17 @@ libpqrcv_connect(char *conninfo)
  * are obfuscated.
  */
 static char *
-libpqrcv_get_conninfo(void)
+libpqrcv_get_conninfo(WalReceiverConn *conn)
 {
 	PQconninfoOption *conn_opts;
 	PQconninfoOption *conn_opt;
 	PQExpBufferData buf;
 	char	   *retval;
 
-	Assert(streamConn != NULL);
+	Assert(conn->streamConn != NULL);
 
 	initPQExpBuffer(&buf);
-	conn_opts = PQconninfo(streamConn);
+	conn_opts = PQconninfo(conn->streamConn);
 
 	if (conn_opts == NULL)
 		ereport(ERROR,
@@ -164,25 +184,24 @@ libpqrcv_get_conninfo(void)
  * Check that primary's system identifier matches ours, and fetch the current
  * timeline ID of the primary.
  */
-static void
-libpqrcv_identify_system(TimeLineID *primary_tli)
+static char *
+libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 {
 	PGresult   *res;
 	char	   *primary_sysid;
-	char		standby_sysid[32];
 
 	/*
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
+	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("could not receive database system identifier and timeline ID from "
 						"the primary server: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 	}
 	if (PQnfields(res) < 3 || PQntuples(res) != 1)
 	{
@@ -195,24 +214,11 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 				 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
 						   ntuples, nfields, 3, 1)));
 	}
-	primary_sysid = PQgetvalue(res, 0, 0);
+	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
 	*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
-
-	/*
-	 * Confirm that the system identifier of the primary is the same as ours.
-	 */
-	snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
-			 GetSystemIdentifier());
-	if (strcmp(primary_sysid, standby_sysid) != 0)
-	{
-		primary_sysid = pstrdup(primary_sysid);
-		PQclear(res);
-		ereport(ERROR,
-				(errmsg("database system identifier differs between the primary and standby"),
-				 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
-						   primary_sysid, standby_sysid)));
-	}
 	PQclear(res);
+
+	return primary_sysid;
 }
 
 /*
@@ -226,21 +232,30 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
  * throws an ERROR.
  */
 static bool
-libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
+libpqrcv_startstreaming(WalReceiverConn *conn,
+						TimeLineID tli, XLogRecPtr startpoint,
+						const char *slotname)
 {
-	char		cmd[256];
+	StringInfoData cmd;
 	PGresult   *res;
 
+	Assert(!conn->logical);
+
+	initStringInfo(&cmd);
+
 	/* Start streaming from the point requested by startup process */
 	if (slotname != NULL)
-		snprintf(cmd, sizeof(cmd),
-				 "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", slotname,
-				 (uint32) (startpoint >> 32), (uint32) startpoint, tli);
+		appendStringInfo(&cmd,
+						 "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
+						 slotname,
+						 (uint32) (startpoint >> 32), (uint32) startpoint,
+						 tli);
 	else
-		snprintf(cmd, sizeof(cmd),
-				 "START_REPLICATION %X/%X TIMELINE %u",
-				 (uint32) (startpoint >> 32), (uint32) startpoint, tli);
-	res = libpqrcv_PQexec(cmd);
+		appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
+						 (uint32) (startpoint >> 32), (uint32) startpoint,
+						 tli);
+	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
 	{
@@ -252,7 +267,7 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("could not start WAL streaming: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 	}
 	PQclear(res);
 	return true;
@@ -263,14 +278,17 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
  * reported by the server, or 0 if it did not report it.
  */
 static void
-libpqrcv_endstreaming(TimeLineID *next_tli)
+libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 {
 	PGresult   *res;
 
-	if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
+	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
+		PQflush(conn->streamConn))
 		ereport(ERROR,
 			(errmsg("could not send end-of-streaming message to primary: %s",
-					PQerrorMessage(streamConn))));
+					PQerrorMessage(conn->streamConn))));
+
+	*next_tli = 0;
 
 	/*
 	 * After COPY is finished, we should receive a result set indicating the
@@ -282,7 +300,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
 	 * called after receiving CopyDone from the backend - the walreceiver
 	 * never terminates replication on its own initiative.
 	 */
-	res = PQgetResult(streamConn);
+	res = PQgetResult(conn->streamConn);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -296,47 +314,58 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = PQgetResult(streamConn);
+		res = PQgetResult(conn->streamConn);
+	}
+	else if (PQresultStatus(res) == PGRES_COPY_OUT)
+	{
+		PQclear(res);
+
+		/* End the copy */
+		PQendcopy(conn->streamConn);
+
+		/* CommandComplete should follow */
+		res = PQgetResult(conn->streamConn);
 	}
-	else
-		*next_tli = 0;
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		ereport(ERROR,
 				(errmsg("error reading result of streaming command: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = PQgetResult(streamConn);
+	res = PQgetResult(conn->streamConn);
 	if (res != NULL)
 		ereport(ERROR,
 				(errmsg("unexpected result after CommandComplete: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 }
 
 /*
  * Fetch the timeline history file for 'tli' from primary.
  */
 static void
-libpqrcv_readtimelinehistoryfile(TimeLineID tli,
-								 char **filename, char **content, int *len)
+libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
+								 TimeLineID tli, char **filename,
+								 char **content, int *len)
 {
 	PGresult   *res;
 	char		cmd[64];
 
+	Assert(!conn->logical);
+
 	/*
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(cmd);
+	res = libpqrcv_PQexec(conn->streamConn, cmd);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("could not receive timeline history file from "
 						"the primary server: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 	}
 	if (PQnfields(res) != 2 || PQntuples(res) != 1)
 	{
@@ -374,7 +403,7 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
  * Queries are always executed on the connection in streamConn.
  */
 static PGresult *
-libpqrcv_PQexec(const char *query)
+libpqrcv_PQexec(PGconn *streamConn, const char *query)
 {
 	PGresult   *result = NULL;
 	PGresult   *lastResult = NULL;
@@ -455,10 +484,12 @@ libpqrcv_PQexec(const char *query)
  * Disconnect connection to primary, if any.
  */
 static void
-libpqrcv_disconnect(void)
+libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(streamConn);
-	streamConn = NULL;
+	PQfinish(conn->streamConn);
+	if (conn->recvBuf != NULL)
+		PQfreemem(conn->recvBuf);
+	pfree(conn);
 }
 
 /*
@@ -478,30 +509,31 @@ libpqrcv_disconnect(void)
  * ereports on error.
  */
 static int
-libpqrcv_receive(char **buffer, pgsocket *wait_fd)
+libpqrcv_receive(WalReceiverConn *conn, char **buffer,
+				 pgsocket *wait_fd)
 {
 	int			rawlen;
 
-	if (recvBuf != NULL)
-		PQfreemem(recvBuf);
-	recvBuf = NULL;
+	if (conn->recvBuf != NULL)
+		PQfreemem(conn->recvBuf);
+	conn->recvBuf = NULL;
 
 	/* Try to receive a CopyData message */
-	rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+	rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
 	if (rawlen == 0)
 	{
 		/* Try consuming some data. */
-		if (PQconsumeInput(streamConn) == 0)
+		if (PQconsumeInput(conn->streamConn) == 0)
 			ereport(ERROR,
 					(errmsg("could not receive data from WAL stream: %s",
-							PQerrorMessage(streamConn))));
+							PQerrorMessage(conn->streamConn))));
 
 		/* Now that we've consumed some input, try again */
-		rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+		rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
 		if (rawlen == 0)
 		{
 			/* Tell caller to try again when our socket is ready. */
-			*wait_fd = PQsocket(streamConn);
+			*wait_fd = PQsocket(conn->streamConn);
 			return 0;
 		}
 	}
@@ -509,7 +541,7 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
 	{
 		PGresult   *res;
 
-		res = PQgetResult(streamConn);
+		res = PQgetResult(conn->streamConn);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK ||
 			PQresultStatus(res) == PGRES_COPY_IN)
 		{
@@ -521,16 +553,16 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
 			PQclear(res);
 			ereport(ERROR,
 					(errmsg("could not receive data from WAL stream: %s",
-							PQerrorMessage(streamConn))));
+							PQerrorMessage(conn->streamConn))));
 		}
 	}
 	if (rawlen < -1)
 		ereport(ERROR,
 				(errmsg("could not receive data from WAL stream: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 
 	/* Return received messages to caller */
-	*buffer = recvBuf;
+	*buffer = conn->recvBuf;
 	return rawlen;
 }
 
@@ -540,11 +572,11 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
  * ereports on error.
  */
 static void
-libpqrcv_send(const char *buffer, int nbytes)
+libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
 {
-	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
-		PQflush(streamConn))
+	if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
+		PQflush(conn->streamConn))
 		ereport(ERROR,
 				(errmsg("could not send data to WAL stream: %s",
-						PQerrorMessage(streamConn))));
+						PQerrorMessage(conn->streamConn))));
 }
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8bfb041..cc3cf7d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -74,16 +74,9 @@ int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
 
-/* libpqreceiver hooks to these when loaded */
-walrcv_connect_type walrcv_connect = NULL;
-walrcv_get_conninfo_type walrcv_get_conninfo = NULL;
-walrcv_identify_system_type walrcv_identify_system = NULL;
-walrcv_startstreaming_type walrcv_startstreaming = NULL;
-walrcv_endstreaming_type walrcv_endstreaming = NULL;
-walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
-walrcv_receive_type walrcv_receive = NULL;
-walrcv_send_type walrcv_send = NULL;
-walrcv_disconnect_type walrcv_disconnect = NULL;
+/* libpqwalreceiver connection */
+static WalReceiverConn *wrconn = NULL;
+WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
 #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
 
@@ -286,14 +279,7 @@ WalReceiverMain(void)
 
 	/* Load the libpq-specific functions */
 	load_file("libpqwalreceiver", false);
-	if (walrcv_connect == NULL ||
-		walrcv_get_conninfo == NULL ||
-		walrcv_startstreaming == NULL ||
-		walrcv_endstreaming == NULL ||
-		walrcv_identify_system == NULL ||
-		walrcv_readtimelinehistoryfile == NULL ||
-		walrcv_receive == NULL || walrcv_send == NULL ||
-		walrcv_disconnect == NULL)
+	if (WalReceiverFunctions == NULL)
 		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
 	/*
@@ -307,14 +293,14 @@ WalReceiverMain(void)
 
 	/* Establish the connection to the primary for XLOG streaming */
 	EnableWalRcvImmediateExit();
-	walrcv_connect(conninfo);
+	wrconn = walrcv_connect(conninfo, false, "walreceiver");
 	DisableWalRcvImmediateExit();
 
 	/*
 	 * Save user-visible connection string.  This clobbers the original
 	 * conninfo, for security.
 	 */
-	tmp_conninfo = walrcv_get_conninfo();
+	tmp_conninfo = walrcv_get_conninfo(wrconn);
 	SpinLockAcquire(&walrcv->mutex);
 	memset(walrcv->conninfo, 0, MAXCONNINFO);
 	if (tmp_conninfo)
@@ -328,12 +314,25 @@ WalReceiverMain(void)
 	first_stream = true;
 	for (;;)
 	{
+		char	   *primary_sysid;
+		char		standby_sysid[32];
+
 		/*
 		 * Check that we're connected to a valid server using the
 		 * IDENTIFY_SYSTEM replication command,
 		 */
 		EnableWalRcvImmediateExit();
-		walrcv_identify_system(&primaryTLI);
+		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
+
+		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+				 GetSystemIdentifier());
+		if (strcmp(primary_sysid, standby_sysid) != 0)
+		{
+			ereport(ERROR,
+					(errmsg("database system identifier differs between the primary and standby"),
+					 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
+							   primary_sysid, standby_sysid)));
+		}
 		DisableWalRcvImmediateExit();
 
 		/*
@@ -370,7 +369,7 @@ WalReceiverMain(void)
 		 * on the new timeline.
 		 */
 		ThisTimeLineID = startpointTLI;
-		if (walrcv_startstreaming(startpointTLI, startpoint,
+		if (walrcv_startstreaming(wrconn, startpointTLI, startpoint,
 								  slotname[0] != '\0' ? slotname : NULL))
 		{
 			if (first_stream)
@@ -422,7 +421,7 @@ WalReceiverMain(void)
 				}
 
 				/* See if we can read data immediately */
-				len = walrcv_receive(&buf, &wait_fd);
+				len = walrcv_receive(wrconn, &buf, &wait_fd);
 				if (len != 0)
 				{
 					/*
@@ -453,7 +452,7 @@ WalReceiverMain(void)
 							endofwal = true;
 							break;
 						}
-						len = walrcv_receive(&buf, &wait_fd);
+						len = walrcv_receive(wrconn, &buf, &wait_fd);
 					}
 
 					/* Let the master know that we received some data. */
@@ -570,7 +569,7 @@ WalReceiverMain(void)
 			 * our side, too.
 			 */
 			EnableWalRcvImmediateExit();
-			walrcv_endstreaming(&primaryTLI);
+			walrcv_endstreaming(wrconn, &primaryTLI);
 			DisableWalRcvImmediateExit();
 
 			/*
@@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
 							tli)));
 
 			EnableWalRcvImmediateExit();
-			walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
+			walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
 			DisableWalRcvImmediateExit();
 
 			/*
@@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg)
 	SpinLockRelease(&walrcv->mutex);
 
 	/* Terminate the connection gracefully. */
-	if (walrcv_disconnect != NULL)
-		walrcv_disconnect();
+	if (wrconn != NULL)
+		walrcv_disconnect(wrconn);
 
 	/* Wake up the startup process to notice promptly that we're gone */
 	WakeupRecovery();
@@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
 		 requestReply ? " (reply requested)" : "");
 
-	walrcv_send(reply_message.data, reply_message.len);
+	walrcv_send(wrconn, reply_message.data, reply_message.len);
 }
 
 /*
@@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
 	pq_sendint(&reply_message, nextEpoch, 4);
-	walrcv_send(reply_message.data, reply_message.len);
+	walrcv_send(wrconn, reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin))
 		master_has_standby_xmin = true;
 	else
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index afbb8d8..edb14b5 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -134,33 +134,64 @@ typedef struct
 
 extern WalRcvData *WalRcv;
 
-/* libpqwalreceiver hooks */
-typedef void (*walrcv_connect_type) (char *conninfo);
-extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
-
-typedef char *(*walrcv_get_conninfo_type) (void);
-extern PGDLLIMPORT walrcv_get_conninfo_type walrcv_get_conninfo;
-
-typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
-extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
-
-typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
-extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
-
-typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
-extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
+struct WalReceiverConn;
+typedef struct WalReceiverConn WalReceiverConn;
 
-typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
-extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
-
-typedef int (*walrcv_receive_type) (char **buffer, pgsocket *wait_fd);
-extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
-
-typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
-extern PGDLLIMPORT walrcv_send_type walrcv_send;
-
-typedef void (*walrcv_disconnect_type) (void);
-extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
+/* libpqwalreceiver hooks */
+typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
+											   const char *appname);
+typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
+typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
+											TimeLineID *primary_tli);
+typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
+												   TimeLineID tli,
+												   char **filename,
+												   char **content, int *size);
+typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
+										  TimeLineID tli,
+										  XLogRecPtr startpoint,
+										  const char *slotname);
+typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
+										TimeLineID *next_tli);
+typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
+								  pgsocket *wait_fd);
+typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
+								int nbytes);
+typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
+
+typedef struct WalReceiverFunctionsType
+{
+	walrcv_connect_fn					connect;
+	walrcv_get_conninfo_fn				get_conninfo;
+	walrcv_identify_system_fn			identify_system;
+	walrcv_readtimelinehistoryfile_fn	readtimelinehistoryfile;
+	walrcv_startstreaming_fn			startstreaming;
+	walrcv_endstreaming_fn				endstreaming;
+	walrcv_receive_fn					receive;
+	walrcv_send_fn						send;
+	walrcv_disconnect_fn				disconnect;
+} WalReceiverFunctionsType;
+
+extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
+
+#define walrcv_connect(conninfo, logical, appname) \
+	WalReceiverFunctions->connect(conninfo, logical, appname)
+#define walrcv_get_conninfo(conn) \
+	WalReceiverFunctions->get_conninfo(conn)
+#define walrcv_identify_system(conn, primary_tli) \
+	WalReceiverFunctions->identify_system(conn, primary_tli)
+#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
+	WalReceiverFunctions->readtimelinehistoryfile(conn, tli, filename, content, size)
+#define walrcv_startstreaming(conn, tli, startpoint, slotname) \
+	WalReceiverFunctions->startstreaming(conn, tli, startpoint, slotname)
+#define walrcv_endstreaming(conn, next_tli) \
+	WalReceiverFunctions->endstreaming(conn, next_tli)
+#define walrcv_receive(conn, buffer, wait_fd) \
+	WalReceiverFunctions->receive(conn, buffer, wait_fd)
+#define walrcv_send(conn, buffer, nbytes) \
+	WalReceiverFunctions->send(conn, buffer, nbytes)
+#define walrcv_disconnect(conn) \
+	WalReceiverFunctions->disconnect(conn)
 
 /* prototypes for functions in walreceiver.c */
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-- 
2.10.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to