Hi, On 2022-01-29 12:44:22 -0800, Andres Freund wrote: > On 2022-01-17 10:06:56 -0800, Andres Freund wrote: > > Yes, that's what I was suggesting. I wasn't thinking of using a static var, > > but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket() > > is doing doesn't protect against the problem referenced above, because it > > still is reset by WSAEventSelect. > > Do we care about breaking StreamCtl ABI? I don't think so?
Here's a version of the patch only creating the event once. Needs a small bit of comment polishing, but otherwise I think it's sane? Greetings, Andres Freund
>From 2c1d67a0b8dff8b6be9683d422d251c937db9121 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sun, 16 Jan 2022 01:58:24 -0800 Subject: [PATCH v3] Avoid slow shutdown of pg_basebackup, windows edition. See also 7834d20b57a. Discussion: https://postgr.es/m/20220129204422.ljyxclfy5ubws...@alap3.anarazel.de --- src/bin/pg_basebackup/pg_basebackup.c | 24 ++++++- src/bin/pg_basebackup/pg_receivewal.c | 4 ++ src/bin/pg_basebackup/receivelog.c | 96 ++++++++++++++++++++++++--- src/bin/pg_basebackup/receivelog.h | 6 ++ 4 files changed, 118 insertions(+), 12 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index c40925c1f04..a539fbe6e0e 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -169,6 +169,8 @@ static const char *progress_filename; /* Pipe to communicate with background wal receiver process */ #ifndef WIN32 static int bgpipe[2] = {-1, -1}; +#else +HANDLE *bgevent = NULL; #endif /* Handle to child process */ @@ -506,7 +508,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline, /* * At this point we have an end pointer, so compare it to the current * position to figure out if it's time to stop. + * + * On windows we need to reset the event used to wake up the streaming + * thread, otherwise CopyStreamPoll() will start to immediately return. 
*/ +#ifdef WIN32 + ResetEvent(bgevent); +#endif + if (segendpos >= xlogendptr) return true; @@ -541,7 +550,7 @@ LogStreamerMain(logstreamer_param *param) #ifndef WIN32 stream.stop_socket = bgpipe[0]; #else - stream.stop_socket = PGINVALID_SOCKET; + stream.stop_event = bgevent; #endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = false; @@ -627,6 +636,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) pg_log_error("could not create pipe for background process: %m"); exit(1); } +#else + bgevent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (bgevent == NULL) + { + pg_log_error("could not create event for background thread: %lu", + GetLastError()); + exit(1); + } #endif /* Get a second connection */ @@ -2216,7 +2233,9 @@ BaseBackup(void) /* * On Windows, since we are in the same process, we can just store the * value directly in the variable, and then set the flag that says - * it's there. + * it's there. To interrupt the thread while it's waiting for network + * IO, we set an event (which the thread waits on in addition to the + * socket). 
*/ if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2) { @@ -2226,6 +2245,7 @@ BaseBackup(void) } xlogendptr = ((uint64) hi) << 32 | lo; InterlockedIncrement(&has_xlogendptr); + SetEvent(bgevent); /* First wait for the thread to exit */ if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) != diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index ccb215c398c..d27bd85b7ce 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -618,7 +618,11 @@ StreamLog(void) stream.timeline); stream.stream_stop = stop_streaming; +#ifndef WIN32 stream.stop_socket = PGINVALID_SOCKET; +#else + stream.stop_event = NULL; +#endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = synchronous; stream.do_sync = do_sync; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d39e4b11a1a..cc46de0252e 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -37,8 +37,8 @@ static bool still_sending = true; /* feedback still needs to be sent? 
*/ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos); -static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket); -static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, +static int CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream); +static int CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); @@ -414,6 +414,27 @@ CheckServerVersionForStreaming(PGconn *conn) return true; } +static void +XLogStreamInit(PGconn *conn, StreamCtl *stream) +{ +#ifdef WIN32 + stream->net_event = WSACreateEvent(); + if (stream->net_event == WSA_INVALID_EVENT) + { + pg_log_error("failed to create event for socket: error code %d", + WSAGetLastError()); + exit(1); + } + + if (WSAEventSelect(PQsocket(conn), stream->net_event, FD_READ | FD_CLOSE) != 0) + { + pg_log_error("failed to set up event for socket: error code %d", + WSAGetLastError()); + exit(1); + } +#endif +} + /* * Receive a log stream starting at the specified position. * @@ -463,6 +484,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) PGresult *res; XLogRecPtr stoppos; + XLogStreamInit(conn, stream); + /* * The caller should've checked the server version already, but doesn't do * any harm to check it here too. @@ -813,7 +836,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); - r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf); + r = CopyStreamReceive(conn, sleeptime, stream, ©buf); while (r != 0) { if (r == -1) @@ -858,7 +881,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * Process the received data, and any subsequent data we can read * without blocking. 
*/ - r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf); + r = CopyStreamReceive(conn, 0, stream, ©buf); } } @@ -877,8 +900,9 @@ error: * or interrupted by signal or stop_socket input, and -1 on an error. */ static int -CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) +CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream) { +#ifndef WIN32 int ret; fd_set input_mask; int connsocket; @@ -896,10 +920,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) FD_ZERO(&input_mask); FD_SET(connsocket, &input_mask); maxfd = connsocket; - if (stop_socket != PGINVALID_SOCKET) + if (stream->stop_socket != PGINVALID_SOCKET) { - FD_SET(stop_socket, &input_mask); - maxfd = Max(maxfd, stop_socket); + FD_SET(stream->stop_socket, &input_mask); + maxfd = Max(maxfd, stream->stop_socket); } if (timeout_ms < 0) @@ -924,6 +948,58 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) return 1; /* Got input on connection socket */ return 0; /* Got timeout or input on stop_socket */ +#else + int ret; + int rc; + int nevents = 0; + HANDLE events[2]; + + + events[0] = stream->net_event; + nevents++; + + if (stream->stop_event != NULL) + { + events[1] = stream->stop_event; + nevents++; + } + + /* map timeout_ms to WaitForMultipleObjects expectations */ + if (timeout_ms < 0) + timeout_ms = INFINITE; + + rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms); + + if (rc == WAIT_FAILED) + { + pg_log_error("WaitForMultipleObjects() failed: error code %lu", + GetLastError()); + exit(1); + } + else if (rc == WAIT_TIMEOUT) + { + /* timeout exceeded */ + ret = 0; + } + else if (rc == WAIT_OBJECT_0) + { + /* Got input on connection socket */; + ret = 1; + } + else if (rc == (WAIT_OBJECT_0 + 1)) + { + Assert(stream->stop_event != NULL); + /* Got event on stop socket */; + ret = 0; + } + else + { + pg_log_error("unexpected return from WaitForMultipleObjects(): %d", rc); + exit(1); + } + + return ret; +#endif /* WIN32 */ } /* 
@@ -939,7 +1015,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) * -1 on error. -2 if the server ended the COPY. */ static int -CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, +CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream, char **buffer) { char *copybuf = NULL; @@ -960,7 +1036,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, * the specified timeout, so that we can ping the server. Also stop * waiting if input appears on stop_socket. */ - ret = CopyStreamPoll(conn, timeout, stop_socket); + ret = CopyStreamPoll(conn, timeout, stream); if (ret <= 0) return ret; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 050d4bc69fd..c0b1911a6c1 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -40,8 +40,14 @@ typedef struct StreamCtl stream_stop_callback stream_stop; /* Stop streaming when returns true */ +#ifndef WIN32 pgsocket stop_socket; /* if valid, watch for input on this socket * and check stream_stop() when there is any */ +#else + HANDLE *stop_event; /* on windows, check an event instead */ + + HANDLE *net_event; /* event to wait for network IO */ +#endif WalWriteMethod *walmethod; /* How to write the WAL */ char *partial_suffix; /* Suffix appended to partially received files */ -- 2.34.0