On Sat, Feb 8, 2020 at 10:15 AM Thomas Munro <thomas.mu...@gmail.com> wrote:
>
>
> Here are some patches to get rid of frequent system calls.
Here's a new version of this patch set.  It gets rid of all temporary
WaitEventSets except a couple I mentioned in another thread[1].
WaitLatch() uses CommonWaitSet, and calls to WaitLatchOrSocket() are
replaced by either the existing FeBeWaitSet (walsender, gssapi/openssl
auth are also candidates) or a special purpose long lived WaitEventSet
(replication, postgres_fdw, pgstats).  It passes make check-world with
WAIT_USE_POLL, WAIT_USE_KQUEUE, WAIT_USE_EPOLL, all with and without
-DEXEC_BACKEND, and make check with WAIT_USE_WIN32 (Appveyor).

0001: "Don't use EV_CLEAR for kqueue events."

This fixes a problem in the kqueue implementation that only shows up
once you switch to long lived WaitEventSets.  It needs to be
level-triggered like the other implementations, for example because
there's a place in the recovery tests where we wait twice in a row
without trying to do I/O in between.  (This is a bit like commit
3b790d256f8 that fixed a similar problem on Windows.)

0002: "Use a long lived WaitEventSet for WaitLatch()."

In the last version, I had a new function WaitMyLatch(), but that
didn't help with recoveryWakeupLatch.  Switching between latches
doesn't require a syscall, so I didn't want to have a separate WES and
function just for that.  So I went back to using plain old
WaitLatch(), and made it "modify" the latch every time before waiting
on CommonWaitSet.  An alternative would be to get rid of the concept
of latches other than MyLatch, and change the function to
WaitMyLatch().  A similar thing happens for exit_on_postmaster_death,
for which I didn't want to have a separate WES, so I just set that
flag every time.  Thoughts?

0003: "Use regular WaitLatch() for condition variables."

That mechanism doesn't need its own WES anymore.

0004: "Introduce RemoveWaitEvent()."

We'll need that to be able to handle connections that are closed and
reopened under the covers by libpq (replication, postgres_fdw).  We
also wanted this on a couple of other threads for multiplexing FDWs,
to be able to adjust the wait set dynamically for a proposed async
Append feature.

The implementation is a little naive, and leaves holes in the
"pollfds" and "handles" arrays (for the poll() and win32
implementations).  That could be improved with a bit more footwork in
a later patch.

XXX The Windows version is based on reading documentation.  I'd be
very interested to know if check-world passes (especially
contrib/postgres_fdw and src/test/recovery).  Unfortunately my
appveyor.yml fu is not yet strong enough.

0005: "libpq: Add PQsocketChangeCount to advertise socket changes."

To support a long lived WES, libpq needs a way to tell us when the
socket changes underneath our feet.  This is the simplest thing I
could think of; better ideas welcome.

0006: "Reuse a WaitEventSet in libpqwalreceiver.c."

Rather than having all users of libpqwalreceiver.c deal with the
complicated details of wait set management, have libpqwalreceiver
expose a waiting interface that understands socket changes.

Unfortunately, I couldn't figure out how to use CommonWaitSet for this
(ie adding and removing sockets to that as required), due to
complications with the bookkeeping required to provide the fd_closed
flag to RemoveWaitEvent().  So it creates its own internal long lived
WaitEventSet.

0007: "Use a WaitEventSet for postgres_fdw."

Create a single WaitEventSet and use it for all FDW connections.  By
having our own dedicated WES, we can do the bookkeeping required to
know when sockets have been closed or need to be removed from kernel
wait sets explicitly (which would be much harder to achieve if
CommonWaitSet were to be used like that; you need to know when sockets
are closed by other code, so you can provide fd_closed to
RemoveWaitEvent()).

Concretely, if you use just one postgres_fdw connection, you'll see
just epoll_wait()/kevent() calls for waits, but whenever you switch
between different connections, you'll see eg EPOLL_DEL/EV_DELETE
followed by EPOLL_ADD/EV_ADD when the set is adjusted (in the kqueue
implementation these could be collapsed into the following wait, but I
haven't done the work for that).  An alternative would be to have one
WES per FDW connection, but that seemed wasteful of file descriptors.
(A rough caller-side sketch of this reuse pattern follows the footnote
at the end of this message body.)

0008: "Use WL_EXIT_ON_PM_DEATH in FeBeWaitSet."

The FATAL message you get if you happen to be waiting for IO rather
than waiting somewhere else seems arbitrarily different.  Switching to
a standard automatic exit opens the possibility of using FeBeWaitSet
in a couple more places that would otherwise need to create their own
WES (see also [1]).  Thoughts?

0009: "Use FeBeWaitSet for walsender.c."

Enabled by 0008.

0010: "Introduce a WaitEventSet for the stats collector."

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGK%3Dm9dLrq42oWQ4XfK9iDjGiZVwpQ1HkHrAPfG7Kh681g%40mail.gmail.com
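For illustration only (not part of the patch set): a rough caller-side
sketch of the reuse pattern that 0004-0007 aim at, written against the
interfaces those patches add (RemoveWaitEvent(), PQsocketChangeCount()).
The helper name wait_for_socket() and its static variables are invented
for this sketch; the real equivalents are
libpqrcv_prepare_to_wait_for_socket() in 0006 and pgfdw_wait_for_socket()
in 0007.

#include "postgres.h"

#include "libpq-fe.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/memutils.h"

static WaitEventSet *wes = NULL;
static int	socket_pos = -1;
static pg_int64 socket_generation = -1;

/* Wait for io_flags on conn's socket, our latch, or postmaster death. */
static int
wait_for_socket(PGconn *conn, int io_flags, long timeout,
				uint32 wait_event_info)
{
	WaitEvent	event;

	if (wes == NULL)
	{
		/* First call: create the long lived set and register everything. */
		wes = CreateWaitEventSet(TopMemoryContext, 3);
		AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
		AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
						  NULL, NULL);
		socket_pos = AddWaitEventToSet(wes, io_flags, PQsocket(conn),
									   NULL, NULL);
		socket_generation = PQsocketChangeCount(conn);
	}
	else if (socket_generation != PQsocketChangeCount(conn))
	{
		/* libpq closed and reopened the socket under the covers. */
		RemoveWaitEvent(wes, socket_pos, true /* fd already closed */ );
		socket_pos = AddWaitEventToSet(wes, io_flags, PQsocket(conn),
									   NULL, NULL);
		socket_generation = PQsocketChangeCount(conn);
	}
	else
	{
		/* Same socket as last time; just adjust the events to wait for. */
		ModifyWaitEvent(wes, socket_pos, io_flags, NULL);
	}

	if (WaitEventSetWait(wes, timeout, &event, 1, wait_event_info) == 0)
		return WL_TIMEOUT;
	return event.events;
}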
From 6787ffeb57043b2b19201a49e656d56ab58b2d7e Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 25 Feb 2020 02:53:35 +1300 Subject: [PATCH 01/11] Don't use EV_CLEAR for kqueue events. For the semantics to match the epoll implementation, we need a socket to continue to appear readable/writable if you wait multiple times without doing I/O in between (in Linux terminology, level-triggered rather than edge-triggered). Similar to commit 3b790d256f8 for Windows. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/storage/ipc/latch.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 046ca5c6c7..3b6acfb251 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -991,7 +991,7 @@ WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action, { k_ev->ident = event->fd; k_ev->filter = filter; - k_ev->flags = action | EV_CLEAR; + k_ev->flags = action; k_ev->fflags = 0; k_ev->data = 0; AccessWaitEvent(k_ev) = event; @@ -1003,7 +1003,7 @@ WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event) /* For now postmaster death can only be added, not removed. */ k_ev->ident = PostmasterPid; k_ev->filter = EVFILT_PROC; - k_ev->flags = EV_ADD | EV_CLEAR; + k_ev->flags = EV_ADD; k_ev->fflags = NOTE_EXIT; k_ev->data = 0; AccessWaitEvent(k_ev) = event; -- 2.20.1
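To make the level-triggered vs. edge-triggered difference concrete, here
is a tiny standalone illustration (not part of the patch; assumes a BSD
or macOS system with <sys/event.h>).  With EV_CLEAR, a second wait with
no intervening I/O reports nothing, which is the back-to-back wait case
mentioned above; without it, readiness keeps being reported until the
data is consumed, matching epoll's default behaviour.

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <stdio.h>
#include <unistd.h>

static void
wait_twice(int kq, int fd, unsigned short flags)
{
	struct kevent kev;
	struct timespec zero = {0, 0};
	int		first;
	int		second;

	/* (Re)register the fd for read readiness with the given flags. */
	EV_SET(&kev, fd, EVFILT_READ, EV_ADD | flags, 0, 0, NULL);
	kevent(kq, &kev, 1, NULL, 0, NULL);

	/* Wait twice in a row without draining the pipe in between. */
	first = kevent(kq, NULL, 0, &kev, 1, &zero);
	second = kevent(kq, NULL, 0, &kev, 1, &zero);
	printf("flags=%#x: first wait saw %d event(s), second saw %d\n",
		   (unsigned) flags, first, second);

	/* Unregister so the next call can use different flags. */
	EV_SET(&kev, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
	kevent(kq, &kev, 1, NULL, 0, NULL);
}

int
main(void)
{
	int			kq = kqueue();
	int			pipefd[2];

	pipe(pipefd);
	write(pipefd[1], "x", 1);	/* make the read end readable */

	wait_twice(kq, pipefd[0], 0);			/* level-triggered: 1 then 1 */
	wait_twice(kq, pipefd[0], EV_CLEAR);	/* edge-triggered:  1 then 0 */
	return 0;
}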
From 4e528ac083260e10b9fbeb35b4fd14a5c1267e32 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 24 Feb 2020 15:39:24 +1300 Subject: [PATCH 02/11] Use a long lived WaitEventSet for WaitLatch(). Create CommonWaitSet at backend startup time, and use it to implement WaitLatch(). This avoids a bunch of epoll/kqueue system calls, and makes sure we don't run into EMFILE later due to lack of file descriptors. Reorder SubPostmasterMain() slightly so that we restore the postmaster pipe and Windows signal before we reach InitPostmasterChild(), to make this work in EXEC_BACKEND builds. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/postmaster/postmaster.c | 24 +++++++------- src/backend/storage/ipc/latch.c | 49 +++++++++++++++++++++++++++-- src/backend/utils/init/miscinit.c | 2 ++ src/include/storage/latch.h | 1 + 4 files changed, 61 insertions(+), 15 deletions(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index cd61665eea..b870346c75 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -4882,9 +4882,6 @@ SubPostmasterMain(int argc, char *argv[]) IsPostmasterEnvironment = true; whereToSendOutput = DestNone; - /* Setup as postmaster child */ - InitPostmasterChild(); - /* Setup essential subsystems (to ensure elog() behaves sanely) */ InitializeGUCOptions(); @@ -4899,6 +4896,18 @@ SubPostmasterMain(int argc, char *argv[]) /* Close the postmaster's sockets (as soon as we know them) */ ClosePostmasterPorts(strcmp(argv[1], "--forklog") == 0); + /* + * Start our win32 signal implementation. This has to be done after we + * read the backend variables, because we need to pick up the signal pipe + * from the parent process. + */ +#ifdef WIN32 + pgwin32_signal_initialize(); +#endif + + /* Setup as postmaster child */ + InitPostmasterChild(); + /* * Set reference point for stack-depth checking */ @@ -4947,15 +4956,6 @@ SubPostmasterMain(int argc, char *argv[]) if (strcmp(argv[1], "--forkavworker") == 0) AutovacuumWorkerIAm(); - /* - * Start our win32 signal implementation. This has to be done after we - * read the backend variables, because we need to pick up the signal pipe - * from the parent process. - */ -#ifdef WIN32 - pgwin32_signal_initialize(); -#endif - /* In EXEC_BACKEND case we will not have inherited these settings */ pqinitmask(); PG_SETMASK(&BlockSig); diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 3b6acfb251..30e461e965 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -56,6 +56,7 @@ #include "storage/latch.h" #include "storage/pmsignal.h" #include "storage/shmem.h" +#include "utils/memutils.h" /* * Select the fd readiness primitive to use. Normally the "most modern" @@ -129,6 +130,9 @@ struct WaitEventSet #endif }; +/* A common WaitEventSet used to implement WatchLatch() */ +static WaitEventSet *CommonWaitSet; + #ifndef WIN32 /* Are we currently in WaitLatch? The signal handler would like to know. */ static volatile sig_atomic_t waiting = false; @@ -242,6 +246,20 @@ InitializeLatchSupport(void) #endif } +void +InitializeCommonWaitSet(void) +{ + Assert(CommonWaitSet == NULL); + + /* Set up the WaitEventSet used by WaitLatch(). 
*/ + CommonWaitSet = CreateWaitEventSet(TopMemoryContext, 2); + AddWaitEventToSet(CommonWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + if (IsUnderPostmaster) + AddWaitEventToSet(CommonWaitSet, WL_EXIT_ON_PM_DEATH, + PGINVALID_SOCKET, NULL, NULL); +} + /* * Initialize a process-local latch. */ @@ -365,8 +383,29 @@ int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info) { - return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout, - wait_event_info); + WaitEvent event; + + /* Postmaster-managed callers must handle postmaster death somehow. */ + Assert(!IsUnderPostmaster || + (wakeEvents & WL_EXIT_ON_PM_DEATH) || + (wakeEvents & WL_POSTMASTER_DEATH)); + + /* + * Some callers may have a latch other than MyLatch, or want to handle + * postmaster death differently. It's cheap to assign those, so just do it + * every time. + */ + ModifyWaitEvent(CommonWaitSet, 0, WL_LATCH_SET, latch); + CommonWaitSet->exit_on_postmaster_death = + ((wakeEvents & WL_EXIT_ON_PM_DEATH) != 0); + + if (WaitEventSetWait(CommonWaitSet, + (wakeEvents & WL_TIMEOUT) ? timeout : -1, + &event, 1, + wait_event_info) == 0) + return WL_TIMEOUT; + else + return event.events; } /* @@ -700,7 +739,11 @@ FreeWaitEventSet(WaitEventSet *set) ReleaseExternalFD(); #elif defined(WAIT_USE_KQUEUE) close(set->kqueue_fd); - ReleaseExternalFD(); + if (set->kqueue_fd >= 0) + { + close(set->kqueue_fd); + ReleaseExternalFD(); + } #elif defined(WAIT_USE_WIN32) WaitEvent *cur_event; diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index b02bad4420..256a77fd32 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -291,6 +291,7 @@ InitPostmasterChild(void) InitializeLatchSupport(); MyLatch = &LocalLatchData; InitLatch(MyLatch); + InitializeCommonWaitSet(); /* * If possible, make this process a group leader, so that the postmaster @@ -323,6 +324,7 @@ InitStandaloneProcess(const char *argv0) InitializeLatchSupport(); MyLatch = &LocalLatchData; InitLatch(MyLatch); + InitializeCommonWaitSet(); /* Compute paths, no postmaster to inherit from */ if (my_exec_path[0] == '\0') diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 46ae56cae3..ec1865a8fd 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -176,6 +176,7 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info); extern int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); +extern void InitializeCommonWaitSet(void); /* * Unix implementation uses SIGUSR1 for inter-process signaling. -- 2.20.1
From e15e7991f5cfa744e3816f109f9c7400213f8322 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 24 Feb 2020 15:49:00 +1300 Subject: [PATCH 03/11] Use WaitLatch() for condition variables. Previously, condition_variable.c created its own long lived WaitEventSet to avoid extra system calls. WaitLatch() now does that, so there is no point in wasting an extra kernel descriptor. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/storage/lmgr/condition_variable.c | 28 ++++--------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 37b6a4eecd..2ec00397b4 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -30,9 +30,6 @@ /* Initially, we are not prepared to sleep on any condition variable. */ static ConditionVariable *cv_sleep_target = NULL; -/* Reusable WaitEventSet. */ -static WaitEventSet *cv_wait_event_set = NULL; - /* * Initialize a condition variable. */ @@ -62,23 +59,6 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) { int pgprocno = MyProc->pgprocno; - /* - * If first time through in this process, create a WaitEventSet, which - * we'll reuse for all condition variable sleeps. - */ - if (cv_wait_event_set == NULL) - { - WaitEventSet *new_event_set; - - new_event_set = CreateWaitEventSet(TopMemoryContext, 2); - AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - /* Don't set cv_wait_event_set until we have a correct WES. */ - cv_wait_event_set = new_event_set; - } - /* * If some other sleep is already prepared, cancel it; this is necessary * because we have just one static variable tracking the prepared sleep, @@ -135,6 +115,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, long cur_timeout = -1; instr_time start_time; instr_time cur_time; + int wait_events; /* * If the caller didn't prepare to sleep explicitly, then do so now and @@ -166,19 +147,20 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, INSTR_TIME_SET_CURRENT(start_time); Assert(timeout >= 0 && timeout <= INT_MAX); cur_timeout = timeout; + wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; } + else + wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; while (true) { - WaitEvent event; bool done = false; /* * Wait for latch to be set. (If we're awakened for some other * reason, the code below will cope anyway.) */ - (void) WaitEventSetWait(cv_wait_event_set, cur_timeout, &event, 1, - wait_event_info); + (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info); /* Reset latch before examining the state of the wait list. */ ResetLatch(MyLatch); -- 2.20.1
From 587c0e3eb954f8da8eb77f101ac714ddb7c876bf Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 24 Feb 2020 19:05:00 +1300 Subject: [PATCH 04/11] Introduce RemoveWaitEvent(). This will allow WaitEventSet objects to be used in more long lived scenarios, where sockets are added and removed. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/storage/ipc/latch.c | 131 ++++++++++++++++++++++++++++---- src/include/storage/latch.h | 3 + 2 files changed, 120 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 30e461e965..025545fc89 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -84,6 +84,7 @@ struct WaitEventSet { int nevents; /* number of registered events */ int nevents_space; /* maximum number of events in this set */ + int free_list; /* position of first free event */ /* * Array, of nevents_space length, storing the definition of events this @@ -119,6 +120,8 @@ struct WaitEventSet #elif defined(WAIT_USE_POLL) /* poll expects events to be waited on every poll() call, prepare once */ struct pollfd *pollfds; + /* track the populated range of pollfds */ + int npollfds; #elif defined(WAIT_USE_WIN32) /* @@ -127,6 +130,8 @@ struct WaitEventSet * event->pos + 1). */ HANDLE *handles; + /* track the populated range of handles */ + int nhandles; #endif }; @@ -642,13 +647,16 @@ CreateWaitEventSet(MemoryContext context, int nevents) #elif defined(WAIT_USE_POLL) set->pollfds = (struct pollfd *) data; data += MAXALIGN(sizeof(struct pollfd) * nevents); + set->npollfds = 0; #elif defined(WAIT_USE_WIN32) set->handles = (HANDLE) data; data += MAXALIGN(sizeof(HANDLE) * nevents); + set->nhandles = 0; #endif set->latch = NULL; set->nevents_space = nevents; + set->nevents = 0; set->exit_on_postmaster_death = false; #if defined(WAIT_USE_EPOLL) @@ -714,11 +722,25 @@ CreateWaitEventSet(MemoryContext context, int nevents) * Note: pgwin32_signal_event should be first to ensure that it will be * reported when multiple events are set. We want to guarantee that * pending signals are serviced. + * + * We set unused handles to INVALID_HANDLE_VALUE, because + * WaitForMultipleObjects() considers that to mean "this process" which is + * not signaled until process, so it's a way of leaving a hole in the + * middle of the wait set if you remove something (just like -1 in the poll + * implementation). An alternative would be to fill in holes and create a + * non 1-to-1 mapping between 'events' and 'handles'. */ set->handles[0] = pgwin32_signal_event; - StaticAssertStmt(WSA_INVALID_EVENT == NULL, ""); + for (int i = 0; i < nevents; ++i) + set->handles[i + 1] = INVALID_HANDLE_VALUE; #endif + /* Set up the free list. */ + for (int i = 0; i < nevents; ++i) + set->events[i].next_free = i + 1; + set->events[nevents - 1].next_free = -1; + set->free_list = 0; + return set; } @@ -727,7 +749,6 @@ CreateWaitEventSet(MemoryContext context, int nevents) * * Note: preferably, this shouldn't have to free any resources that could be * inherited across an exec(). If it did, we'd likely leak those resources in - * many scenarios. For the epoll case, we ensure that by setting FD_CLOEXEC * when the FD is created. For the Windows case, we assume that the handles * involved are non-inheritable. 
*/ @@ -748,9 +769,12 @@ FreeWaitEventSet(WaitEventSet *set) WaitEvent *cur_event; for (cur_event = set->events; - cur_event < (set->events + set->nevents); + cur_event < (set->events + set->nhandles); cur_event++) { + if (set->handles[cur_event->pos + 1] == INVALID_HANDLE_VALUE) + continue; + if (cur_event->events & WL_LATCH_SET) { /* uses the latch's HANDLE */ @@ -805,9 +829,6 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, { WaitEvent *event; - /* not enough space */ - Assert(set->nevents < set->nevents_space); - if (events == WL_EXIT_ON_PM_DEATH) { events = WL_POSTMASTER_DEATH; @@ -833,8 +854,12 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK)) elog(ERROR, "cannot wait on socket event without a socket"); - event = &set->events[set->nevents]; - event->pos = set->nevents++; + /* Do we have any free slots? */ + if (set->free_list == -1) + elog(ERROR, "WaitEventSet is full"); + + event = &set->events[set->free_list]; + event->pos = set->free_list; event->fd = fd; event->events = events; event->user_data = user_data; @@ -868,6 +893,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, WaitEventAdjustWin32(set, event); #endif + /* Remove it from the free list. */ + set->free_list = event->next_free; + event->next_free = -1; + set->nevents++; + return event->pos; } @@ -885,7 +915,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) int old_events; #endif - Assert(pos < set->nevents); + Assert(pos < set->nevents_space); event = &set->events[pos]; #if defined(WAIT_USE_KQUEUE) @@ -933,6 +963,63 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) #endif } +/* + * If the descriptor has already been closed, the kernel should already have + * removed it from the wait set (except in WAIT_USE_POLL). Pass in true for + * fd_closed in that case, so we don't try to remove it ourselves. + */ +void +RemoveWaitEvent(WaitEventSet *set, int pos, bool fd_closed) +{ + WaitEvent *event; + + Assert(pos >= 0); + Assert(pos < set->nevents_space); + event = &set->events[pos]; + + /* For now only sockets can be removed */ + if ((event->events & WL_SOCKET_MASK) == 0) + elog(ERROR, "event type cannot be removed"); + +#if defined(WAIT_USE_EPOLL) + if (!fd_closed) + WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL); +#elif defined(WAIT_USE_KQUEUE) + if (!fd_closed) + { + int old_events = event->events; + + event->events = 0; + WaitEventAdjustKqueue(set, event, old_events); + } +#elif defined(WAIT_USE_POLL) + /* no kernel state to remove, just blank out the fd */ + set->pollfds[event->pos].fd = -1; + /* see if we can shrink the range of active fds */ + while (set->npollfds > 0 && + set->pollfds[set->npollfds - 1].fd == -1) + set->npollfds -= 1; +#elif defined(WAIT_USE_WIN32) + if (!fd_closed) + WSAEventSelect(event->fd, NULL, 0); + if (set->handles[event->pos + 1] != INVALID_HANDLE_VALUE) + { + WSACloseEvent(set->handles[event->pos + 1]); + set->handles[event->pos + 1] = INVALID_HANDLE_VALUE; + } + /* see if we can shrink the range of active handles */ + while (set->nhandles > 0 && + set->handles[set->nhandles] == INVALID_HANDLE_VALUE) + set->nhandles -= 1; +#endif + + /* This position is now free. 
*/ + memset(event, 0, sizeof(*event)); + event->next_free = set->free_list; + set->free_list = pos; + set->nevents--; +} + #if defined(WAIT_USE_EPOLL) /* * action can be one of EPOLL_CTL_ADD | EPOLL_CTL_MOD | EPOLL_CTL_DEL @@ -994,6 +1081,9 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) pollfd->revents = 0; pollfd->fd = event->fd; + /* track the known range of populated slots */ + set->npollfds = Max(event->pos + 1, set->nevents); + /* prepare pollfd entry once */ if (event->events == WL_LATCH_SET) { @@ -1072,7 +1162,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) Assert(event->events != WL_LATCH_SET || set->latch != NULL); Assert(event->events == WL_LATCH_SET || event->events == WL_POSTMASTER_DEATH || - (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))); + (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) || + (event->events == 0 && + (old_events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)))); if (event->events == WL_POSTMASTER_DEATH) { @@ -1149,6 +1241,9 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) { HANDLE *handle = &set->handles[event->pos + 1]; + /* track the known range of populated slots */ + set->nhandles = Max(event->pos + 1, set->nhandles); + if (event->events == WL_LATCH_SET) { Assert(set->latch != NULL); @@ -1169,12 +1264,15 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) if (event->events & WL_SOCKET_CONNECTED) flags |= FD_CONNECT; - if (*handle == WSA_INVALID_EVENT) + if (*handle == INVALID_HANDLE_VALUE) { *handle = WSACreateEvent(); if (*handle == WSA_INVALID_EVENT) + { + *handle = INVALID_HANDLE_VALUE; elog(ERROR, "failed to create event for socket: error code %u", WSAGetLastError()); + } } if (WSAEventSelect(event->fd, *handle, flags) != 0) elog(ERROR, "failed to set up event for socket: error code %u", @@ -1304,6 +1402,11 @@ WaitEventSetWait(WaitEventSet *set, long timeout, return returned_events; } +int +WaitEventSetSize(WaitEventSet *set) +{ + return set->nevents; +} #if defined(WAIT_USE_EPOLL) @@ -1589,7 +1692,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, struct pollfd *cur_pollfd; /* Sleep */ - rc = poll(set->pollfds, set->nevents, (int) cur_timeout); + rc = poll(set->pollfds, set->npollfds, (int) cur_timeout); /* Check return code */ if (rc < 0) @@ -1761,9 +1864,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, /* * Sleep. * - * Need to wait for ->nevents + 1, because signal handle is in [0]. + * Need to wait for ->nhandles + 1, because signal handle is in [0]. */ - rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE, + rc = WaitForMultipleObjects(set->nhandles + 1, set->handles, FALSE, cur_timeout); /* Check return code */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index ec1865a8fd..210f37659e 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -144,6 +144,7 @@ typedef struct WaitEvent uint32 events; /* triggered events */ pgsocket fd; /* socket fd associated with event */ void *user_data; /* pointer provided in AddWaitEventToSet */ + int next_free; /* free list for internal use */ #ifdef WIN32 bool reset; /* Is reset of the event required? 
*/ #endif @@ -168,6 +169,8 @@ extern void FreeWaitEventSet(WaitEventSet *set); extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data); extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch); +extern void RemoveWaitEvent(WaitEventSet *set, int pos, bool fd_closed); +extern int WaitEventSetSize(WaitEventSet *set); extern int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, -- 2.20.1
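As an aside, the free-list bookkeeping this patch adds can be pictured
with a toy model (plain standalone C with invented names, not the patch
itself): positions handed out by AddWaitEventToSet() stay stable, and a
removed slot becomes a hole that the next addition reuses.

#include <stdio.h>

#define NSLOTS 4

typedef struct
{
	int			in_use;
	int			next_free;
} Slot;

static Slot slots[NSLOTS];
static int	free_list = 0;

static void
init(void)
{
	for (int i = 0; i < NSLOTS; i++)
	{
		slots[i].in_use = 0;
		slots[i].next_free = i + 1;
	}
	slots[NSLOTS - 1].next_free = -1;
}

/* Like AddWaitEventToSet(): return a stable position, or -1 if full. */
static int
add(void)
{
	int			pos = free_list;

	if (pos == -1)
		return -1;
	free_list = slots[pos].next_free;
	slots[pos].in_use = 1;
	return pos;
}

/* Like RemoveWaitEvent(): leave a hole and push it onto the free list. */
static void
remove_at(int pos)
{
	slots[pos].in_use = 0;
	slots[pos].next_free = free_list;
	free_list = pos;
}

int
main(void)
{
	int			a;
	int			b;
	int			c;

	init();
	a = add();
	b = add();
	c = add();
	remove_at(b);				/* position b is now a hole */
	printf("a=%d b=%d c=%d, next add reuses position %d\n",
		   a, b, c, add());
	return 0;
}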
From aa042507c74a2449c2b70fdd1eb062334182a14a Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 8 Feb 2020 09:01:19 +1300 Subject: [PATCH 05/11] libpq: Add PQsocketChangeCount() to report socket changes. Users of PQsocket() sometimes wish they could know when the socket changes between calls, to allow for reuse of wait event sets. Provide a new function PQsocketChangeCount() that advances whenever libpq opens or closes a socket. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- doc/src/sgml/libpq.sgml | 17 +++++++++++++++++ src/interfaces/libpq/exports.txt | 1 + src/interfaces/libpq/fe-connect.c | 10 ++++++++++ src/interfaces/libpq/libpq-fe.h | 1 + src/interfaces/libpq/libpq-int.h | 1 + 5 files changed, 30 insertions(+) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 9a24c19ccb..14f0d998b6 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -2280,6 +2280,23 @@ int PQsocket(const PGconn *conn); </listitem> </varlistentry> + <varlistentry id="libpq-PQsocketChangeCount"> + <term><function>PQsocketChangeCount</function><indexterm><primary>PQsocketChangeCount</primary></indexterm></term> + <listitem> + <para> + Returns a counter that increases whenever the socket has changed. + This can be used to know when the value returned by + <xref linkend="libpq-PQsocket"/> represents a different socket, even + if it happens to have the same file descriptor number. + +<synopsis> +pg_int64 PQsocketChangeCount(const PGconn *conn); +</synopsis> + + </para> + </listitem> + </varlistentry> + <varlistentry id="libpq-PQbackendPID"> <term><function>PQbackendPID</function><indexterm><primary>PQbackendPID</primary></indexterm></term> <listitem> diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 5fc1e5d289..e925ea320f 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -179,3 +179,4 @@ PQgetgssctx 176 PQsetSSLKeyPassHook 177 PQgetSSLKeyPassHook 178 PQdefaultSSLKeyPassHook 179 +PQsocketChangeCount 180 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 408000af83..dff73f1784 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -464,6 +464,7 @@ pqDropConnection(PGconn *conn, bool flushInput) if (conn->sock != PGINVALID_SOCKET) closesocket(conn->sock); conn->sock = PGINVALID_SOCKET; + conn->sockChangeCount++; /* Optionally discard any unread data */ if (flushInput) @@ -2538,6 +2539,7 @@ keep_going: /* We will come back to here until there is */ conn->sock = socket(addr_cur->ai_family, SOCK_STREAM, 0); + conn->sockChangeCount++; if (conn->sock == PGINVALID_SOCKET) { /* @@ -6706,6 +6708,14 @@ PQsocket(const PGconn *conn) return (conn->sock != PGINVALID_SOCKET) ? 
conn->sock : -1; } +pg_int64 +PQsocketChangeCount(const PGconn *conn) +{ + if (!conn) + return -1; + return conn->sockChangeCount; +} + int PQbackendPID(const PGconn *conn) { diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index c9e6ac2b76..8b21373e4d 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -327,6 +327,7 @@ extern int PQprotocolVersion(const PGconn *conn); extern int PQserverVersion(const PGconn *conn); extern char *PQerrorMessage(const PGconn *conn); extern int PQsocket(const PGconn *conn); +extern pg_int64 PQsocketChangeCount(const PGconn *conn); extern int PQbackendPID(const PGconn *conn); extern int PQconnectionNeedsPassword(const PGconn *conn); extern int PQconnectionUsedPassword(const PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 72931e6019..91b190f078 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -409,6 +409,7 @@ struct pg_conn /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ + pg_int64 sockChangeCount; /* advances when socket is opened/closed */ SockAddr laddr; /* Local address */ SockAddr raddr; /* Remote address */ ProtocolVersion pversion; /* FE/BE protocol version in use */ -- 2.20.1
From 2abdbcb71484af0f0c6e62f8fc9ce2be7d99ffaa Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 8 Feb 2020 09:14:29 +1300 Subject: [PATCH 06/11] Reuse a WaitEventSet in libpqwalreceiver.c. To avoid repeatedly setting up and tearing down WaitEventSet objects and associated kernel objects, reuse a WaitEventSet. Export a wait function that is smart enough to handle socket changes under the covers, and then use that for physical and logical replication. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- .../libpqwalreceiver/libpqwalreceiver.c | 126 +++++++++++++----- src/backend/replication/logical/tablesync.c | 6 +- src/backend/replication/logical/worker.c | 6 +- src/backend/replication/walreceiver.c | 21 +-- src/include/replication/walreceiver.h | 5 + 5 files changed, 101 insertions(+), 63 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e4fd1f9bb6..e54581f37f 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -41,6 +41,11 @@ struct WalReceiverConn { /* Current connection to the primary, if any */ PGconn *streamConn; + /* Wait event set used to wait for I/O */ + WaitEventSet *wes; + /* Used to handle changes in the underlying socket */ + int64 wes_fd_change_count; + int wes_socket_position; /* Used to remember if the connection is logical or physical */ bool logical; /* Buffer for currently read records */ @@ -80,6 +85,7 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const int nRetTypes, const Oid *retTypes); static void libpqrcv_disconnect(WalReceiverConn *conn); +static int libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event); static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, @@ -96,13 +102,16 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_create_slot, libpqrcv_get_backend_pid, libpqrcv_exec, - libpqrcv_disconnect + libpqrcv_disconnect, + libpqrcv_wait }; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); -static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); +static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query); +static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); +static void libpqrcv_prepare_to_wait_for_socket(WalReceiverConn *conn, + int io_flags); /* * Module initialization function @@ -168,6 +177,15 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, return NULL; } + /* Create a WaitEventSet that will last as long as the connection. */ + conn->wes = CreateWaitEventSet(TopMemoryContext, 3); + conn->wes_socket_position = + AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE, + PQsocket(conn->streamConn), NULL, NULL); + conn->wes_fd_change_count = PQsocketChangeCount(conn->streamConn); + AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); + /* * Poll connection until we have OK or FAILED status. 
* @@ -177,7 +195,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, do { int io_flag; - int rc; + WaitEvent event; if (status == PGRES_POLLING_READING) io_flag = WL_SOCKET_READABLE; @@ -189,21 +207,19 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, else io_flag = WL_SOCKET_WRITEABLE; - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, - PQsocket(conn->streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); + libpqrcv_prepare_to_wait_for_socket(conn, io_flag); + (void) WaitEventSetWait(conn->wes, -1, &event, 1, + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); /* Interrupted? */ - if (rc & WL_LATCH_SET) + if (event.events == WL_LATCH_SET) { ResetLatch(MyLatch); ProcessWalRcvInterrupts(); } /* If socket is ready, advance the libpq state machine */ - if (rc & io_flag) + if (event.events == io_flag) status = PQconnectPoll(conn->streamConn); } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); @@ -322,7 +338,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -431,7 +447,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, options->proto.physical.startpointTLI); /* Start streaming. */ - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqrcv_PQexec(conn, cmd.data); pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -479,7 +495,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -493,7 +509,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -506,7 +522,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -516,7 +532,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (res != NULL) ereport(ERROR, (errmsg("unexpected result after CommandComplete: %s", @@ -540,7 +556,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Request the primary to send over the history file for given timeline. */ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(conn->streamConn, cmd); + res = libpqrcv_PQexec(conn, cmd); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -582,8 +598,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * May return NULL, rather than an error result, on failure. 
*/ static PGresult * -libpqrcv_PQexec(PGconn *streamConn, const char *query) +libpqrcv_PQexec(WalReceiverConn *conn, const char *query) { + PGconn *streamConn = conn->streamConn; PGresult *lastResult = NULL; /* @@ -606,7 +623,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) /* Wait for, and collect, the next PGresult. */ PGresult *result; - result = libpqrcv_PQgetResult(streamConn); + result = libpqrcv_PQgetResult(conn); if (result == NULL) break; /* query is complete, or failure */ @@ -631,30 +648,29 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) * Perform the equivalent of PQgetResult(), but watch for interrupts. */ static PGresult * -libpqrcv_PQgetResult(PGconn *streamConn) +libpqrcv_PQgetResult(WalReceiverConn *conn) { + PGconn *streamConn = conn->streamConn; + /* * Collect data until PQgetResult is ready to get the result without * blocking. */ while (PQisBusy(streamConn)) { - int rc; + WaitEvent event; /* * We don't need to break down the sleep into smaller increments, * since we'll get interrupted by signals and can handle any * interrupts here. */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); + libpqrcv_prepare_to_wait_for_socket(conn, WL_SOCKET_READABLE); + (void) WaitEventSetWait(conn->wes, -1, &event, 1, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); /* Interrupted? */ - if (rc & WL_LATCH_SET) + if (event.events == WL_LATCH_SET) { ResetLatch(MyLatch); ProcessWalRcvInterrupts(); @@ -681,9 +697,27 @@ libpqrcv_disconnect(WalReceiverConn *conn) PQfinish(conn->streamConn); if (conn->recvBuf != NULL) PQfreemem(conn->recvBuf); + FreeWaitEventSet(conn->wes); pfree(conn); } +/* + * Wait for new data to arrive, or a timeout. + */ +static int +libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event) +{ + WaitEvent event; + int rc; + + libpqrcv_prepare_to_wait_for_socket(conn, WL_SOCKET_READABLE); + rc = WaitEventSetWait(conn->wes, timeout, &event, 1, wait_event); + if (rc == 0) + return WL_TIMEOUT; + + return event.events; +} + /* * Receive a message available from XLOG stream. * @@ -701,8 +735,7 @@ libpqrcv_disconnect(WalReceiverConn *conn) * ereports on error. */ static int -libpqrcv_receive(WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd) +libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd) { int rawlen; @@ -733,13 +766,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. 
*/ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (res != NULL) { PQclear(res); @@ -839,7 +872,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqrcv_PQexec(conn, cmd.data); pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -963,7 +996,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the query interface requires a database connection"))); - pgres = libpqrcv_PQexec(conn->streamConn, query); + pgres = libpqrcv_PQexec(conn, query); switch (PQresultStatus(pgres)) { @@ -1047,3 +1080,26 @@ stringlist_to_identifierstr(PGconn *conn, List *strings) return res.data; } + +/* + * Update our WaitEventSet so that we can wait for 'io_flags' on our socket, + * considering that the socket might have changed. + */ +static void +libpqrcv_prepare_to_wait_for_socket(WalReceiverConn *conn, int io_flags) +{ + if (conn->wes_fd_change_count != PQsocketChangeCount(conn->streamConn)) + { + /* The previous socket has been closed. Replace it with the new one. */ + RemoveWaitEvent(conn->wes, conn->wes_socket_position, true); + conn->wes_socket_position = + AddWaitEventToSet(conn->wes, io_flags, + PQsocket(conn->streamConn), NULL, NULL); + conn->wes_fd_change_count = PQsocketChangeCount(conn->streamConn); + } + else + { + /* No socket change needed, we just need to wait for the right event. */ + ModifyWaitEvent(conn->wes, conn->wes_socket_position, io_flags, NULL); + } +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488..6500d7275f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -616,11 +616,7 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Wait for more data or latch. */ - (void) WaitLatchOrSocket(MyLatch, - WL_SOCKET_READABLE | WL_LATCH_SET | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); - + (void) walrcv_wait(wrconn, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); ResetLatch(MyLatch); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ad4a732fd2..f12763f54a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1278,11 +1278,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; - rc = WaitLatchOrSocket(MyLatch, - WL_SOCKET_READABLE | WL_LATCH_SET | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - fd, wait_time, - WAIT_EVENT_LOGICAL_APPLY_MAIN); + rc = walrcv_wait(wrconn, wait_time, WAIT_EVENT_LOGICAL_APPLY_MAIN); if (rc & WL_LATCH_SET) { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2ab15c3cbb..e5d054d1a1 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -503,24 +503,9 @@ WalReceiverMain(void) if (endofwal) break; - /* - * Ideally we would reuse a WaitEventSet object repeatedly - * here to avoid the overheads of WaitLatchOrSocket on epoll - * systems, but we can't be sure that libpq (or any other - * walreceiver implementation) has the same socket (even if - * the fd is the same number, it may have been closed and - * reopened since the last time). 
In future, if there is a - * function for removing sockets from WaitEventSet, then we - * could add and remove just the socket each time, potentially - * avoiding some system calls. - */ - Assert(wait_fd != PGINVALID_SOCKET); - rc = WaitLatchOrSocket(walrcv->latch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_LATCH_SET, - wait_fd, - NAPTIME_PER_CYCLE, - WAIT_EVENT_WAL_RECEIVER_MAIN); + /* Wait for latch or data. */ + rc = walrcv_wait(wrconn, NAPTIME_PER_CYCLE, + WAIT_EVENT_WAL_RECEIVER_MAIN); if (rc & WL_LATCH_SET) { ResetLatch(walrcv->latch); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e08afc6548..115d6acf18 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -239,6 +239,8 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn, const int nRetTypes, const Oid *retTypes); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); +typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, long timeout, + int wait_event); typedef struct WalReceiverFunctionsType { @@ -257,6 +259,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; + walrcv_wait_fn walrcv_wait; } WalReceiverFunctionsType; extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; @@ -291,6 +294,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes) #define walrcv_disconnect(conn) \ WalReceiverFunctions->walrcv_disconnect(conn) +#define walrcv_wait(conn, timeout, wait_event) \ + WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event) static inline void walrcv_clear_result(WalRcvExecResult *walres) -- 2.20.1
From 8ba696292d0c853236acbfd88a03e7bf040105e9 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 25 Feb 2020 15:34:12 +1300 Subject: [PATCH 07/11] Use a WaitEventSet for postgres_fdw. The same WaitEventSet object will be reused for the life of the backend. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- contrib/postgres_fdw/connection.c | 98 +++++++++++++++++++++++++++---- 1 file changed, 88 insertions(+), 10 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index e45647f3ea..0397a16702 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -65,6 +65,12 @@ typedef struct ConnCacheEntry */ static HTAB *ConnectionHash = NULL; +/* Reusuable WaitEventSet. */ +static WaitEventSet *ConnectionWaitSet = NULL; +static int64 ConnectionWaitSetSocketChangeCount = -1; +static PGconn *ConnectionWaitSetConn = NULL; +static int ConnectionWaitSetPosition = -1; + /* for assigning cursor numbers and prepared statement numbers */ static unsigned int cursor_number = 0; static unsigned int prep_stmt_number = 0; @@ -92,6 +98,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); static bool UserMappingPasswordRequired(UserMapping *user); +static int pgfdw_wait_for_socket(PGconn *conn, long timeout); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -115,6 +122,17 @@ GetConnection(UserMapping *user, bool will_prep_stmt) { HASHCTL ctl; + /* + * We'll use a single WaitEventSet for the lifetime of this backend, + * and add and remove sockets as appropriate. Only one socket will + * be in it at a time. + */ + Assert(ConnectionWaitSet == NULL); + ConnectionWaitSet = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(ConnectionWaitSet, WL_LATCH_SET, -1, MyLatch, NULL); + AddWaitEventToSet(ConnectionWaitSet, WL_EXIT_ON_PM_DEATH, -1, NULL, + NULL); + MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(ConnCacheKey); ctl.entrysize = sizeof(ConnCacheEntry); @@ -344,6 +362,14 @@ disconnect_pg_server(ConnCacheEntry *entry) if (entry->conn != NULL) { PQfinish(entry->conn); + if (ConnectionWaitSetConn == entry->conn) + { + /* We do this after PQfinish, so we know the socket is closed. 
*/ + RemoveWaitEvent(ConnectionWaitSet, + ConnectionWaitSetPosition, + true); + ConnectionWaitSetConn = NULL; + } entry->conn = NULL; ReleaseExternalFD(); } @@ -603,11 +629,7 @@ pgfdw_get_result(PGconn *conn, const char *query) int wc; /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - -1L, PG_WAIT_EXTENSION); + wc = pgfdw_wait_for_socket(conn, -1); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -1207,11 +1229,7 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - cur_timeout, PG_WAIT_EXTENSION); + wc = pgfdw_wait_for_socket(conn, cur_timeout); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); @@ -1250,3 +1268,63 @@ exit: ; *result = last_res; return timed_out; } + +static int +pgfdw_wait_for_socket(PGconn *conn, long timeout) +{ + WaitEvent event; + int rc; + + /* If a different conn is in the set, or the socket changed, remove. */ + if (ConnectionWaitSetConn) + { + bool socket_changed = + (ConnectionWaitSetSocketChangeCount != + PQsocketChangeCount(ConnectionWaitSetConn)); + + if (ConnectionWaitSetConn == conn) + { + /* + * This connection is already in there, but the socket might have + * changed. If so, remove it. + */ + if (socket_changed) + { + RemoveWaitEvent(ConnectionWaitSet, + ConnectionWaitSetPosition, + true); + ConnectionWaitSetConn = NULL; + } + } + else + { + /* + * A different connection is in there. Remove it, being careful + * to report whether the socket was already closed (this affects + * whether we unregister the fd with the kernel). + */ + RemoveWaitEvent(ConnectionWaitSet, + ConnectionWaitSetPosition, + socket_changed); + ConnectionWaitSetConn = NULL; + } + } + + /* Do we need to add our connection? */ + if (ConnectionWaitSetConn == NULL) + { + ConnectionWaitSetPosition = + AddWaitEventToSet(ConnectionWaitSet, WL_SOCKET_READABLE, + PQsocket(conn), NULL, NULL); + ConnectionWaitSetConn = conn; + ConnectionWaitSetSocketChangeCount = PQsocketChangeCount(conn); + } + + /* Finally, we can wait. */ + rc = WaitEventSetWait(ConnectionWaitSet, timeout, &event, 1, + PG_WAIT_EXTENSION); + if (rc == 0) + return WL_TIMEOUT; + + return event.events; +} -- 2.20.1
From 9e457207c655b11ae5ed3fe97d481a8b044b0674 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 24 Feb 2020 23:51:09 +1300 Subject: [PATCH 08/11] Use WL_EXIT_ON_PM_DEATH in FeBeWaitSet. Previously, we'd give either a FATAL message or a silent exit() when we detected postmaster death, depending on which wait point we were at. Make the exit more uniform, by using the standard exit facility. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/libpq/be-secure.c | 28 ---------------------------- src/backend/libpq/pqcomm.c | 2 +- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 2ae507a902..aec0926d93 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -184,28 +184,6 @@ retry: WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_READ); - /* - * If the postmaster has died, it's not safe to continue running, - * because it is the postmaster's job to kill us if some other backend - * exits uncleanly. Moreover, we won't run very well in this state; - * helper processes like walwriter and the bgwriter will exit, so - * performance may be poor. Finally, if we don't exit, pg_ctl will be - * unable to restart the postmaster without manual intervention, so no - * new connections can be accepted. Exiting clears the deck for a - * postmaster restart. - * - * (Note that we only make this check when we would otherwise sleep on - * our latch. We might still continue running for a while if the - * postmaster is killed in mid-query, or even through multiple queries - * if we never have to wait for read. We don't want to burn too many - * cycles checking for this very rare condition, and this should cause - * us to exit quickly in most cases.) - */ - if (event.events & WL_POSTMASTER_DEATH) - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating connection due to unexpected postmaster exit"))); - /* Handle interrupt. */ if (event.events & WL_LATCH_SET) { @@ -296,12 +274,6 @@ retry: WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1, WAIT_EVENT_CLIENT_WRITE); - /* See comments in secure_read. */ - if (event.events & WL_POSTMASTER_DEATH) - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating connection due to unexpected postmaster exit"))); - /* Handle interrupt. */ if (event.events & WL_LATCH_SET) { diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 7717bb2719..5422175185 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -222,7 +222,7 @@ pq_init(void) AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL); - AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL); + AddWaitEventToSet(FeBeWaitSet, WL_EXIT_ON_PM_DEATH, -1, NULL, NULL); } /* -------------------------------- -- 2.20.1
From 3ba8b6165cbd975ede1cf06c03032fb65fc49278 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 24 Feb 2020 23:48:29 +1300 Subject: [PATCH 09/11] Use FeBeWaitSet for walsender.c. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/replication/walsender.c | 32 ++++++++++++++--------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index abb533b9d0..580187d636 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1235,7 +1235,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* If we have pending write here, go to slow path */ for (;;) { - int wakeEvents; + WaitEvent event; long sleeptime; /* Check for input from the client */ @@ -1252,13 +1252,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | - WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; - /* Sleep until something happens or we time out */ - (void) WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_WRITE_DATA); + ModifyWaitEvent(FeBeWaitSet, 0, + WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, NULL); + (void) WaitEventSetWait(FeBeWaitSet, sleeptime, &event, 1, + WAIT_EVENT_WAL_SENDER_WRITE_DATA); /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1342,6 +1340,7 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { + WaitEvent event; long sleeptime; /* Clear any already-pending wakeups */ @@ -1435,15 +1434,14 @@ WalSndWaitForWal(XLogRecPtr loc) */ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | - WL_SOCKET_READABLE | WL_TIMEOUT; + wakeEvents = WL_SOCKET_READABLE; if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - (void) WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_WAIT_WAL); + ModifyWaitEvent(FeBeWaitSet, 0, wakeEvents, NULL); + (void) WaitEventSetWait(FeBeWaitSet, sleeptime, &event, 1, + WAIT_EVENT_WAL_SENDER_WAIT_WAL); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2282,9 +2280,9 @@ WalSndLoop(WalSndSendDataCallback send_data) { long sleeptime; int wakeEvents; + WaitEvent event; - wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT | - WL_SOCKET_READABLE; + wakeEvents = WL_SOCKET_READABLE; /* * Use fresh timestamp, not last_processing, to reduce the chance @@ -2296,9 +2294,9 @@ WalSndLoop(WalSndSendDataCallback send_data) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - (void) WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_MAIN); + ModifyWaitEvent(FeBeWaitSet, 0, wakeEvents, NULL); + (void) WaitEventSetWait(FeBeWaitSet, sleeptime, &event, 1, + WAIT_EVENT_WAL_SENDER_MAIN); } } } -- 2.20.1
From c383e9cd06fb89b7fc020b330a3d9cd003d2993a Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 21 Jan 2020 12:54:11 +1300 Subject: [PATCH 10/11] Introduce a WaitEventSet for the stats collector. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com --- src/backend/postmaster/pgstat.c | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 462b4d7e06..2e16f6cc66 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4429,6 +4429,8 @@ PgstatCollectorMain(int argc, char *argv[]) int len; PgStat_Msg msg; int wr; + WaitEvent event; + WaitEventSet *wes; /* * Ignore all signals usually bound to some action in the postmaster, @@ -4458,6 +4460,12 @@ PgstatCollectorMain(int argc, char *argv[]) pgStatRunningInCollector = true; pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true); + /* Prepare to wait for our latch or data in our socket. */ + wes = CreateWaitEventSet(CurrentMemoryContext, 3); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); + AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL); + /* * Loop to process messages until we get SIGQUIT or detect ungraceful * death of our parent postmaster. @@ -4636,10 +4644,7 @@ PgstatCollectorMain(int argc, char *argv[]) /* Sleep until there's something to do */ #ifndef WIN32 - wr = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE, - pgStatSock, -1L, - WAIT_EVENT_PGSTAT_MAIN); + wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN); #else /* @@ -4652,18 +4657,15 @@ PgstatCollectorMain(int argc, char *argv[]) * to not provoke "using stale statistics" complaints from * backend_read_statsfile. */ - wr = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT, - pgStatSock, - 2 * 1000L /* msec */ , - WAIT_EVENT_PGSTAT_MAIN); + wr = WaitEventSetWait(wes, 2 * 1000L /* msec */, &event, 1, + WAIT_EVENT_PGSTAT_MAIN); #endif /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. */ - if (wr & WL_POSTMASTER_DEATH) + if (wr == 1 && event.events == WL_POSTMASTER_DEATH) break; } /* end of outer loop */ @@ -4672,6 +4674,8 @@ PgstatCollectorMain(int argc, char *argv[]) */ pgstat_write_statsfiles(true, true); + FreeWaitEventSet(wes); + exit(0); } -- 2.20.1