Thank you for fixing the issue.

At Tue, 23 Jan 2024 11:43:43 +0200, Heikki Linnakangas <hlinn...@iki.fi> wrote in
> There's an existing AmWalReceiverProcess() macro too. Let's use that.

Mmm. I looked for an Is* function because "IsLogicalWorker()" is
placed on the previous line. Our convention regarding those functions
(macros) and variables seems inconsistent. However, I can't say for
sure that we should unify all of them.

> (See also
> https://www.postgresql.org/message-id/f3ecd4cb-85ee-4e54-8278-5fabfb3a4ed0%40iki.fi
> for refactoring in this area)
>
> Here's a patch set summarizing the changes so far. They should be
> squashed, but I kept them separate for now to help with review:
>
> 1. revert the revert of 728f86fec6.
> 2. your walrcv_shutdown_deblocking_v2-2.patch
> 3. Also replace libpqrcv_PQexec() and libpqrcv_PQgetResult() with the
> wrappers from libpq-be-fe-helpers.h

Both replacements look fine. I didn't find another instance of similar
code.

> 4. Replace IsWalReceiver() with AmWalReceiverProcess()

Looks fine.

> >> - pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
> >> + pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
> >>
> >> Can't we just use die(), instead?
> > There was a comment explaining the problems associated with exiting
> > within a signal handler;
> > - * Currently, only SIGTERM is of interest. We can't just exit(1) within the
> > - * SIGTERM signal handler, because the signal might arrive in the middle of
> > - * some critical operation, like while we're holding a spinlock. Instead, the
> > And I think we should keep the considerations it suggests. The patch
> > removes the comment itself, but it does so because it implements our
> > standard process exit procedure, which incorporates points suggested
> > by the now-removed comment.
>
> die() doesn't call exit(1). Unless DoingCommandRead is set, but it
> never is in the walreceiver. It looks just like the new
> WalRcvShutdownSignalHandler() function. Am I missing something?

Ugh.. Doesn't the name 'die()' suggest exit()? I agree that die() can
be used instead.
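
To spell out why the flag-setting style is safe, here is a minimal
standalone sketch in plain C. It is not PostgreSQL code:
sketch_die(), sketch_check_for_interrupts() and the two flag variables
are hypothetical stand-ins for die(), CHECK_FOR_INTERRUPTS(),
InterruptPending and ProcDiePending, and the counter stands in for the
ereport(FATAL) exit path. The handler only records the request; whether
to exit is decided later, at a point the main code chooses.

```c
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical stand-ins for InterruptPending / ProcDiePending. */
static volatile sig_atomic_t interrupt_pending = 0;
static volatile sig_atomic_t die_pending = 0;

/* Counts how often the "exit path" was taken (stand-in for ereport(FATAL)). */
static int	termination_requests_handled = 0;

/* SIGTERM handler: records the request and returns; it never exits. */
static void
sketch_die(int signo)
{
	int			save_errno = errno;

	(void) signo;
	interrupt_pending = 1;
	die_pending = 1;
	errno = save_errno;
}

/*
 * Called only at safe points (no spinlock held, no critical section).
 * Returns true if a termination request was acted upon.
 */
static bool
sketch_check_for_interrupts(void)
{
	if (!interrupt_pending)
		return false;
	interrupt_pending = 0;

	if (die_pending)
	{
		die_pending = 0;
		termination_requests_handled++;
		return true;
	}
	return false;
}

/* Deliver SIGTERM to ourselves and report what the next safe point sees. */
static bool
sketch_demo(void)
{
	signal(SIGTERM, sketch_die);
	raise(SIGTERM);

	/* We are still running: the handler only set the flags. */
	assert(die_pending == 1);

	return sketch_check_for_interrupts();
}
```

Calling sketch_demo() returns true only once the explicit check runs,
which is the property the removed comment was asking for.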

> Hmm, but doesn't bgworker_die() have that problem with exit(1)ing in
> the signal handler?

I noticed that but left it alone for now.

> I also wonder if we should replace SignalHandlerForShutdownRequest()
> completely with die(), in all processes? The difference is that
> SignalHandlerForShutdownRequest() uses ShutdownRequestPending, while
> die() uses ProcDiePending && InterruptPending to indicate that the
> signal was received. Or do some of the processes want to check for
> ShutdownRequestPending only at specific places, and don't want to get
> terminated at the any random CHECK_FOR_INTERRUPTS()?

At least, pg_log_backend_memory_contexts(<chkpt_pid>) causes a call to
ProcessInterrupts() via "ereport(LOG_SERVER_ONLY, ...)", which can lead
to an exit due to ProcDiePending. In this regard, checkpointer clearly
requires the distinction. Rather than merely consolidating the
notification variables and trying to eliminate every CFI call in the
execution path, I believe we need a shutdown mechanism that CFI doesn't
react to. How to achieve that is, I think, open to discussion: we could
keep the notification variables separate as they are now, or introduce
a variable that makes CFI ignore ProcDiePending.

The attached patches are a rebased version of v3 (0003 is updated) plus
an additional 0005 that uses die() instead of walreceiver's custom
handler.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
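
P.S. A toy model of the checkpointer case above, as a standalone C
sketch. Nothing here is PostgreSQL code: SketchFlags, sketch_cfi(),
sketch_log_something() and sketch_iteration() are hypothetical names
that only mimic the pending flags, CHECK_FOR_INTERRUPTS(), and a
logging call that happens to reach it. It assumes SIGTERM arrives in
the middle of one loop iteration and shows where the process would stop
under each handler style.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of the three pending flags discussed above. */
typedef struct SketchFlags
{
	bool		interrupt_pending;
	bool		die_pending;
	bool		shutdown_request_pending;
} SketchFlags;

typedef enum SketchOutcome
{
	SKETCH_RUNNING,				/* iteration finished, no exit */
	SKETCH_EXIT_MID_WORK,		/* exited from an incidental CFI */
	SKETCH_EXIT_AT_LOOP_TOP		/* exited at the chosen place */
} SketchOutcome;

/* Generic interrupt check: reacts to die_pending, ignores the other flag. */
static bool
sketch_cfi(SketchFlags *f)
{
	assert(f != NULL);
	if (!f->interrupt_pending)
		return false;
	f->interrupt_pending = false;
	return f->die_pending;
}

/* A helper deep in the call stack that happens to run the generic check. */
static bool
sketch_log_something(SketchFlags *f)
{
	return sketch_cfi(f);
}

/* One main-loop iteration; the "signal" arrives in the middle of it. */
static SketchOutcome
sketch_iteration(SketchFlags *f, bool die_style_handler)
{
	/* The only place this process wants to notice a shutdown request. */
	if (f->shutdown_request_pending)
		return SKETCH_EXIT_AT_LOOP_TOP;

	/* Pretend SIGTERM is delivered here, while work is in progress. */
	if (die_style_handler)
	{
		f->interrupt_pending = true;
		f->die_pending = true;
	}
	else
		f->shutdown_request_pending = true;

	/* Incidental logging in the middle of the work. */
	if (sketch_log_something(f))
		return SKETCH_EXIT_MID_WORK;

	return SKETCH_RUNNING;
}
```

With the die-style handler the first iteration ends in
SKETCH_EXIT_MID_WORK; with the separate flag it completes and the next
iteration stops at SKETCH_EXIT_AT_LOOP_TOP.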

From daac3aa06bd33f5e11118f04a406c8a59fd4cc97 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:01:03 +0200
Subject: [PATCH v4 1/5] Revert "Revert "libpqwalreceiver: Convert to libpq-be-fe-helpers.h""

This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 55 +++----------------
 1 file changed, 8 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 2439733b55..dbee2f8f0e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -136,7 +137,6 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 				 const char *appname, char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -192,56 +192,17 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	Assert(i < sizeof(keys));

 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);

 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;

 	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
 	{
-		PQfinish(conn->streamConn);
+		libpqsrv_disconnect(conn->streamConn);
 		pfree(conn);

 		ereport(ERROR,
@@ -277,7 +238,7 @@ bad_connection_errmsg:

 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -813,7 +774,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.39.3

From 1d8f786f2b8185bde63bfb443152006b6aef9b0b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:01:25 +0200
Subject: [PATCH v4 2/5] Apply walrcv_shutdown_deblocking_v2-2.patch

From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +-
 src/backend/replication/walreceiver.c         | 53 +++++++++----------
 src/backend/tcop/postgres.c                   |  5 ++
 src/include/replication/walreceiver.h         |  2 +-
 4 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index dbee2f8f0e..41cf3dc853 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -753,7 +753,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}

 		/* Consume whatever data is available from the socket */
@@ -1093,7 +1093,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];

-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();

 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e29a6196a3..110d96a4e6 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);

-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;

-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+
 }

+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return MyBackendType == B_WAL_RECEIVER;
+}

 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 								 errmsg("cannot continue WAL streaming, recovery has already ended")));

 					/* Process any requests or signals received recently */
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();

 					if (ConfigReloadPending)
 					{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();

 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);

-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();

 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1a34bd3715..2ce24d8a9a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f566a99ba1..916d7b84fe 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -470,8 +470,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }

 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);

 /* prototypes for functions in walreceiverfuncs.c */
-- 
2.39.3

From 648538720d982a981f4b8f7b118d59d292392fe3 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Mon, 29 Jan 2024 16:24:21 +0900
Subject: [PATCH v4 3/5] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 151 ++++--------------
 1 file changed, 33 insertions(+), 118 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 41cf3dc853..ae48dbafb5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -106,8 +106,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };

 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);

 /*
@@ -216,8 +214,9 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	{
 		PGresult   *res;

-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -389,7 +388,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -522,7 +523,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);

 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);

 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -552,7 +555,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;

 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -572,7 +575,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -587,7 +591,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);

 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -601,7 +606,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));

 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}

 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -612,7 +618,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);

 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -637,7 +644,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -667,107 +676,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }

-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -828,13 +736,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;

-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);

 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -985,7 +895,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 		appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}

-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);

 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1026,7 +938,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
 					 quote_identifier(slotname),
 					 failover ? "true" : "false");

-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn, cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);

 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1139,7 +1052,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));

-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);

 	switch (PQresultStatus(pgres))
 	{
-- 
2.39.3

From 1062917551121021c6ad9d56273da5e483ea7c29 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:19:07 +0200
Subject: [PATCH v4 4/5] Use existing AmWalReceiverProcess() function

---
 src/backend/replication/walreceiver.c | 9 ---------
 src/backend/tcop/postgres.c           | 2 +-
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 110d96a4e6..abaf21a20c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -167,15 +167,6 @@ WalRcvShutdownSignalHandler(SIGNAL_ARGS)

 }

-/*
- * Is current process a wal receiver?
- */
-bool
-IsWalReceiver(void)
-{
-	return MyBackendType == B_WAL_RECEIVER;
-}
-
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 2ce24d8a9a..5a4dc1977d 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3287,7 +3287,7 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
-		else if (IsWalReceiver())
+		else if (AmWalReceiverProcess())
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("terminating walreceiver process due to administrator command")));
-- 
2.39.3

From 68b3bbf30e33b9f79e605899b6540e619329a119 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Mon, 29 Jan 2024 16:24:41 +0900
Subject: [PATCH v4 5/5] Use die() instead of WalRcvShutdownSignalHandler()

---
 src/backend/replication/walreceiver.c | 21 ++-------------------
 1 file changed, 2 insertions(+), 19 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index abaf21a20c..1e03d55062 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -73,6 +73,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -147,25 +148,7 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
-static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);

-void
-WalRcvShutdownSignalHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	/* Don't joggle the elbow of proc_exit */
-	if (!proc_exit_inprogress)
-	{
-		InterruptPending = true;
-		ProcDiePending = true;
-	}
-
-	SetLatch(MyLatch);
-
-	errno = save_errno;
-
-}

 /* Main entry point for walreceiver process */
 void
@@ -263,7 +246,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
+	pqsignal(SIGTERM, die);		/* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
-- 
2.39.3