From 4570ebf065c92b996eb799d6267dd3c9541b01e5 Mon Sep 17 00:00:00 2001
From: Hari Babu <kommi.haribabu@gmail.com>
Date: Mon, 15 Jan 2018 15:28:57 +1100
Subject: [PATCH] pg_stat_wal_receiver to display remote server info

With the support of multi host connection string support
in PostgreSQL, it is possible for the user to specify the
multi host connection string in recovery.conf to avoid
breakdown of streaming replication.

The pg_stat_wal_receiver view is enhanced to support display
of remote server information from which the replication is
streaming currently with two additional columns remote_server
and remote_port.
---
 doc/src/sgml/monitoring.sgml                       | 19 ++++++++++++++
 src/backend/catalog/system_views.sql               |  2 ++
 .../libpqwalreceiver/libpqwalreceiver.c            | 23 +++++++++++++++++
 src/backend/replication/walreceiver.c              | 29 ++++++++++++++++++++--
 src/include/catalog/pg_proc.h                      |  2 +-
 src/include/replication/walreceiver.h              |  8 ++++++
 src/test/regress/expected/rules.out                |  4 ++-
 7 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3bc4de57d5..1488789be1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2031,6 +2031,25 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry><type>text</type></entry>
      <entry>Replication slot name used by this WAL receiver</entry>
     </row>
+    <row>
+     <entry><structfield>remote_server</structfield></entry>
+     <entry><type>text</type></entry>
+     <entry>
+      Host of the <productname>PostgreSQL</productname> instance
+      this WAL receiver is connected to. This can be a host name,
+      an IP address, or a directory path if the connection is via
+      Unix socket.  (The path case can be distinguished because it
+      will always be an absolute path, beginning with <literal>/</literal>.)
+     </entry>
+    </row>
+    <row>
+     <entry><structfield>remote_port</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>
+      Port number of the <productname>PostgreSQL</productname> instance
+      this WAL receiver is connected to.
+     </entry>
+    </row>
     <row>
      <entry><structfield>conninfo</structfield></entry>
      <entry><type>text</type></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5e6e8a64f6..cdcf3972e8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -752,6 +752,8 @@ CREATE VIEW pg_stat_wal_receiver AS
             s.latest_end_lsn,
             s.latest_end_time,
             s.slot_name,
+            s.remote_server,
+            s.remote_port,
             s.conninfo
     FROM pg_stat_get_wal_receiver() s
     WHERE s.pid IS NOT NULL;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f9aec0531a..d6c2d891a9 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -53,6 +53,8 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
 				 char **err);
 static void libpqrcv_check_conninfo(const char *conninfo);
 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
+static void libpqrcv_get_remoteserver_info(WalReceiverConn *conn,
+					char **remote_server, int *remote_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 						 TimeLineID *primary_tli,
 						 int *server_version);
@@ -82,6 +84,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
 	libpqrcv_check_conninfo,
 	libpqrcv_get_conninfo,
+	libpqrcv_get_remoteserver_info,
 	libpqrcv_identify_system,
 	libpqrcv_readtimelinehistoryfile,
 	libpqrcv_startstreaming,
@@ -282,6 +285,26 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
 	return retval;
 }
 
+/*
+ * Provides remote sever info.
+ */
+static void
+libpqrcv_get_remoteserver_info(WalReceiverConn *conn, char **remote_server,
+					  int *remote_port)
+{
+	char *ret = NULL;
+
+	Assert(conn->streamConn != NULL);
+
+	ret = PQhost(conn->streamConn);
+	if (ret)
+		*remote_server = pstrdup(ret);
+
+	ret = PQport(conn->streamConn);
+	if (ret)
+		*remote_port = atoi(ret);
+}
+
 /*
  * Check that primary's system identifier matches ours, and fetch the current
  * timeline ID of the primary.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a39a98ff18..df774a47f0 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -52,6 +52,7 @@
 #include "access/xlog_internal.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
+#include "common/ip.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
@@ -199,6 +200,8 @@ WalReceiverMain(void)
 	TimestampTz now;
 	bool		ping_sent;
 	char	   *err;
+	char	   *remote_server = NULL;
+	int			remote_port = 0;
 
 	/*
 	 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -311,16 +314,26 @@ WalReceiverMain(void)
 	 * conninfo, for security.
 	 */
 	tmp_conninfo = walrcv_get_conninfo(wrconn);
+	walrcv_get_remoteserver_info(wrconn, &remote_server, &remote_port);
 	SpinLockAcquire(&walrcv->mutex);
 	memset(walrcv->conninfo, 0, MAXCONNINFO);
 	if (tmp_conninfo)
 		strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
+
+	memset(walrcv->remote_server, 0, NI_MAXHOST);
+	if (remote_server)
+		strlcpy((char *) walrcv->remote_server, remote_server, NI_MAXHOST);
+
+	walrcv->remote_port = remote_port;
 	walrcv->ready_to_display = true;
 	SpinLockRelease(&walrcv->mutex);
 
 	if (tmp_conninfo)
 		pfree(tmp_conninfo);
 
+	if (remote_server)
+		pfree(remote_server);
+
 	first_stream = true;
 	for (;;)
 	{
@@ -1402,6 +1415,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	TimestampTz last_receipt_time;
 	XLogRecPtr	latest_end_lsn;
 	TimestampTz latest_end_time;
+	char		remote_server[NI_MAXHOST];
+	int			remote_port = 0;
 	char		slotname[NAMEDATALEN];
 	char		conninfo[MAXCONNINFO];
 
@@ -1419,6 +1434,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	latest_end_lsn = WalRcv->latestWalEnd;
 	latest_end_time = WalRcv->latestWalEndTime;
 	strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
+	strlcpy(remote_server, (char *) WalRcv->remote_server, sizeof(remote_server));
+	remote_port = WalRcv->remote_port;
 	strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
 	SpinLockRelease(&WalRcv->mutex);
 
@@ -1482,10 +1499,18 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 			nulls[10] = true;
 		else
 			values[10] = CStringGetTextDatum(slotname);
-		if (*conninfo == '\0')
+		if (*remote_server == '\0')
 			nulls[11] = true;
 		else
-			values[11] = CStringGetTextDatum(conninfo);
+			values[11] = CStringGetTextDatum(remote_server);
+		if (remote_port == 0)
+			nulls[12] = true;
+		else
+			values[12] = Int32GetDatum(remote_port);
+		if (*conninfo == '\0')
+			nulls[13] = true;
+		else
+			values[13] = CStringGetTextDatum(conninfo);
 	}
 
 	/* Returns the record as Datum */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index bfc90098f8..634eca1b95 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2903,7 +2903,7 @@ DATA(insert OID = 3318 (  pg_stat_get_progress_info			  PGNSP PGUID 12 1 100 0 0
 DESCR("statistics: information about progress of backends running maintenance command");
 DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
-DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
+DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25,23,25}" "{o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,remote_server,remote_port,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
 DATA(insert OID = 6118 (  pg_stat_get_subscription	PGNSP PGUID 12 1 0 0 0 f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
 DESCR("statistics: information about subscription");
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index ea7967f6fc..c13ce9413a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -107,6 +107,8 @@ typedef struct
 	 * clobbered to hide security-sensitive fields.
 	 */
 	char		conninfo[MAXCONNINFO];
+	char		remote_server[NI_MAXHOST];
+	int			remote_port;
 
 	/*
 	 * replication slot name; is also used for walreceiver to connect with the
@@ -197,6 +199,9 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica
 											   char **err);
 typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
 typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
+typedef void (*walrcv_get_remoteserver_info_fn) (WalReceiverConn *conn,
+										 char **remote_server,
+										 int *remote_port);
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli,
 											int *server_version);
@@ -227,6 +232,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_connect_fn walrcv_connect;
 	walrcv_check_conninfo_fn walrcv_check_conninfo;
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
+	walrcv_get_remoteserver_info_fn walrcv_get_remoteserver_info;
 	walrcv_identify_system_fn walrcv_identify_system;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -246,6 +252,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_check_conninfo(conninfo)
 #define walrcv_get_conninfo(conn) \
 	WalReceiverFunctions->walrcv_get_conninfo(conn)
+#define walrcv_get_remoteserver_info(conn, remote_host, remote_port) \
+	WalReceiverFunctions->walrcv_get_remoteserver_info(conn, remote_host, remote_port)
 #define walrcv_identify_system(conn, primary_tli, server_version) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5149b72fe9..e138569f6c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1972,8 +1972,10 @@ pg_stat_wal_receiver| SELECT s.pid,
     s.latest_end_lsn,
     s.latest_end_time,
     s.slot_name,
+    s.remote_server,
+    s.remote_port,
     s.conninfo
-   FROM pg_stat_get_wal_receiver() s(pid, status, receive_start_lsn, receive_start_tli, received_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, conninfo)
+   FROM pg_stat_get_wal_receiver() s(pid, status, receive_start_lsn, receive_start_tli, received_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, remote_server, remote_port, conninfo)
   WHERE (s.pid IS NOT NULL);
 pg_stat_xact_all_tables| SELECT c.oid AS relid,
     n.nspname AS schemaname,
-- 
2.16.1.windows.4

