On Fri, Apr 30, 2021 at 2:23 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> Here's something I wanted to park here to look into for the next
> cycle:  it turns out that kqueue's EV_EOF flag also has the right
> semantics for this.  That leads to the idea of exposing the event via
> the WaitEventSet API, and would then bring
> client_connection_check_interval feature to 6/10 of our OSes, up from
> 2/10.  Maybe Windows' FD_CLOSE event could get us up to 7/10, not
> sure.

Rebased.  Added documentation tweak and a check to reject the GUC on
unsupported OSes.
From 356b0dcbc15353b9bd349972c80a7f2e5c516a0e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:38:40 +1200
Subject: [PATCH v2 1/2] Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds, using (non-standard) POLLRDHUP
* WAIT_USE_EPOLL builds, using EPOLLRDHUP
* WAIT_USE_KQUEUE builds, using EV_EOF

Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 src/backend/storage/ipc/latch.c | 79 +++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |  6 ++-
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 1d893cf863..54e928c564 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -841,6 +841,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *	 can be combined with other WL_SOCKET_* events (on non-Windows
  *	 platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1043,12 +1044,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	else
 	{
 		Assert(event->fd != PGINVALID_SOCKET);
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_CLOSED)
+			epoll_ev.events |= EPOLLRDHUP;
 	}
 
 	/*
@@ -1087,12 +1092,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
 			pollfd->events |= POLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+		if (event->events & WL_SOCKET_CLOSED)
+			pollfd->events |= POLLRDHUP;
+#endif
 	}
 
 	Assert(event->fd != PGINVALID_SOCKET);
@@ -1165,7 +1176,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE |
+							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1188,9 +1201,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 		 * old event mask to the new event mask, since kevent treats readable
 		 * and writable as separate events.
 		 */
-		if (old_events & WL_SOCKET_READABLE)
+		if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			old_filt_read = true;
-		if (event->events & WL_SOCKET_READABLE)
+		if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			new_filt_read = true;
 		if (old_events & WL_SOCKET_WRITEABLE)
 			old_filt_write = true;
@@ -1210,7 +1223,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 									 event);
 	}
 
-	Assert(count > 0);
+	/* For WL_SOCKET_READABLE -> WL_SOCKET_CLOSED, no change needed. */
+	if (count == 0)
+		return;
+
 	Assert(count <= 2);
 
 	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1525,7 +1541,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1543,6 +1561,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+			{
+				/* remote peer shut down, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -1668,7 +1693,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			occurred_events++;
 			returned_events++;
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd >= 0);
 
@@ -1679,6 +1706,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_READABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_kqueue_event->filter == EVFILT_READ) &&
+				(cur_kqueue_event->flags & EV_EOF))
+			{
+				/* the remote peer has shut down */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 				(cur_kqueue_event->filter == EVFILT_WRITE))
 			{
@@ -1789,7 +1824,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			int			errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1809,6 +1846,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+#ifdef POLLRDHUP
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_pollfd->revents & (POLLRDHUP | errflags)))
+			{
+				/* remote peer closed, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+#endif
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+	defined(WAIT_USE_EPOLL) || \
+	defined(WAIT_USE_KQUEUE)
+	return true;
+#else
+	return false;
+#endif
+}
+
 /*
  * Get the number of wait events registered in a given WaitEventSet.
  */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 44f9368c64..d78ff0bede 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED 	 (1 << 7)
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
-							 WL_SOCKET_CONNECTED)
+							 WL_SOCKET_CONNECTED | \
+							 WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
@@ -180,5 +181,6 @@ extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool	WaitEventSetCanReportClosed(void);
 
 #endif							/* LATCH_H */
-- 
2.30.2

From c37dc98cea2983734decc37cadc2f3a19edd1e85 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:48:32 +1200
Subject: [PATCH v2 2/2] Use WL_SOCKET_CLOSED for
 client_connection_check_interval.

Previously we used poll() directly to check for a POLLRDHUP event.
Instead, use WaitEventSet to poll the socket for WL_SOCKET_CLOSED, which
knows how to detect equivalent events on many more operating systems.

XXX Manually tested by killing psql, sending an async query with libpq
and then sending 'X' and hanging up, and by yanking an ethernet cable
(but the last requires TCP keepalives to be enabled).  Need to find a
clever way to do at least the first two in a TAP (?) test without races.

Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 doc/src/sgml/config.sgml     |  5 ++---
 src/backend/libpq/pqcomm.c   | 35 +++++++++++++----------------------
 src/backend/utils/misc/guc.c |  7 ++-----
 3 files changed, 17 insertions(+), 30 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index aa3e178240..c94e0ed9ef 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1012,9 +1012,8 @@ include_dir 'conf.d'
         the kernel reports that the connection is closed.
        </para>
        <para>
-        This option is currently available only on systems that support the
-        non-standard <symbol>POLLRDHUP</symbol> extension to the
-        <symbol>poll</symbol> system call, including Linux.
+        This option relies on kernel events exposed by Linux, macOS, illumos
+        and the BSD family, and is not available on other operating systems.
        </para>
        <para>
         If the value is specified without units, it is taken as milliseconds.
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 89a5f901aa..5660ece4e3 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1932,33 +1932,24 @@ pq_settcpusertimeout(int timeout, Port *port)
 bool
 pq_check_connection(void)
 {
-#if defined(POLLRDHUP)
+	WaitEvent events[3];
+	int		rc;
+
 	/*
-	 * POLLRDHUP is a Linux extension to poll(2) to detect sockets closed by
-	 * the other end.  We don't have a portable way to do that without
-	 * actually trying to read or write data on other systems.  We don't want
-	 * to read because that would be confused by pipelined queries and COPY
-	 * data. Perhaps in future we'll try to write a heartbeat message instead.
+	 * Temporarily ignore the latch, while we check if the socket has been
+	 * closed by the other end (if that is possible on this OS).
 	 */
-	struct pollfd pollfd;
-	int			rc;
-
-	pollfd.fd = MyProcPort->sock;
-	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
-	pollfd.revents = 0;
-
-	rc = poll(&pollfd, 1, 0);
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+	rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, MyLatch);
 
-	if (rc < 0)
+	for (int i = 0; i < rc; ++i)
 	{
-		ereport(COMMERROR,
-				(errcode_for_socket_access(),
-				 errmsg("could not poll socket: %m")));
-		return false;
+		if (events[i].pos == FeBeWaitSetSocketPos &&
+			events[i].events & WL_SOCKET_CLOSED)
+			return false;
 	}
-	else if (rc == 1 && (pollfd.revents & (POLLHUP | POLLRDHUP)))
-		return false;
-#endif
 
 	return true;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 68b62d523d..465b5ec1b6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -12063,14 +12063,11 @@ check_huge_page_size(int *newval, void **extra, GucSource source)
 static bool
 check_client_connection_check_interval(int *newval, void **extra, GucSource source)
 {
-#ifndef POLLRDHUP
-	/* Linux only, for now.  See pq_check_connection(). */
-	if (*newval != 0)
+	if (!WaitEventSetCanReportClosed() && *newval != 0)
 	{
-		GUC_check_errdetail("client_connection_check_interval must be set to 0 on platforms that lack POLLRDHUP.");
+		GUC_check_errdetail("client_connection_check_interval must be set to 0 on this platform.");
 		return false;
 	}
-#endif
 	return true;
 }
 
-- 
2.30.2

Reply via email to