On Mon, Jan 10, 2022 at 10:20 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> On Mon, Jan 10, 2022 at 8:00 PM Alexander Lakhin <exclus...@gmail.com> wrote:
> > The libpqrcv_PQgetResult function, in turn, invokes WaitLatchOrSocket()
> > where WaitEvents are defined locally, and the closed flag is set on the
> > first invocation but expected to be checked on the second.
>
> D'oh, right. There's also a WaitLatchOrSocket call in walreceiver.c.
> We'd need a long-lived WaitEventSet common across all of these sites,
> which is hard here (because the socket might change under you, as
> discussed in other threads that introduced long-lived WaitEventSets to
> other places but not here).
The attached patches are super quick-and-dirty code (they don't handle some errors or socket changes correctly), but could you check whether they detect the closed socket?
From 3590a8c9b3e8992a65dea7a9d6aecdca2f25582d Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 10 Jan 2022 14:45:05 +1300 Subject: [PATCH v2 1/2] Make Windows FD_CLOSE reporting sticky. XXX Just testing an idea... --- src/backend/storage/ipc/latch.c | 16 ++++++++++++++++ src/include/storage/latch.h | 1 + 2 files changed, 17 insertions(+) diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 1d893cf863..8f3176a00f 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -899,6 +899,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, event->user_data = user_data; #ifdef WIN32 event->reset = false; + event->closed = false; #endif if (events == WL_LATCH_SET) @@ -1882,6 +1883,20 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, return 1; } } + + /* + * XXX: HYPOTHESIS TO TEST + * Windows only reports FD_CLOSE once. If we've seen that already, + * continue to report it. + */ + if ((cur_event->events & WL_SOCKET_MASK) && cur_event->closed) + { + occurred_events->pos = cur_event->pos; + occurred_events->user_data = cur_event->user_data; + occurred_events->events = (cur_event->events & WL_SOCKET_MASK); + occurred_events->fd = cur_event->fd; + return 1; + } } /* @@ -2002,6 +2017,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, { /* EOF/error, so signal all caller-requested socket flags */ occurred_events->events |= (cur_event->events & WL_SOCKET_MASK); + cur_event->closed = true; } if (occurred_events->events != 0) diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 44f9368c64..c24f46dc37 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -147,6 +147,7 @@ typedef struct WaitEvent void *user_data; /* pointer provided in AddWaitEventToSet */ #ifdef WIN32 bool reset; /* Is reset of the event required? */ + bool closed; /* Has FD_CLOSE event been reported? 
*/ #endif } WaitEvent; -- 2.33.1
From 755ee3da7934d33a706d59eb0abc0e4f1829acb0 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 10 Jan 2022 22:02:21 +1300 Subject: [PATCH v2 2/2] XXX Quick hack to use persistent WaitEventSet in walreceiver XXX There are several things wrong with this, it's just experimental code. --- .../libpqwalreceiver/libpqwalreceiver.c | 97 ++++++++++++------- src/backend/replication/walreceiver.c | 12 +-- src/include/replication/walreceiver.h | 8 ++ 3 files changed, 75 insertions(+), 42 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index c08e599eef..f8dd76e2d0 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -40,6 +40,7 @@ void _PG_init(void); struct WalReceiverConn { + WaitEventSet *wes; /* Current connection to the primary, if any */ PGconn *streamConn; /* Used to remember if the connection is logical or physical */ @@ -82,6 +83,7 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const int nRetTypes, const Oid *retTypes); static void libpqrcv_disconnect(WalReceiverConn *conn); +static int libpqrcv_wait(WalReceiverConn *conn, int timeout, int wait_event); static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, @@ -98,12 +100,13 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_create_slot, libpqrcv_get_backend_pid, libpqrcv_exec, - libpqrcv_disconnect + libpqrcv_disconnect, + libpqrcv_wait }; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); -static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); +static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query); +static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -182,6 +185,15 @@ 
libpqrcv_connect(const char *conninfo, bool logical, const char *appname, return NULL; } + conn->wes = CreateWaitEventSet(TopMemoryContext, 3); + AddWaitEventToSet(conn->wes, WL_SOCKET_WRITEABLE, + PQsocket(conn->streamConn), NULL, NULL); + AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* * Poll connection until we have OK or FAILED status. * @@ -191,7 +203,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, do { int io_flag; - int rc; + WaitEvent event; if (status == PGRES_POLLING_READING) io_flag = WL_SOCKET_READABLE; @@ -203,21 +215,20 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, else io_flag = WL_SOCKET_WRITEABLE; - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, - PQsocket(conn->streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); + ModifyWaitEvent(conn->wes, 0, io_flag, NULL); + if (WaitEventSetWait(conn->wes, -1, &event, 1, + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT) < 1) + continue; /* Interrupted? */ - if (rc & WL_LATCH_SET) + if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); ProcessWalRcvInterrupts(); } /* If socket is ready, advance the libpq state machine */ - if (rc & io_flag) + if (event.events & io_flag) status = PQconnectPoll(conn->streamConn); } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); @@ -231,7 +242,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, { PGresult *res; - res = libpqrcv_PQexec(conn->streamConn, + res = libpqrcv_PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -359,7 +370,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) * Get the system identifier and timeline ID as a DataRow message from the * primary server. 
*/ - res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -482,7 +493,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, options->proto.physical.startpointTLI); /* Start streaming. */ - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqrcv_PQexec(conn, cmd.data); pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -532,7 +543,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -547,7 +558,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -561,7 +572,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -572,7 +583,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (res != NULL) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -597,7 +608,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Request the primary to send over the history file for given timeline. 
*/ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(conn->streamConn, cmd); + res = libpqrcv_PQexec(conn, cmd); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -641,8 +652,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * May return NULL, rather than an error result, on failure. */ static PGresult * -libpqrcv_PQexec(PGconn *streamConn, const char *query) +libpqrcv_PQexec(WalReceiverConn *conn, const char *query) { + PGconn *streamConn = conn->streamConn; PGresult *lastResult = NULL; /* @@ -665,7 +677,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) /* Wait for, and collect, the next PGresult. */ PGresult *result; - result = libpqrcv_PQgetResult(streamConn); + result = libpqrcv_PQgetResult(conn); if (result == NULL) break; /* query is complete, or failure */ @@ -690,30 +702,31 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) * Perform the equivalent of PQgetResult(), but watch for interrupts. */ static PGresult * -libpqrcv_PQgetResult(PGconn *streamConn) +libpqrcv_PQgetResult(WalReceiverConn *conn) { + PGconn *streamConn = conn->streamConn; + /* * Collect data until PQgetResult is ready to get the result without * blocking. */ while (PQisBusy(streamConn)) { - int rc; + WaitEvent event; /* * We don't need to break down the sleep into smaller increments, * since we'll get interrupted by signals and can handle any * interrupts here. */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); + ModifyWaitEvent(conn->wes, 0, WL_SOCKET_READABLE, NULL); + + if (WaitEventSetWait(conn->wes, -1, &event, 1, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE) < 1) + continue; /* Interrupted? 
*/ - if (rc & WL_LATCH_SET) + if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); ProcessWalRcvInterrupts(); @@ -737,12 +750,28 @@ libpqrcv_PQgetResult(PGconn *streamConn) static void libpqrcv_disconnect(WalReceiverConn *conn) { + FreeWaitEventSet(conn->wes); PQfinish(conn->streamConn); if (conn->recvBuf != NULL) PQfreemem(conn->recvBuf); pfree(conn); } +/* + * Wait for data, latch, or timeout. + */ +static int +libpqrcv_wait(WalReceiverConn *conn, int timeout, int wait_event) +{ + WaitEvent event; + + if (WaitEventSetWait(conn->wes, timeout, &event, 1, wait_event) == 1) + return event.events; + + return 0; +} + + /* * Receive a message available from XLOG stream. * @@ -793,13 +822,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (res != NULL) { PQclear(res); @@ -941,7 +970,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqrcv_PQexec(conn, cmd.data); pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1068,7 +1097,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the query interface requires a database connection"))); - pgres = libpqrcv_PQexec(conn->streamConn, query); + pgres = libpqrcv_PQexec(conn, query); switch (PQresultStatus(pgres)) { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7a7eb3784e..f308ea9c24 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -507,13 +507,9 @@ WalReceiverMain(void) * could add and remove just the socket each 
time, potentially * avoiding some system calls. */ - Assert(wait_fd != PGINVALID_SOCKET); - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_LATCH_SET, - wait_fd, - NAPTIME_PER_CYCLE, - WAIT_EVENT_WAL_RECEIVER_MAIN); + //Assert(wait_fd != PGINVALID_SOCKET); + rc = walrcv_wait(wrconn, NAPTIME_PER_CYCLE, + WAIT_EVENT_WAL_RECEIVER_MAIN); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -532,7 +528,7 @@ WalReceiverMain(void) XLogWalRcvSendReply(true, false); } } - if (rc & WL_TIMEOUT) + if (rc == 0) { /* * We didn't receive anything new. If we haven't heard diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 0b607ed777..3e80ff63ce 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -380,6 +380,11 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); +/* + * XXX Wait for readable data, timeout, or latch. + */ +typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, int timeout, int wait_event); + typedef struct WalReceiverFunctionsType { walrcv_connect_fn walrcv_connect; @@ -397,6 +402,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; + walrcv_wait_fn walrcv_wait; } WalReceiverFunctionsType; extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; @@ -431,6 +437,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes) #define walrcv_disconnect(conn) \ WalReceiverFunctions->walrcv_disconnect(conn) +#define walrcv_wait(conn, timeout, wait_event) \ + WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event) static inline void walrcv_clear_result(WalRcvExecResult *walres) -- 2.33.1