On Sat, Apr 3, 2021 at 9:27 AM Thomas Munro <thomas.mu...@gmail.com> wrote:
> Pushed! Thanks to all who contributed.
Here's something I wanted to park here to look into for the next cycle: it turns out that kqueue's EV_EOF flag also has the right semantics for this. That leads to the idea of exposing the event via the WaitEventSet API, which would bring the client_connection_check_interval feature to 6/10 of our OSes, up from 2/10. Maybe Windows' FD_CLOSE event could get us up to 7/10, not sure.
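To make that concrete, here's a minimal standalone sketch of the kqueue behaviour I mean (untested, and peer_has_shut_down is my name, not from any patch): an EVFILT_READ filter reports EV_EOF in its flags once the peer has shut down its end, even while buffered data remains readable, which is exactly the semantics we want.

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <stdbool.h>
#include <unistd.h>

/* Returns true if the peer of "sock" has shut down its end (BSD/macOS). */
static bool
peer_has_shut_down(int sock)
{
    struct kevent ev;
    struct timespec zero = {0, 0};
    int     kq = kqueue();
    bool    closed = false;

    if (kq < 0)
        return false;

    /* Register read interest; EV_EOF comes back as a flag on the event. */
    EV_SET(&ev, sock, EVFILT_READ, EV_ADD, 0, 0, 0);
    if (kevent(kq, &ev, 1, &ev, 1, &zero) == 1 &&
        ev.filter == EVFILT_READ &&
        (ev.flags & EV_EOF))
        closed = true;      /* peer sent FIN/RST; data may still be buffered */

    close(kq);
    return closed;
}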
From 957826a4c6bfec2d88c2ea2f004e55ebf12ea473 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:38:40 +1200
Subject: [PATCH 1/2] Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.

This works only on systems where the kernel exposes that information,
namely:

* WAIT_USE_POLL builds, on systems that have the POLLRDHUP extension
* WAIT_USE_EPOLL builds, using EPOLLRDHUP
* WAIT_USE_KQUEUE builds, using EV_EOF
---
 src/backend/storage/ipc/latch.c | 64 ++++++++++++++++++++++++++++-----
 src/include/storage/latch.h    |  5 +--
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index ad781131e2..6c77356019 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -841,6 +841,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *   can be combined with other WL_SOCKET_* events (on non-Windows
  *   platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1043,12 +1044,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	else
 	{
 		Assert(event->fd != PGINVALID_SOCKET);
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_CLOSED)
+			epoll_ev.events |= EPOLLRDHUP;
 	}
 
 	/*
@@ -1087,12 +1092,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
 			pollfd->events |= POLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+		if (event->events & WL_SOCKET_CLOSED)
+			pollfd->events |= POLLRDHUP;
+#endif
 	}
 
 	Assert(event->fd != PGINVALID_SOCKET);
@@ -1165,7 +1176,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE |
+							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1188,9 +1201,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	 * old event mask to the new event mask, since kevent treats readable
 	 * and writable as separate events.
 	 */
-	if (old_events & WL_SOCKET_READABLE)
+	if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 		old_filt_read = true;
-	if (event->events & WL_SOCKET_READABLE)
+	if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 		new_filt_read = true;
 	if (old_events & WL_SOCKET_WRITEABLE)
 		old_filt_write = true;
@@ -1210,7 +1223,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 							 event);
 	}
 
-	Assert(count > 0);
+	/* For WL_SOCKET_READABLE -> WL_SOCKET_CLOSED, no change needed. */
+	if (count == 0)
+		return;
+	Assert(count <= 2);
 
 	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
 
@@ -1525,7 +1541,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1543,6 +1561,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+			{
+				/* remote peer shut down, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -1668,7 +1693,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			occurred_events++;
 			returned_events++;
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd >= 0);
 
@@ -1679,6 +1706,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_READABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_kqueue_event->filter == EVFILT_READ) &&
+				(cur_kqueue_event->flags & EV_EOF))
+			{
+				/* the remote peer has shut down */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 				(cur_kqueue_event->filter == EVFILT_WRITE))
 			{
@@ -1789,7 +1824,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			int			errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1809,6 +1846,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+#ifdef POLLRDHUP
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_pollfd->revents & (POLLRDHUP | errflags)))
+			{
+				/* remote peer closed, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+#endif
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 44f9368c64..fd3581a99c 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED (1 << 7)
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
-							 WL_SOCKET_CONNECTED)
+							 WL_SOCKET_CONNECTED | \
+							 WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
-- 
2.30.2
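For illustration, a hypothetical caller that just wants a yes/no answer from the new event could look roughly like this (an untested sketch against the patched API; socket_is_closed is an invented name, and on builds without kernel support the event would simply never be reported):

#include "postgres.h"
#include "storage/latch.h"

/* Poll "sock" once for remote closure, without blocking. */
static bool
socket_is_closed(pgsocket sock)
{
    WaitEventSet *set;
    WaitEvent   event;
    bool        closed = false;

    set = CreateWaitEventSet(CurrentMemoryContext, 1);
    AddWaitEventToSet(set, WL_SOCKET_CLOSED, sock, NULL, NULL);
    if (WaitEventSetWait(set, 0, &event, 1, 0) == 1 &&
        (event.events & WL_SOCKET_CLOSED))
        closed = true;
    FreeWaitEventSet(set);
    return closed;
}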
From c29ca0d9262e7eacf900cb83f9fd0e655f4cc261 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:48:32 +1200
Subject: [PATCH 2/2] Use WL_SOCKET_CLOSED for client_connection_check_interval.

Previously we used poll() directly to check for a POLLRDHUP event.
Instead, use WaitEventSet to poll the socket for WL_SOCKET_CLOSED,
which knows how to detect that condition on more systems.

XXX Need to figure out a way to allow this feature on only some builds,
but that information is currently private to latch.c

XXX May need to review the way eg POLLERR/POLLHUP and equivalents are
treated when you're waiting/polling for WL_SOCKET_CLOSED.

XXX Manually tested by killing psql, by sending an async query with
libpq and then sending 'X' and hanging up, and by yanking an ethernet
cable (but the last requires TCP keepalives to be enabled).  Need to
find a clever way to do at least the first two in a TAP (?) test
without races.
---
 src/backend/libpq/pqcomm.c   | 35 +++++++++++++----------------------
 src/backend/utils/misc/guc.c | 17 +----------------
 2 files changed, 14 insertions(+), 38 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index b9ccd4473f..1334dd4bb3 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1932,33 +1932,24 @@ pq_settcpusertimeout(int timeout, Port *port)
 bool
 pq_check_connection(void)
 {
-#if defined(POLLRDHUP)
+	WaitEvent	events[3];
+	int			rc;
+
 	/*
-	 * POLLRDHUP is a Linux extension to poll(2) to detect sockets closed by
-	 * the other end.  We don't have a portable way to do that without
-	 * actually trying to read or write data on other systems.  We don't want
-	 * to read because that would be confused by pipelined queries and COPY
-	 * data.  Perhaps in future we'll try to write a heartbeat message
-	 * instead.
+	 * Temporarily ignore the latch, while we check if the socket has been
+	 * closed by the other end (if that is possible on this OS).
 	 */
-	struct pollfd pollfd;
-	int			rc;
-
-	pollfd.fd = MyProcPort->sock;
-	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
-	pollfd.revents = 0;
-
-	rc = poll(&pollfd, 1, 0);
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+	rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, MyLatch);
 
-	if (rc < 0)
+	for (int i = 0; i < rc; ++i)
 	{
-		ereport(COMMERROR,
-				(errcode_for_socket_access(),
-				 errmsg("could not poll socket: %m")));
-		return false;
+		if (events[i].pos == FeBeWaitSetSocketPos &&
+			events[i].events & WL_SOCKET_CLOSED)
+			return false;
 	}
-	else if (rc == 1 && (pollfd.revents & (POLLHUP | POLLRDHUP)))
-		return false;
-#endif
 
 	return true;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b130874bdc..60512ec82f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -209,7 +209,6 @@ static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource sourc
 static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
 static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
 static bool check_huge_page_size(int *newval, void **extra, GucSource source);
-static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source);
 static void assign_maintenance_io_concurrency(int newval, void *extra);
 static void assign_pgstat_temp_directory(const char *newval, void *extra);
 static bool check_application_name(char **newval, void **extra, GucSource source);
@@ -3580,7 +3579,7 @@ static struct config_int ConfigureNamesInt[] =
 		},
 		&client_connection_check_interval,
 		0, 0, INT_MAX,
-		check_client_connection_check_interval, NULL, NULL
+		NULL, NULL, NULL
 	},
 
 	/* End-of-list marker */
@@ -12094,20 +12093,6 @@ check_huge_page_size(int *newval, void **extra, GucSource source)
 	return true;
 }
 
-static bool
-check_client_connection_check_interval(int *newval, void **extra, GucSource source)
-{
-#ifndef POLLRDHUP
-	/* Linux only, for now.  See pq_check_connection(). */
-	if (*newval != 0)
-	{
-		GUC_check_errdetail("client_connection_check_interval must be set to 0 on platforms that lack POLLRDHUP.");
-		return false;
-	}
-#endif
-	return true;
-}
-
 static void
 assign_maintenance_io_concurrency(int newval, void *extra)
 {
-- 
2.30.2
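On the testing XXX: one possibly race-free approach would be a socketpair, since shutdown() on one end should make the closure event fire deterministically on the other, with no network timing involved. A rough sketch (untested; whether the RDHUP-style events fire for AF_UNIX sockets on every supported platform would need checking):

#include "postgres.h"
#include "storage/latch.h"

#include <sys/socket.h>
#include <unistd.h>

/* Returns true if WL_SOCKET_CLOSED fires after the peer shuts down. */
static bool
closed_event_fires(void)
{
    WaitEventSet *set;
    WaitEvent   event;
    int         fds[2];
    int         rc;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
        return false;

    set = CreateWaitEventSet(CurrentMemoryContext, 1);
    AddWaitEventToSet(set, WL_SOCKET_CLOSED, fds[0], NULL, NULL);
    shutdown(fds[1], SHUT_RDWR);    /* simulate the client hanging up */
    rc = WaitEventSetWait(set, 0, &event, 1, 0);

    FreeWaitEventSet(set);
    close(fds[0]);
    close(fds[1]);
    return rc == 1 && (event.events & WL_SOCKET_CLOSED) != 0;
}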