On Mon, Nov 3, 2025 at 3:09 PM Jakub Wartak
<[email protected]> wrote:
>
> Attached is pg_stat_tcpinfo, a heavily work-in-progress, Linux-only
> netstat/ss-like extension for showing detailed information about TCP
> connections based on information from the kernel itself.
[..]

> Some early feedback about direction in order to bring this into core
> would be appreciated. State of stuff:
>
> 1. Andres is pushing for supporting UNIX domain sockets here, but I'm
> not sure if it is really worth the effort (it would also trigger a new
> naming problem ;)), primarily because it would make the code even more
> complex. IMHO the netlink sock_diag API is already convoluted and adding
> AF_UNIX would make it even less readable.
> 2. IPv6 works, but wasn't tested much.
> 3. The biggest TODO left is probably properly formatting the information
> based on struct tcp_info variables (just like ss(1) does, so keeping
> the same units/formatting).
> 4. Patch/tests are missing intentionally, as I would first like to
> stabilize the outputs/naming/code.
> 5. [security] Should this be available to pg_monitor/pg_read_all_stats
> or just to superuser?
> 6. [security] Should this return info about all TCP connections or
> just the UID of the postmaster?

v2 attached with tiny fixes and slightly better code readability (for
dumping struct tcp_info *).

-J.
From 9994e60137cec47cfca5f97e888f7f1249caf7e0 Mon Sep 17 00:00:00 2001
From: Jakub Wartak <[email protected]>
Date: Mon, 3 Nov 2025 14:57:52 +0100
Subject: [PATCH v2] pg_stat_tcpinfo

---
 contrib/Makefile                              |    1 +
 contrib/meson.build                           |    1 +
 contrib/pg_stat_tcpinfo/Makefile              |   21 +
 contrib/pg_stat_tcpinfo/meson.build           |   25 +
 .../pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql  |   29 +
 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c     | 1076 +++++++++++++++++
 .../pg_stat_tcpinfo/pg_stat_tcpinfo.control   |    5 +
 7 files changed, 1158 insertions(+)
 create mode 100644 contrib/pg_stat_tcpinfo/Makefile
 create mode 100644 contrib/pg_stat_tcpinfo/meson.build
 create mode 100644 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql
 create mode 100644 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c
 create mode 100644 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control

diff --git a/contrib/Makefile b/contrib/Makefile
index 2f0a88d3f77..8060518f118 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -36,6 +36,7 @@ SUBDIRS = \
 		pg_overexplain \
 		pg_prewarm	\
 		pg_stat_statements \
+		pg_stat_tcpinfo \
 		pg_surgery	\
 		pg_trgm		\
 		pgrowlocks	\
diff --git a/contrib/meson.build b/contrib/meson.build
index ed30ee7d639..f37d26ccf40 100644
--- a/contrib/meson.build
+++ b/contrib/meson.build
@@ -51,6 +51,7 @@ subdir('pg_overexplain')
 subdir('pg_prewarm')
 subdir('pgrowlocks')
 subdir('pg_stat_statements')
+subdir('pg_stat_tcpinfo')
 subdir('pgstattuple')
 subdir('pg_surgery')
 subdir('pg_trgm')
diff --git a/contrib/pg_stat_tcpinfo/Makefile b/contrib/pg_stat_tcpinfo/Makefile
new file mode 100644
index 00000000000..aec1111ce54
--- /dev/null
+++ b/contrib/pg_stat_tcpinfo/Makefile
@@ -0,0 +1,21 @@
+# contrib/pg_stat_tcpinfo/Makefile
+
+MODULE_big = pg_stat_tcpinfo
+OBJS = pg_stat_tcpinfo.o
+
+EXTENSION = pg_stat_tcpinfo
+DATA = pg_stat_tcpinfo--1.0.sql
+PGFILEDESC = "pg_stat_tcpinfo - show detailed TCP connection info on Linux"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_stat_tcpinfo
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_stat_tcpinfo/meson.build b/contrib/pg_stat_tcpinfo/meson.build
new file mode 100644
index 00000000000..b78a5e25ec4
--- /dev/null
+++ b/contrib/pg_stat_tcpinfo/meson.build
@@ -0,0 +1,25 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Build description for the pg_stat_tcpinfo contrib module.
+
+# The module consists of a single C source file.
+pg_stat_tcpinfo_sources = files(
+  'pg_stat_tcpinfo.c',
+)
+
+# Loadable module; built with the common contrib module arguments. The
+# 'dependencies' entry re-specifies the value already present in
+# contrib_mod_args, so no extra dependency is added here.
+pg_stat_tcpinfo = shared_module('pg_stat_tcpinfo',
+  pg_stat_tcpinfo_sources,
+  kwargs: contrib_mod_args + {
+    'dependencies': contrib_mod_args['dependencies'],
+  },
+)
+contrib_targets += pg_stat_tcpinfo
+
+# Install the control file and the extension SQL script.
+install_data(
+  'pg_stat_tcpinfo.control',
+  'pg_stat_tcpinfo--1.0.sql',
+  kwargs: contrib_data_args,
+)
+
+# Register the module with the test harness; no test suites are listed yet.
+tests += {
+  'name': 'pg_stat_tcpinfo',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+}
diff --git a/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql
new file mode 100644
index 00000000000..83b18225a0d
--- /dev/null
+++ b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql
@@ -0,0 +1,29 @@
+/* contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_stat_tcpinfo" to load this file. \quit
+
+-- One row per TCP socket known to the kernel; "tcpinfo" carries the
+-- detailed per-connection statistics as a JSON document.
+CREATE FUNCTION pg_stat_get_tcpinfo(
+    OUT pid integer,
+    OUT uid integer,
+    OUT src_addr inet,
+    OUT src_port integer,
+    OUT dst_addr inet,
+    OUT dst_port integer,
+    OUT state text,
+    OUT recvq integer,
+    OUT sendq integer,
+    OUT tcpinfo jsonb
+)
+RETURNS SETOF record
+AS '$libdir/pg_stat_tcpinfo', 'pg_stat_get_tcpinfo'
+LANGUAGE C STRICT VOLATILE;
+
+COMMENT ON FUNCTION pg_stat_get_tcpinfo()
+IS 'Shows detailed TCP connection information on Linux.';
+
+CREATE VIEW pg_stat_tcpinfo AS SELECT * FROM pg_stat_get_tcpinfo();
+
+-- Functions are executable by PUBLIC by default, so the grants below would
+-- not restrict anything unless the default privilege is revoked first.
+REVOKE ALL ON FUNCTION pg_stat_get_tcpinfo() FROM PUBLIC;
+REVOKE ALL ON pg_stat_tcpinfo FROM PUBLIC;
+
+GRANT EXECUTE ON FUNCTION pg_stat_get_tcpinfo() TO pg_monitor;
+GRANT EXECUTE ON FUNCTION pg_stat_get_tcpinfo() TO pg_read_all_stats;
+GRANT SELECT ON pg_stat_tcpinfo TO pg_monitor;
+GRANT SELECT ON pg_stat_tcpinfo TO pg_read_all_stats;
diff --git a/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c
new file mode 100644
index 00000000000..2ec04d77e32
--- /dev/null
+++ b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c
@@ -0,0 +1,1076 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_stat_tcpinfo.c
+ *		A netstat/ss-like Linux-only function and view for PostgreSQL.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * It works in four main parts:
+ * 1. Scans /proc/net/tcp* to get a list of all active TCP sockets and their
+ *    'inode' numbers.
+ * 2. Scans the /proc filesystem (all /proc/[PID]/fd/ directories)
+ *    to build a map of which PID owns which socket inode (by reading the
+ *    symlinks).
+ * 3. Queries the netlink INET_DIAG interface to get detailed
+ *    TCP info (like RTT, skmem, timers, congestion algorithm used) for all
+ *    connections.
+ * 4. Joins these three pieces of information and returns them as a set of rows.
+ *
+ * This function must be run by a user with sufficient permissions
+ * (e.g., as part of the 'postgres' superuser) and the PostgreSQL
+ * server process itself must have permissions to read the
+ * /proc/[PID]/fd directories of processes owned by other users.
+ * Without such permissions, the 'pid' column will be NULL for most connections.
+ */
+
+#include "postgres.h"
+
+#include <arpa/inet.h>
+#include <asm/types.h>
+#include <ctype.h>
+#include <dirent.h>
+#include <linux/inet_diag.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+#include <linux/sock_diag.h>
+#include <linux/tcp.h>
+#include <netinet/in.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "c.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "lib/stringinfo.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "utils/builtins.h"
+#include "utils/hsearch.h"
+
+/* Module magic block, also recording the module's name and version. */
+PG_MODULE_MAGIC_EXT(
+					.name = "pg_stat_tcpinfo",
+					.version = PG_VERSION
+);
+
+#ifdef __linux__
+
+/*
+ * TCP socket states as numbered by the Linux kernel, see
+ * linux/include/net/tcp_states.h. The values are spelled out explicitly
+ * because they are used as array indexes and bit positions.
+ */
+enum
+{
+	TCP_ESTABLISHED = 1,
+	TCP_SYN_SENT = 2,
+	TCP_SYN_RECV = 3,
+	TCP_FIN_WAIT1 = 4,
+	TCP_FIN_WAIT2 = 5,
+	TCP_TIME_WAIT = 6,
+	TCP_CLOSE = 7,
+	TCP_CLOSE_WAIT = 8,
+	TCP_LAST_ACK = 9,
+	TCP_LISTEN = 10,
+	TCP_CLOSING = 11
+};
+
+/*
+ * Human-readable names of the TCP states, indexed by the state number.
+ * Index 0 is not a state defined in the enum above and maps to "UNKNOWN".
+ */
+static const char *tcp_states_map[] = {
+	[0] = "UNKNOWN",
+	[TCP_ESTABLISHED] = "ESTABLISHED",
+	[TCP_SYN_SENT] = "SYN-SENT",
+	[TCP_SYN_RECV] = "SYN-RECV",
+	[TCP_FIN_WAIT1] = "FIN-WAIT-1",
+	[TCP_FIN_WAIT2] = "FIN-WAIT-2",
+	[TCP_TIME_WAIT] = "TIME-WAIT",
+	[TCP_CLOSE] = "CLOSE",
+	[TCP_CLOSE_WAIT] = "CLOSE-WAIT",
+	[TCP_LAST_ACK] = "LAST-ACK",
+	[TCP_LISTEN] = "LISTEN",
+	[TCP_CLOSING] = "CLOSING"
+};
+
+/*
+ * One past the highest TCP state number (TCP_CLOSING is the last one), i.e.
+ * the exclusive upper bound for indexes into tcp_states_map. Parenthesized
+ * so the macro expands correctly inside larger expressions.
+ */
+#define TCP_MAX_STATE (TCP_CLOSING + 1)
+
+/*
+ * Names of the socket timer kinds, indexed by the idiag_timer value of a
+ * netlink reply (see the idiag_timer description in sock_diag(7)). The
+ * final slot is a catch-all name.
+ */
+static const char *tcptimer_names_map[] = {
+	[0] = "off",
+	[1] = "on",
+	[2] = "keepalive",
+	[3] = "timewait",
+	[4] = "persist",			/* zero probe window */
+	[5] = "unknown"
+};
+
+/*
+ * Used by struct inet_diag_req_v2 -> idiag_states. The TCP states are
+ * numbered 1..11 and each state N is selected by bit (1 << N), so a 12-bit
+ * mask selects all of them. It could also be named TCP_ALL_FLAGS.
+ */
+#define TCPF_ALL 0xFFF
+
+/* Netlink socket receive buffer size, in bytes */
+#define NL_SOCKET_BUFFER_SIZE 8192
+
+/*
+ * One TCP socket as parsed from a line of /proc/net/tcp or /proc/net/tcp6.
+ * Entries are chained into a singly linked list through 'next'.
+ */
+typedef struct TcpConnection
+{
+	char		local_addr_str[64]; /* IP:port */
+	char		remote_addr_str[64];	/* IP:port */
+	char		local_ip_str[INET6_ADDRSTRLEN]; /* IP */
+	char		remote_ip_str[INET6_ADDRSTRLEN];	/* IP */
+	int			local_port;
+	int			remote_port;
+	int			state;			/* kernel TCP state number */
+	int			uid;			/* uid column of the /proc line */
+	__u8		family;			/* address family the entry was read for */
+	unsigned long long inode;	/* socket inode, used to look up the PID */
+	struct TcpConnection *next; /* next list element, or NULL */
+}			TcpConnection;
+
+/*
+ * Hash table entry mapping a socket inode to a process holding it open.
+ * 'inode' must stay the first member, as it is the hash key.
+ */
+typedef struct InodePid
+{
+	unsigned long long inode;	/* hash key: socket inode number */
+	int			pid;			/* PID that has this socket among its fds */
+}			InodePid;
+
+/*
+ * Hash table entry holding everything learned about one connection from
+ * netlink. 'key' must stay the first member, as it is the (string) hash key;
+ * the has_* flags tell which of the payload members have been filled in.
+ */
+typedef struct NlDiagInfo
+{
+	char		key[512];		/* Key:"L_IP:L_PORT-R_IP:R_PORT" */
+	struct tcp_info tcpi;		/* INET_DIAG_INFO payload */
+	__u32		skmem[SK_MEMINFO_VARS]; /* INET_DIAG_SKMEMINFO payload */
+
+	/*
+	 * 12 below because they are not long names, see available
+	 * /lib/modules/$(uname -r)/kernel/net/ipv4/tcp_<cong>.ko kernel modules
+	 *
+	 * NOTE(review): the kernel presumably allows longer congestion control
+	 * names than this (TCP_CA_NAME_MAX) -- confirm and consider enlarging.
+	 */
+	char		cong[12];		/* INET_DIAG_CONG payload */
+	int			has_tcpi;		/* nonzero once tcpi is valid */
+	int			has_skmem;		/* nonzero once skmem is valid */
+	int			has_cong;		/* nonzero once cong is valid */
+	char		tcp_timer_str[64];	/* formatted "timer,expires,retrans" */
+}			NlDiagInfo;
+
+
+/*
+ * Remember that socket 'inode' is held open by process 'pid'.
+ *
+ * If the inode is already present in pid_map its PID is overwritten, so the
+ * process recorded last wins.
+ */
+static void
+insert_pid(HTAB *pid_map, unsigned long long inode, int pid)
+{
+	InodePid   *slot = (InodePid *) hash_search(pid_map, &inode, HASH_ENTER, NULL);
+
+	slot->pid = pid;
+}
+
+/*
+ * Look up the process owning socket 'inode' in pid_map.
+ *
+ * Returns the recorded PID, or -1 when the inode is not in the map.
+ */
+static int
+find_pid(HTAB *pid_map, unsigned long long inode)
+{
+	InodePid   *hit = (InodePid *) hash_search(pid_map, &inode, HASH_FIND, NULL);
+
+	return (hit != NULL) ? hit->pid : -1;
+}
+
+/*
+ * Build the lookup key identifying one connection, in the form
+ * "LOCAL_IP:LOCAL_PORT-REMOTE_IP:REMOTE_PORT".
+ *
+ * The result is written into hash_key, which holds hash_key_sz bytes; it is
+ * truncated (but still NUL-terminated) if it does not fit.
+ */
+static void
+format_connection_key(char *hash_key, size_t hash_key_sz,
+					  const char *local_addr, int local_port,
+					  const char *remote_addr, int remote_port)
+{
+	(void) snprintf(hash_key, hash_key_sz,
+					"%s:%d-%s:%d",
+					local_addr, local_port,
+					remote_addr, remote_port);
+}
+
+/*
+ * Store one netlink attribute of a connection in nldiag_map.
+ *
+ * The connection is identified by its local/remote address and port; the
+ * map entry is created on first use and updated afterwards. 'type' selects
+ * what 'data' points to:
+ *	 INET_DIAG_INFO		  - a struct tcp_info
+ *	 INET_DIAG_SKMEMINFO  - an array of SK_MEMINFO_VARS __u32 values
+ *	 INET_DIAG_CONG		  - a NUL-terminated congestion algorithm name
+ * Any other type is reported with a WARNING. 'tcp_timer_str' is a
+ * NUL-terminated string that is saved along with every update.
+ */
+static void
+store_netlink_info(HTAB *nldiag_map,
+				   const char *local_addr, const int local_port,
+				   const char *remote_addr, const int remote_port,
+				   const int type, void *data, const char *tcp_timer_str)
+{
+	char		hash_key[512];
+	NlDiagInfo *entry;
+	bool		found;
+
+	format_connection_key(hash_key, sizeof(hash_key), local_addr, local_port, remote_addr, remote_port);
+	entry = (NlDiagInfo *) hash_search(nldiag_map, hash_key, HASH_ENTER, &found);
+	elog(DEBUG5, "saving some netlink chatter about %s into %p", hash_key, entry);
+
+	if (!found)
+	{
+		/*
+		 * New entry. hash_search() copied the key, but the rest of the struct
+		 * is uninitialized, so zero everything after the key.
+		 */
+		memset((char *) entry + offsetof(NlDiagInfo, tcpi), 0,
+			   sizeof(NlDiagInfo) - offsetof(NlDiagInfo, tcpi));
+	}
+
+	/*
+	 * Update the entry (whether new or old). Copy as a string so that we
+	 * never read past the caller's terminator and always end up terminated.
+	 */
+	strlcpy(entry->tcp_timer_str, tcp_timer_str, sizeof(entry->tcp_timer_str));
+
+	switch (type)
+	{
+		case INET_DIAG_INFO:
+			memcpy(&entry->tcpi, data, sizeof(struct tcp_info));
+			entry->has_tcpi = 1;
+			break;
+		case INET_DIAG_SKMEMINFO:
+			memcpy(entry->skmem, data, sizeof(__u32) * SK_MEMINFO_VARS);
+			entry->has_skmem = 1;
+			break;
+		case INET_DIAG_CONG:
+
+			/*
+			 * The name is usually much shorter than our buffer, and could
+			 * also be longer; strlcpy() handles both without over-reading
+			 * the source or leaving the destination unterminated.
+			 */
+			strlcpy(entry->cong, (const char *) data, sizeof(entry->cong));
+			entry->has_cong = 1;
+			break;
+		default:
+			elog(WARNING, "unsupported inet diag type reply");
+			break;
+	}
+}
+
+/*
+ * Fetch the netlink details recorded for a connection.
+ *
+ * The connection is identified by its local/remote address and port, which
+ * are combined into the same key format used when storing. Returns the
+ * matching NlDiagInfo, or NULL when nothing was recorded for it.
+ */
+static NlDiagInfo *
+find_netlink_info(HTAB *nldiag_map,
+				  const char *local_addr, const int local_port,
+				  const char *remote_addr, const int remote_port)
+{
+	char		lookup_key[512];
+	NlDiagInfo *result;
+
+	format_connection_key(lookup_key, sizeof(lookup_key),
+						  local_addr, local_port,
+						  remote_addr, remote_port);
+
+	result = (NlDiagInfo *) hash_search(nldiag_map, lookup_key, HASH_FIND, NULL);
+	elog(DEBUG5, "nldiag_map returning about %s --> %p", lookup_key, result);
+
+	return result;
+}
+
+/*
+ * Ask the kernel, over netlink socket 'sockfd', to dump all TCP sockets of
+ * the given address family (AF_INET or AF_INET6), including the extended
+ * tcp_info, socket memory and congestion algorithm attributes.
+ *
+ * Returns the result of sendmsg(): the number of bytes sent, or -1 with
+ * errno set on failure.
+ *
+ * Please see man sock_diag(7) on Linux for details about this API.
+ */
+static int
+send_diag_msg(int sockfd, __u8 family)
+{
+	struct msghdr msg;
+	struct nlmsghdr nlh;
+	struct inet_diag_req_v2 conn_req;
+	struct sockaddr_nl sa;
+	struct iovec iov[2];
+
+	elog(DEBUG1, "quering netlink socket for TCP low-level stats");
+
+	memset(&msg, 0, sizeof(msg));
+	memset(&sa, 0, sizeof(sa));
+	memset(&nlh, 0, sizeof(nlh));
+	memset(&conn_req, 0, sizeof(conn_req));
+
+	sa.nl_family = AF_NETLINK;
+	conn_req.sdiag_family = family;
+	conn_req.sdiag_protocol = IPPROTO_TCP;
+
+	/*
+	 * Do not filter out any TCP states (include all).
+	 *
+	 * Maybe we should filter out everything other than TCP_ESTABLISHED here?
+	 * But stuck connections (e.g. in TCP_SYN_SENT) seem to be useful info on
+	 * their own. Anyway, filtering could work this way:
+	 *
+	 * conn_req.idiag_states = TCPF_ALL & ~((1 << TCP_SYN_RECV) | (1 <<
+	 * TCP_TIME_WAIT) | (1 << TCP_CLOSE));
+	 */
+	conn_req.idiag_states = TCPF_ALL;
+
+	/*
+	 * Request extended TCP information: see linux/inet_diag.h. Extension N
+	 * is requested by setting bit (N - 1) of idiag_ext.
+	 *
+	 * XXX: we could query for INET_DIAG_VEGASINFO too. INET_DIAG_BBRINFO is
+	 * deliberately not requested: idiag_ext is a __u8 according to
+	 * sock_diag(7), so the bit for that extension would not fit.
+	 */
+	conn_req.idiag_ext |= (1 << (INET_DIAG_INFO - 1));
+	conn_req.idiag_ext |= (1 << (INET_DIAG_SKMEMINFO - 1));
+	conn_req.idiag_ext |= (1 << (INET_DIAG_CONG - 1));
+
+	/* Netlink header, immediately followed by the request itself */
+	nlh.nlmsg_len = NLMSG_LENGTH(sizeof(conn_req));
+	nlh.nlmsg_flags = NLM_F_DUMP | NLM_F_REQUEST;
+	nlh.nlmsg_type = SOCK_DIAG_BY_FAMILY;
+	iov[0].iov_base = (void *) &nlh;
+	iov[0].iov_len = sizeof(nlh);
+	iov[1].iov_base = (void *) &conn_req;
+	iov[1].iov_len = sizeof(conn_req);
+
+	msg.msg_name = (void *) &sa;
+	msg.msg_namelen = sizeof(sa);
+	msg.msg_iov = iov;
+	msg.msg_iovlen = lengthof(iov);
+
+	return sendmsg(sockfd, &msg, 0);
+}
+
+
+/*
+ * Render a duration given in milliseconds as "<M>min<S>sec", dropping the
+ * sub-second part.
+ *
+ * The result lives in a static buffer that is overwritten by the next call,
+ * so callers must use or copy it right away.
+ */
+static char *
+ms_to_min_sec(unsigned long ms)
+{
+	static char buffer[64];
+	long		whole_sec = ms / 1000;
+	long		min_part;
+	long		sec_part;
+
+	min_part = whole_sec / 60;
+	sec_part = whole_sec % 60;
+
+	memset(buffer, 0, sizeof(buffer));
+	snprintf(buffer, sizeof(buffer), "%ldmin%ldsec", min_part, sec_part);
+
+	return buffer;
+}
+
+/*
+ * Decode one inet_diag_msg and save what we are interested in.
+ *
+ * 'diag_msg' is the fixed part of the reply; it is followed by 'rtalen'
+ * bytes of rtattr-encoded attributes. The connection's addresses and ports
+ * are turned into text, the timer fields are formatted into one string, and
+ * each INET_DIAG_INFO / INET_DIAG_SKMEMINFO / INET_DIAG_CONG attribute is
+ * handed to store_netlink_info(). Other attributes are ignored. Messages
+ * of an unknown address family are skipped with a WARNING.
+ *
+ * Attribute payloads are never trusted to have the size our headers expect:
+ * each one is copied into a zeroed, correctly sized local first, so a
+ * shorter (or longer) payload from a different kernel version cannot cause
+ * reads past the end of the attribute.
+ */
+static void
+parse_diag_msg(HTAB *nldiag_map, struct inet_diag_msg *diag_msg, int rtalen)
+{
+	struct rtattr *attr;
+	char		local_addr_buf[INET6_ADDRSTRLEN];
+	char		remote_addr_buf[INET6_ADDRSTRLEN];
+	char		tcp_timer_str[64];
+	const char *timer_name;
+	int			local_port,
+				remote_port;
+
+	memset(local_addr_buf, 0, sizeof(local_addr_buf));
+	memset(remote_addr_buf, 0, sizeof(remote_addr_buf));
+	memset(tcp_timer_str, 0, sizeof(tcp_timer_str));
+
+	local_port = ntohs(diag_msg->id.idiag_sport);
+	remote_port = ntohs(diag_msg->id.idiag_dport);
+
+	if (diag_msg->idiag_family == AF_INET ||
+		diag_msg->idiag_family == AF_INET6)
+	{
+		/* On failure the buffers stay empty, which is caught just below */
+		inet_ntop(diag_msg->idiag_family, diag_msg->id.idiag_src,
+				  local_addr_buf, sizeof(local_addr_buf));
+		inet_ntop(diag_msg->idiag_family, diag_msg->id.idiag_dst,
+				  remote_addr_buf, sizeof(remote_addr_buf));
+	}
+	else
+	{
+		/* Unknown family, just log it */
+		ereport(WARNING, (errmsg("unknown address family in netlink response: %d", diag_msg->idiag_family)));
+		return;
+	}
+
+	if (local_addr_buf[0] == 0 || remote_addr_buf[0] == 0)
+	{
+		ereport(WARNING, (errmsg("could not get connection information from netlink message")));
+		return;
+	}
+
+	/* The timer kind comes from the kernel; don't index the map blindly */
+	if (diag_msg->idiag_timer < lengthof(tcptimer_names_map))
+		timer_name = tcptimer_names_map[diag_msg->idiag_timer];
+	else
+		timer_name = "unknown";
+
+	/* Format the TCP timer information that is going to be saved later on. */
+	snprintf(tcp_timer_str, sizeof(tcp_timer_str), "%s,%s,%d",
+			 timer_name,
+			 ms_to_min_sec(diag_msg->idiag_expires),
+			 diag_msg->idiag_retrans);
+
+	/*
+	 * XXX: perhaps also save diag_msg->idiag_[rw]queue, but we already have
+	 * it from skmem
+	 */
+
+	/* Walk the attributes; RTA_OK() fails right away if there are none */
+	for (attr = (struct rtattr *) (diag_msg + 1);
+		 RTA_OK(attr, rtalen);
+		 attr = RTA_NEXT(attr, rtalen))
+	{
+		int			type = attr->rta_type;
+		size_t		payload = RTA_PAYLOAD(attr);
+
+		switch (type)
+		{
+			case INET_DIAG_INFO:
+				{
+					struct tcp_info tcpi;
+
+					memset(&tcpi, 0, sizeof(tcpi));
+					memcpy(&tcpi, RTA_DATA(attr), Min(payload, sizeof(tcpi)));
+					store_netlink_info(nldiag_map, local_addr_buf, local_port, remote_addr_buf, remote_port,
+									   type, &tcpi, tcp_timer_str);
+					break;
+				}
+			case INET_DIAG_SKMEMINFO:
+				{
+					__u32		skmem[SK_MEMINFO_VARS];
+
+					memset(skmem, 0, sizeof(skmem));
+					memcpy(skmem, RTA_DATA(attr), Min(payload, sizeof(skmem)));
+					store_netlink_info(nldiag_map, local_addr_buf, local_port, remote_addr_buf, remote_port,
+									   type, skmem, tcp_timer_str);
+					break;
+				}
+			case INET_DIAG_CONG:
+				{
+					/* Same size as the destination, always NUL-terminated */
+					char		cong[sizeof(((NlDiagInfo *) NULL)->cong)];
+
+					memset(cong, 0, sizeof(cong));
+					memcpy(cong, RTA_DATA(attr), Min(payload, sizeof(cong) - 1));
+					store_netlink_info(nldiag_map, local_addr_buf, local_port, remote_addr_buf, remote_port,
+									   type, cong, tcp_timer_str);
+					break;
+				}
+			default:
+				/* Attribute we did not ask for or do not use */
+				break;
+		}
+	}
+}
+
+
+/*
+ * Scans all /proc/[PID]/fd/ entries to map socket inodes to PIDs.
+ * Fills the provided pid_map hash table.
+ *
+ * Every numeric directory below /proc is taken to be a process. Its fd
+ * directory is listed and each symlink of the form "socket:[INODE]" is
+ * recorded via insert_pid(). Processes whose fd directory cannot be opened
+ * (typically for lack of permission, or because they just exited) are
+ * skipped with a DEBUG4 message. Failure to open /proc itself is an ERROR.
+ *
+ * Directories are opened with AllocateDir() so that they are closed
+ * automatically if an ERROR is thrown while scanning (e.g. out of memory
+ * while growing the hash table).
+ */
+static void
+scan_proc_fds(HTAB *pid_map)
+{
+	DIR		   *proc_dir;
+	struct dirent *pid_entry;
+
+	elog(DEBUG1, "scanning /proc for PIDs");
+
+	proc_dir = AllocateDir("/proc");
+	if (!proc_dir)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open /proc: %m")));
+
+	/* Iterate over each entry in /proc */
+	while ((pid_entry = ReadDir(proc_dir, "/proc")) != NULL)
+	{
+		DIR		   *fd_dir;
+		struct dirent *fd_entry;
+		char		fd_path[MAXPGPATH];
+		int			pid;
+
+		/* Only directories whose name starts with a digit are PIDs */
+		if (pid_entry->d_type != DT_DIR ||
+			!isdigit((unsigned char) pid_entry->d_name[0]))
+			continue;
+
+		pid = atoi(pid_entry->d_name);
+		snprintf(fd_path, sizeof(fd_path), "/proc/%d/fd", pid);
+
+		/* Open the /proc/[PID]/fd directory */
+		fd_dir = AllocateDir(fd_path);
+		if (!fd_dir)
+		{
+			ereport(DEBUG4,
+					(errcode_for_file_access(),
+					 errmsg("could not open directory \"%s\": %m", fd_path)));
+			continue;
+		}
+
+		/*
+		 * Iterate over each file descriptor in /proc/[PID]/fd. The process
+		 * may go away at any time, so a read failure here is not an error.
+		 */
+		while ((fd_entry = ReadDirExtended(fd_dir, fd_path, DEBUG4)) != NULL)
+		{
+			char		link_path[MAXPGPATH];
+			char		link_target[MAXPGPATH];
+			ssize_t		link_len;
+			unsigned long long inode;
+
+			/* "." and ".." are not symlinks, don't bother with them */
+			if (strcmp(fd_entry->d_name, ".") == 0 ||
+				strcmp(fd_entry->d_name, "..") == 0)
+				continue;
+
+			snprintf(link_path, sizeof(link_path), "%s/%s", fd_path, fd_entry->d_name);
+
+			/* Read the symbolic link target */
+			link_len = readlink(link_path, link_target, sizeof(link_target) - 1);
+			if (link_len == -1)
+			{
+				/* Failed to read link */
+				ereport(DEBUG4,
+						(errcode_for_file_access(),
+						 errmsg("could not read link \"%s\": %m", link_path)));
+				continue;
+			}
+			link_target[link_len] = '\0';
+
+			/* Sockets show up as "socket:[INODE]" */
+			if (sscanf(link_target, "socket:[%llu]", &inode) == 1)
+			{
+				/* Add this inode->PID mapping to our hash table */
+				insert_pid(pid_map, inode, pid);
+			}
+		}
+		FreeDir(fd_dir);
+	}
+	FreeDir(proc_dir);
+}
+
+
+/*
+ * Reads /proc/net/tcp (family == AF_INET) or /proc/net/tcp6 (any other
+ * family value) and builds a linked list of connections.
+ *
+ * Returns a pointer to the head of the palloc'd TcpConnection list, or NULL
+ * if the file lists no sockets. Entries appear in reverse file order.
+ * Lines that cannot be parsed are skipped with a WARNING. An address that
+ * cannot be converted to text is stored as "INVALID_IP".
+ *
+ * Failure to open or read the file raises an ERROR, with one exception: a
+ * missing /proc/net/tcp6 is treated as "no IPv6 sockets", since that file
+ * does not exist when IPv6 support is disabled in the kernel.
+ *
+ * The file is opened with AllocateFile() so that it gets closed
+ * automatically should an ERROR be thrown while reading.
+ */
+static TcpConnection *
+read_tcp_connections(__u8 family)
+{
+	FILE	   *fp;
+	char		line[1024];
+	const char *tcp_file_name;
+	int			af = (family == AF_INET) ? AF_INET : AF_INET6;
+	TcpConnection *head = NULL;
+
+	tcp_file_name = (af == AF_INET) ? "/proc/net/tcp" : "/proc/net/tcp6";
+	elog(DEBUG1, "scanning %s for TCP connections and inodes", tcp_file_name);
+
+	fp = AllocateFile(tcp_file_name, "r");
+	if (fp == NULL)
+	{
+		/* No IPv6 in this kernel: nothing to report, but not a failure */
+		if (af == AF_INET6 && errno == ENOENT)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open %s: %m", tcp_file_name)));
+	}
+
+	/* Skip the header line */
+	if (fgets(line, sizeof(line), fp) == NULL)
+	{
+		FreeFile(fp);
+		ereport(ERROR,
+				(errcode(ERRCODE_IO_ERROR),
+				 errmsg("could not read header from %s", tcp_file_name)));
+	}
+
+	/* Read each line */
+	while (fgets(line, sizeof(line), fp) != NULL)
+	{
+		TcpConnection *conn;
+		struct in_addr local_ip4,
+					remote_ip4;
+		struct in6_addr local_ip6,
+					remote_ip6;
+		const void *local_ip,
+				   *remote_ip;
+		unsigned int local_ip4_raw,
+					remote_ip4_raw;
+		unsigned int local_port,
+					remote_port,
+					state;
+		unsigned long long inode;
+		int			slot,
+					uid;
+		int			num_matched,
+					proper_matches;
+
+		/*
+		 * Fields: slot, local address:port, remote address:port, state, then
+		 * three columns we skip, uid, one more skipped column, inode.
+		 *
+		 * The kernel prints each 32-bit word of an address as the hex value
+		 * of that word as loaded from memory, so storing the parsed value
+		 * back into a 32-bit word yields network byte order again. That
+		 * requires scanning into 32-bit variables, not longs.
+		 */
+		if (af == AF_INET)
+		{
+			num_matched = sscanf(line, "%4d: %08X:%04X %08X:%04X %02X %*s %*s %*s %d %*s %llu",
+								 &slot,
+								 &local_ip4_raw,
+								 &local_port,
+								 &remote_ip4_raw,
+								 &remote_port,
+								 &state,
+								 &uid,
+								 &inode);
+			proper_matches = 8;
+
+			local_ip4.s_addr = local_ip4_raw;
+			remote_ip4.s_addr = remote_ip4_raw;
+			local_ip = &local_ip4;
+			remote_ip = &remote_ip4;
+		}
+		else
+		{
+			/*
+			 * /proc files for IPv6 use a 32-character hex representation of
+			 * the address: four 32-bit words printed back to back.
+			 */
+			num_matched = sscanf(line, "%4d: %08X%08X%08X%08X:%04X %08X%08X%08X%08X:%04X %02X %*s %*s %*s %d %*s %llu",
+								 &slot,
+								 &local_ip6.s6_addr32[0],
+								 &local_ip6.s6_addr32[1],
+								 &local_ip6.s6_addr32[2],
+								 &local_ip6.s6_addr32[3],
+								 &local_port,
+								 &remote_ip6.s6_addr32[0],
+								 &remote_ip6.s6_addr32[1],
+								 &remote_ip6.s6_addr32[2],
+								 &remote_ip6.s6_addr32[3],
+								 &remote_port,
+								 &state,
+								 &uid,
+								 &inode);
+			proper_matches = 14;
+
+			local_ip = &local_ip6;
+			remote_ip = &remote_ip6;
+		}
+
+		if (num_matched < proper_matches)
+		{
+			size_t		len = strlen(line);
+
+			/* Failed to parse, so chomp the trailing newline and show it */
+			if (len > 0 && line[len - 1] == '\n')
+				line[len - 1] = '\0';
+			ereport(WARNING, (errmsg("failed to parse line from %s (got just %d matches): %s", tcp_file_name, num_matched, line)));
+			continue;
+		}
+
+		/* Create a new connection node */
+		conn = palloc(sizeof(TcpConnection));
+
+		conn->local_port = (int) local_port;
+		conn->remote_port = (int) remote_port;
+
+		/* Format local IP */
+		if (inet_ntop(af, local_ip, conn->local_ip_str, sizeof(conn->local_ip_str)) == NULL)
+		{
+			ereport(WARNING, (errmsg("inet_ntop() failed for local IP: %m")));
+			strlcpy(conn->local_ip_str, "INVALID_IP", sizeof(conn->local_ip_str));
+		}
+
+		/* Format remote IP */
+		if (inet_ntop(af, remote_ip, conn->remote_ip_str, sizeof(conn->remote_ip_str)) == NULL)
+		{
+			ereport(WARNING, (errmsg("inet_ntop() failed for remote IP: %m")));
+			strlcpy(conn->remote_ip_str, "INVALID_IP", sizeof(conn->remote_ip_str));
+		}
+
+		/* Format combined strings */
+		snprintf(conn->local_addr_str, sizeof(conn->local_addr_str), "%s:%d", conn->local_ip_str, conn->local_port);
+		snprintf(conn->remote_addr_str, sizeof(conn->remote_addr_str), "%s:%d", conn->remote_ip_str, conn->remote_port);
+
+		conn->state = (int) state;
+		conn->uid = uid;
+		conn->inode = inode;
+		conn->family = family;
+
+		/* Add to the front of the linked list */
+		conn->next = head;
+		head = conn;
+	}
+
+	FreeFile(fp);
+	return head;
+}
+
+
+/*
+ * Receive and parse all netlink replies to one dump request, populating
+ * nldiag_map.
+ *
+ * Datagrams are read from nl_sock until the kernel signals the end of the
+ * dump with NLMSG_DONE. Every inet_diag message found is passed to
+ * parse_diag_msg().
+ *
+ * Returns 0 once the whole dump has been processed. Returns -1 after
+ * emitting a WARNING if recv() fails (errno is preserved for the caller) or
+ * if the socket is closed before the dump ends. If the kernel answers with
+ * NLMSG_ERROR, nl_sock is closed here and an ERROR is raised.
+ */
+static int
+recv_diag_msgs(int nl_sock, HTAB *nldiag_map)
+{
+	/* The union makes the byte buffer suitably aligned for nlmsghdr */
+	union
+	{
+		struct nlmsghdr hdr;
+		uint8_t		bytes[NL_SOCKET_BUFFER_SIZE];
+	}			recv_buf;
+
+	for (;;)
+	{
+		struct nlmsghdr *nlh;
+		int			numbytes;
+
+		numbytes = recv(nl_sock, recv_buf.bytes, sizeof(recv_buf.bytes), 0);
+		if (numbytes < 0)
+		{
+			int			save_errno = errno;
+
+			/* Interrupted by a signal: nothing was lost, just retry */
+			if (save_errno == EINTR)
+				continue;
+
+			ereport(WARNING, (errmsg("netlink recv error: %m")));
+			errno = save_errno;
+			return -1;
+		}
+		if (numbytes == 0)
+		{
+			ereport(WARNING, (errmsg("netlink socket closed prematurely")));
+			return -1;
+		}
+
+		/* One datagram may carry several netlink messages */
+		for (nlh = &recv_buf.hdr;
+			 NLMSG_OK(nlh, numbytes);
+			 nlh = NLMSG_NEXT(nlh, numbytes))
+		{
+			struct inet_diag_msg *diag_msg;
+			int			rtalen;
+
+			if (nlh->nlmsg_type == NLMSG_DONE)
+				return 0;
+
+			if (nlh->nlmsg_type == NLMSG_ERROR)
+			{
+				struct nlmsgerr *err = (struct nlmsgerr *) NLMSG_DATA(nlh);
+				int			nl_errno = 0;
+
+				/* Only trust the error code if the message really has one */
+				if (nlh->nlmsg_len >= NLMSG_LENGTH(sizeof(*err)))
+					nl_errno = -err->error;
+
+				close(nl_sock);
+				ereport(ERROR, (errmsg("error in netlink message: %s", strerror(nl_errno))));
+			}
+
+			/* Ignore anything too short to be an inet_diag message */
+			if (nlh->nlmsg_len < NLMSG_LENGTH(sizeof(*diag_msg)))
+				continue;
+
+			diag_msg = (struct inet_diag_msg *) NLMSG_DATA(nlh);
+			rtalen = nlh->nlmsg_len - NLMSG_LENGTH(sizeof(*diag_msg));
+
+			/* This populates nldiag_map */
+			parse_diag_msg(nldiag_map, diag_msg, rtalen);
+		}
+	}
+}
+
+
+PG_FUNCTION_INFO_V1(pg_stat_get_tcpinfo);
+Datum
+pg_stat_get_tcpinfo(PG_FUNCTION_ARGS)
+{
+	Datum		values[10];
+	bool		nulls[10];
+	HTAB	   *pid_map;
+	MemoryContext oldcontext;
+	MemoryContext per_query_ctx;
+	NlDiagInfo *diag_info;
+	HTAB	   *nldiag_map;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	StringInfoData json_buf;
+	TcpConnection *tcp_connections;
+	TcpConnection *current;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	__u32	   *skmem,
+				recvq = 0,
+				sendq = 0;
+	bool		has_data;
+	bool		has_q_data;
+	const char *state_str;
+	int			nl_sock = 0,
+				pid;
+	HASHCTL		ctl;
+
+	/* Check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/* Switch into long-lived context to construct returned data structures */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Allocate and initialize the hash maps in the CurrentMemoryContext. */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(unsigned long long);
+	ctl.entrysize = sizeof(InodePid);
+	pid_map = hash_create("Inode to PID Map", 1024, &ctl, HASH_ELEM);
+
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = 512;
+	ctl.entrysize = sizeof(NlDiagInfo);
+	nldiag_map = hash_create("Netlink Diag Map", 1024, &ctl, HASH_ELEM | HASH_STRINGS);
+
+	/* Load the inode->PID hash map while scanning /proc/PIDs */
+	scan_proc_fds(pid_map);
+
+	/* Read all TCP connections from /proc/net/tcp (IPv4 file) */
+	tcp_connections = read_tcp_connections(AF_INET);
+	if (!tcp_connections)
+		PG_RETURN_VOID();
+
+	/* Find the tail of linked list of TCP connections ... */
+	current = tcp_connections;
+	while (current->next != NULL)
+	{
+		current = current->next;
+	}
+	/* ... and append list of IPv6-based TCP connections */
+	current->next = read_tcp_connections(AF_INET6);
+
+	/* Open netlink and query about sockets */
+	if ((nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_INET_DIAG)) == -1)
+	{
+		ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
+						errmsg("could not create netlink socket: %m")));
+	}
+
+	/* IPv4 netlink message */
+	if (send_diag_msg(nl_sock, AF_INET) < 0)
+	{
+		close(nl_sock);
+		ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
+						errmsg("could not send AF_INET netlink message: %m")));
+	}
+	if (recv_diag_msgs(nl_sock, nldiag_map))
+	{
+		close(nl_sock);
+		ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
+						errmsg("could not process netlink message: %m")));
+	}
+
+	/* IPv6 netlink message */
+	if (send_diag_msg(nl_sock, AF_INET6) < 0)
+	{
+		close(nl_sock);
+		ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
+						errmsg("could not send AF_INET6 netlink message: %m")));
+	}
+
+	if (recv_diag_msgs(nl_sock, nldiag_map))
+	{
+		close(nl_sock);
+		ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
+						errmsg("could not process netlink message: %m")));
+	}
+
+	close(nl_sock);
+
+	/* Start populating the tuplestore */
+	initStringInfo(&json_buf);
+
+	/* For each TCP connection from linked list */
+	current = tcp_connections;
+	while (current != NULL)
+	{
+		int			i = 0;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		/* pid */
+		pid = find_pid(pid_map, current->inode);
+		if (pid == -1)
+			nulls[i++] = true;
+		else
+			values[i++] = Int32GetDatum(pid);
+
+		/* uid */
+		values[i++] = Int32GetDatum(current->uid);
+
+		/* local ip */
+		if (strcmp(current->local_ip_str, "INVALID_IP") == 0)
+			nulls[i++] = true;
+		else
+			values[i++] = DirectFunctionCall1(inet_in, CStringGetDatum(current->local_ip_str));
+
+		/* local port */
+		values[i++] = Int32GetDatum(current->local_port);
+
+		/* remote ip */
+		if (strcmp(current->remote_ip_str, "INVALID_IP") == 0)
+			nulls[i++] = true;
+		else
+			values[i++] = DirectFunctionCall1(inet_in, CStringGetDatum(current->remote_ip_str));
+
+		/* remote port */
+		values[i++] = Int32GetDatum(current->remote_port);
+
+		/* state */
+		state_str = "UNKNOWN";
+		if (current->state > 0 && current->state < TCP_MAX_STATE)
+			state_str = tcp_states_map[current->state];
+		values[i++] = CStringGetTextDatum(state_str);
+
+		/* build big JSON with detailed information */
+		resetStringInfo(&json_buf);
+		appendStringInfoChar(&json_buf, '{');
+		has_data = false;
+		has_q_data = false;
+
+		diag_info = find_netlink_info(nldiag_map,
+									  current->local_ip_str, current->local_port,
+									  current->remote_ip_str, current->remote_port
+			);
+
+		if (diag_info)
+		{
+			/* add TCP timer information */
+			appendStringInfo(&json_buf, "\"timer\": \"(%s)\", ", diag_info->tcp_timer_str);
+
+			/* add low-level TCP stats for troubleshooting */
+			if (diag_info->has_tcpi)
+			{
+				struct tcp_info *tcpi = &diag_info->tcpi;
+
+
+#define appendTcpinfoMember(var, type) \
+				appendStringInfo(&json_buf, "\"%s\": " type ", ", #var, tcpi->tcpi_##var)
+
+#define appendCalculation(var, type, calc) \
+				appendStringInfo(&json_buf, "\"%s\": " type ", ", #var, calc)
+
+#define appendLastTcpinfoMember(var, type) \
+				appendStringInfo(&json_buf, "\"%s\": " type, #var, tcpi->tcpi_##var)
+
+				/*
+				 * XXX: with INET_DIAG_VEGASINFO the "rtt" could also be also
+				 * taken from struct tcpvegas_info(?)
+				 */
+				appendCalculation(rtt, "%.3f", (double) tcpi->tcpi_rtt / 1000.0);
+				appendCalculation(rttvar, "%.3f", (double) tcpi->tcpi_rttvar / 1000.0);
+				appendCalculation(rcv_rtt, "%.3f", (double) tcpi->tcpi_rcv_rtt / 1000.0);
+				appendCalculation(ato, "%.3f", (double) tcpi->tcpi_ato / 1000.0);
+				appendCalculation(min_rtt, "%.3f", (double) tcpi->tcpi_min_rtt / 1000.0);
+
+				appendTcpinfoMember(snd_cwnd, "%u");
+				appendTcpinfoMember(snd_cwnd, "%u");
+				appendTcpinfoMember(sndbuf_limited, "%llu");
+				appendTcpinfoMember(rwnd_limited, "%llu");
+				appendTcpinfoMember(delivery_rate, "%llu");
+				appendTcpinfoMember(state, "%u");
+				appendTcpinfoMember(ca_state, "%u");
+				appendTcpinfoMember(retransmits, "%u");
+				appendTcpinfoMember(probes, "%u");
+				appendTcpinfoMember(options, "%u");
+				appendTcpinfoMember(snd_wscale, "%u");
+				appendTcpinfoMember(rcv_wscale, "%u");
+				appendTcpinfoMember(delivery_rate_app_limited, "%u");
+				appendTcpinfoMember(total_rto, "%u");
+				appendTcpinfoMember(total_rto_recoveries, "%u");
+				appendTcpinfoMember(rto, "%u");
+				appendTcpinfoMember(snd_mss, "%u");
+				appendTcpinfoMember(rcv_mss, "%u");
+				appendTcpinfoMember(unacked, "%u");
+				appendTcpinfoMember(sacked, "%u");
+				appendTcpinfoMember(lost, "%u");
+				appendTcpinfoMember(retrans, "%u");
+				appendTcpinfoMember(fackets, "%u");
+				appendTcpinfoMember(last_data_sent, "%u");
+				appendTcpinfoMember(last_ack_sent, "%u");
+				appendTcpinfoMember(last_data_recv, "%u");
+				appendTcpinfoMember(last_ack_recv, "%u");
+				appendTcpinfoMember(pmtu, "%u");
+				appendTcpinfoMember(rcv_ssthresh, "%u");
+				appendTcpinfoMember(snd_ssthresh, "%u");
+				appendTcpinfoMember(snd_cwnd, "%u");
+				appendTcpinfoMember(advmss, "%u");
+				appendTcpinfoMember(reordering, "%u");
+				appendTcpinfoMember(rcv_space, "%u");
+				appendTcpinfoMember(total_retrans, "%u");
+				appendTcpinfoMember(segs_out, "%u");
+				appendTcpinfoMember(segs_in, "%u");
+				appendTcpinfoMember(notsent_bytes, "%u");
+				appendTcpinfoMember(data_segs_out, "%u");
+				appendTcpinfoMember(data_segs_in, "%u");
+				appendTcpinfoMember(delivered, "%u");
+				appendTcpinfoMember(delivered_ce, "%u");
+				appendTcpinfoMember(dsack_dups, "%u");
+				appendTcpinfoMember(reord_seen, "%u");
+				appendTcpinfoMember(rcv_ooopack, "%u");
+				appendTcpinfoMember(snd_wnd, "%u");
+				appendTcpinfoMember(rcv_wnd, "%u");
+				appendTcpinfoMember(rehash, "%u");
+				appendTcpinfoMember(total_rto_time, "%u");
+				appendTcpinfoMember(pacing_rate, "%llu");
+				appendTcpinfoMember(max_pacing_rate, "%llu");
+				appendTcpinfoMember(bytes_acked, "%llu");
+				appendTcpinfoMember(bytes_received, "%llu");
+				appendTcpinfoMember(delivery_rate, "%llu");
+				appendTcpinfoMember(busy_time, "%llu");
+				appendTcpinfoMember(rwnd_limited, "%llu");
+				appendTcpinfoMember(sndbuf_limited, "%llu");
+				appendTcpinfoMember(bytes_sent, "%llu");
+				appendLastTcpinfoMember(bytes_retrans, "%llu");
+
+				has_data = true;
+			}
+
+			/* add detailed TCP buffer sizes as seen by the kernel */
+			if (diag_info->has_skmem)
+			{
+				if (has_data)
+					appendStringInfoString(&json_buf, ", ");
+				skmem = diag_info->skmem;
+				appendStringInfo(&json_buf,
+								 "\"skmem\": {\"rmem_alloc\": %u, \"rcvbuf\": %u, \"wmem_alloc\": %u, \"sndbuf\": %u, \"fwd_alloc\": %u, \"wmem_queued\": %u, \"optmem\": %u}",
+								 skmem[SK_MEMINFO_RMEM_ALLOC],
+								 skmem[SK_MEMINFO_RCVBUF],
+								 skmem[SK_MEMINFO_WMEM_ALLOC],
+								 skmem[SK_MEMINFO_SNDBUF],
+								 skmem[SK_MEMINFO_FWD_ALLOC],
+								 skmem[SK_MEMINFO_WMEM_QUEUED],
+								 skmem[SK_MEMINFO_OPTMEM]);
+
+				recvq = skmem[SK_MEMINFO_RMEM_ALLOC];
+				sendq = skmem[SK_MEMINFO_WMEM_ALLOC];
+
+				has_data = true;
+				has_q_data = true;
+			}
+
+			/* also add TCP congestion used for the connection */
+			if (diag_info->has_cong)
+			{
+				if (has_data)
+					appendStringInfoString(&json_buf, ", ");
+
+				/* See info about TCP_CONGESTION in tcp(7). */
+				appendStringInfo(&json_buf, "\"congestion\": \"%s\"", diag_info->cong);
+
+				has_data = true;
+			}
+		}
+		appendStringInfoChar(&json_buf, '}');
+
+		/* Fill the queues: sendq and recvq */
+		if (!has_q_data)
+		{
+			nulls[i++] = true;
+			nulls[i++] = true;
+		}
+		else
+		{
+			values[i++] = Int32GetDatum(recvq);
+			values[i++] = Int32GetDatum(sendq);
+		}
+
+		/* Fill in the main tcpinfo JSON column */
+		if (!has_data)
+			nulls[i++] = true;
+		else
+			values[i++] = DirectFunctionCall1(jsonb_in, CStringGetDatum(json_buf.data));
+
+		/* Store the tuple */
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+
+		current = current->next;
+	}
+
+	PG_RETURN_VOID();
+}
+
+#else
+
+/*
+ * pg_stat_get_tcpinfo (stub for platforms other than Linux)
+ *
+ * Compiled from the #else branch of the platform check, so the SQL-callable
+ * symbol still exists where the real implementation is unavailable.  It
+ * never returns a result: every call raises an error.
+ *
+ * The error is reported with ereport() and ERRCODE_FEATURE_NOT_SUPPORTED
+ * rather than elog(), because this is a condition a user can trigger and
+ * it should not surface as an internal error (XX000).
+ */
+PG_FUNCTION_INFO_V1(pg_stat_get_tcpinfo);
+Datum
+pg_stat_get_tcpinfo(PG_FUNCTION_ARGS)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("pg_stat_tcpinfo is not supported on this platform")));
+}
+#endif
diff --git a/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control
new file mode 100644
index 00000000000..0fbf92c095d
--- /dev/null
+++ b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control
@@ -0,0 +1,5 @@
+# pg_stat_tcpinfo extension
+comment = 'Provides pg_stat_tcpinfo to show detailed TCP connection info on Linux.'
+default_version = '1.0'
+module_pathname = '$libdir/pg_stat_tcpinfo'
+relocatable = true
-- 
2.43.0

Reply via email to