Hi,

On 2022-12-30 14:14:55 +1300, Thomas Munro wrote:
> Oh, I was imagining something slightly different.  Not something under
> src/include/libpq, but conceptually a separate header-only library
> that is above both the backend and libpq.  Maybe something like
> src/include/febe_util/libpq_connect_interruptible.h.  In other words,
> I thought your idea b was a header-only version of your idea a.  I
> think that might be a bit nicer than putting it under libpq?
> Superficial difference, perhaps...

It doesn't seem entirely right to me to introduce a top-level "module" for
libpq-in-extensions - we don't do that for other APIs used by extensions. But
a header-only library also doesn't quite seem right. So ...

Looking at turning my patch upthread into something slightly less
prototype-y, I noticed that libpqwalreceiver doesn't do AcquireExternalFD(),
which was added to the other backend uses of libpq in 3d475515a15. It's
unlikely to matter for walreceiver.c itself, but it seems problematic for the
logical replication cases?

It's annoying to introduce PG_TRY/CATCH blocks just to deal with the
potential for errors inside WaitLatchOrSocket(), which should never happen. I
wonder if we should consider making latch.c error out fatally, instead of
elog(ERROR). If latches are broken, things are bad.

The PG_CATCH() logic in postgres_fdw's GetConnection() looks quite suspicious
to me. It looks like 32a9c0bdf493 took entirely the wrong path. Instead of
optionally not throwing or directly re-establishing connections in
begin_remote_xact(), the PG_CATCH() hammer was applied.

The attached patch adds libpq-be-fe-helpers.h and uses it in
libpqwalreceiver, dblink, postgres_fdw.

As I made libpq-be-fe-helpers.h handle reserving external fds,
libpqwalreceiver now does so. I briefly looked through its users without
seeing cases of leaking in case of errors - which would already have been
bad, since we'd already have leaked a libpq connection/socket.

Given the lack of field complaints and the size of the required changes, I
don't think we should backpatch this, even though it's pretty clearly buggy
as-is.

Some time back Thomas had a patch to introduce a wrapper around
libpq-in-extensions that fixed issues caused by some events being
edge-triggered on Windows. It's possible that combining these two efforts
would yield something better. I resisted the urge to create a wrapper around
each connection in this patch, as it'd have ended up being a whole lot more
invasive. But...

Thomas, do you have a reference to that code handy?

Greetings,

Andres Freund

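To spell out the calling convention the helpers are meant to provide, here is a standalone sketch of just the FD accounting. It is a simplified model, not the patch: FakeConn and the sketch_* functions are made-up stand-ins for PGconn, AcquireExternalFD()/ReleaseExternalFD() and libpqsrv_connect()/libpqsrv_disconnect(), and the limit of 2 is arbitrary. The real helper raises an ERROR when no FD can be reserved; the model just returns NULL.

```c
/*
 * Simplified model of the FD accounting in libpq-be-fe-helpers.h.
 * Everything here (FakeConn, sketch_*) is a hypothetical stand-in for
 * illustration; none of it is PostgreSQL or libpq API.
 */
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef struct FakeConn
{
	int			ok;				/* stands in for PQstatus() == CONNECTION_OK */
} FakeConn;

#define SKETCH_MAX_FDS 2		/* arbitrary limit, assumption of this sketch */
static int	reserved_fds = 0;	/* models fd.c's count of external FDs */

/* models AcquireExternalFD(): fails once the limit is reached */
static int
sketch_acquire_fd(void)
{
	if (reserved_fds >= SKETCH_MAX_FDS)
		return 0;
	reserved_fds++;
	return 1;
}

/* models ReleaseExternalFD() */
static void
sketch_release_fd(void)
{
	assert(reserved_fds > 0);
	reserved_fds--;
}

/*
 * models libpqsrv_connect(): reserve first, then connect. A NULL result
 * holds no reservation; a non-NULL result always holds exactly one, even if
 * the connection is bad, so the caller can still inspect it before
 * disconnecting.
 */
static FakeConn *
sketch_connect(int simulate_oom, int reachable)
{
	FakeConn   *conn;

	if (!sketch_acquire_fd())
		return NULL;			/* the real helper does ereport(ERROR) here */

	conn = simulate_oom ? NULL : malloc(sizeof(FakeConn));
	if (conn == NULL)
	{
		/* nobody will call disconnect with this conn, so release now */
		sketch_release_fd();
		return NULL;
	}
	conn->ok = reachable;
	return conn;
}

/* models libpqsrv_disconnect(): NULL is allowed and releases nothing */
static void
sketch_disconnect(FakeConn *conn)
{
	if (conn == NULL)
		return;
	sketch_release_fd();
	free(conn);
}

/* exercises the NULL, success, bad-connection and limit paths */
static int
sketch_demo(void)
{
	FakeConn   *none = sketch_connect(1, 1);	/* simulated allocation failure */
	FakeConn   *good = sketch_connect(0, 1);
	FakeConn   *bad = sketch_connect(0, 0);
	FakeConn   *over = sketch_connect(0, 1);	/* limit reached, NULL */

	sketch_disconnect(none);
	sketch_disconnect(good);
	sketch_disconnect(bad);
	sketch_disconnect(over);
	return reserved_fds;		/* number of reservations still held */
}
```

Calling sketch_demo() from a small main() is enough to try it. The point of the convention is that a caller's error path can call the disconnect function unconditionally, whether or not a connection object exists, without unbalancing the reservation count.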
From 34e5ea311322b3bc3bb0f8c925f9ade1a59c6f09 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Tue, 3 Jan 2023 11:31:04 -0800
Subject: [PATCH v2] wip: don't block inside PQconnectdb et al

---
 src/include/libpq/libpq-be-fe-helpers.h       | 233 ++++++++++++++++++
 .../libpqwalreceiver/libpqwalreceiver.c       |  53 +---
 contrib/dblink/dblink.c                       |  80 +-----
 contrib/postgres_fdw/connection.c             |  42 +---
 4 files changed, 256 insertions(+), 152 deletions(-)
 create mode 100644 src/include/libpq/libpq-be-fe-helpers.h

diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
new file mode 100644
index 00000000000..4f3d3b821f0
--- /dev/null
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -0,0 +1,233 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq-be-fe-helpers.h
+ *	  Helper functions for using libpq in extensions
+ *
+ * Code built directly into the backend is not allowed to link to libpq
+ * directly. Extension code is allowed to use libpq however. However, libpq
+ * used in extensions has to be careful not to block inside libpq, otherwise
+ * interrupts will not be processed, leading to issues like unresolvable
+ * deadlocks. Backend code also needs to take care to acquire/release an
+ * external fd for the connection, otherwise fd.c's accounting of fd's is
+ * broken.
+ *
+ * This file provides helper functions to make it easier to comply with these
+ * rules. It is a header only library as it needs to be linked into each
+ * extension using libpq, and it seems too small to be worth adding a
+ * dedicated static library for.
+ *
+ * TODO: For historical reasons the connections established here are not put
+ * into non-blocking mode. That can lead to blocking even when only the async
+ * libpq functions are used. This should be fixed.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/libpq-be-fe-helpers.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQ_BE_FE_HELPERS_H
+#define LIBPQ_BE_FE_HELPERS_H
+
+/*
+ * Despite the name, BUILDING_DLL is set only when building code directly part
+ * of the backend. Which also is where libpq isn't allowed to be
+ * used. Obviously this doesn't protect against libpq-fe.h getting included
+ * otherwise, but perhaps still protects against a few mistakes...
+ */
+#ifdef BUILDING_DLL
+#error "libpq may not be used in code directly built into the backend"
+#endif
+
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/latch.h"
+
+
+static inline void libpqsrv_connect_prepare(void);
+static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+
+/*
+ * PQfinish() wrapper that additionally releases the reserved file descriptor.
+ *
+ * It is allowed to call this with a NULL pgconn iff returned by
+ * libpqsrv_connect*.
+ */
+static inline void
+libpqsrv_disconnect(PGconn *conn)
+{
+	/*
+	 * If no connection was established, we haven't reserved an FD for it (or
+	 * already released it). This rule makes it easier to write PG_CATCH()
+	 * handlers for this facility's users.
+	 *
+	 * See also libpqsrv_connect_internal().
+	 */
+	if (conn == NULL)
+		return;
+
+	ReleaseExternalFD();
+	PQfinish(conn);
+}
+
+/*
+ * PQconnectdb() wrapper that reserves a file descriptor and processes
+ * interrupts.
+ */
+static inline PGconn *
+libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
+{
+	PGconn	   *conn = NULL;
+
+	libpqsrv_connect_prepare();
+
+	conn = PQconnectStart(conninfo);
+
+	libpqsrv_connect_internal(conn, wait_event_info);
+
+	return conn;
+}
+
+/*
+ * PQconnectdbParams() wrapper that reserves a file descriptor and processes
+ * interrupts.
+ */
+static inline PGconn *
+libpqsrv_connect_params(const char *const *keywords,
+						const char *const *values,
+						int expand_dbname,
+						uint32 wait_event_info)
+{
+	PGconn	   *conn = NULL;
+
+	libpqsrv_connect_prepare();
+
+	conn = PQconnectStartParams(keywords, values, expand_dbname);
+
+	libpqsrv_connect_internal(conn, wait_event_info);
+
+	return conn;
+}
+
+/*
+ * Helper function for all connection establishment functions.
+ */
+static inline void
+libpqsrv_connect_prepare(void)
+{
+	/*
+	 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
+	 * a PGconn represents one long-lived FD. (Doing this here also ensures
+	 * that VFDs are closed if needed to make room.)
+	 */
+	if (!AcquireExternalFD())
+	{
+#ifndef WIN32					/* can't write #if within ereport() macro */
+		ereport(ERROR,
+				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+				 errmsg("could not establish connection"),
+				 errdetail("There are too many open files on the local server."),
+				 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
+#else
+		ereport(ERROR,
+				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+				 errmsg("could not establish connection"),
+				 errdetail("There are too many open files on the local server."),
+				 errhint("Raise the server's max_files_per_process setting.")));
+#endif
+	}
+
+}
+
+/*
+ * Helper function for all connection establishment functions.
+ */
+static inline void
+libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
+{
+	/*
+	 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
+	 * that here.
+	 */
+	if (conn == NULL)
+	{
+		ReleaseExternalFD();
+		return;
+	}
+
+	/*
+	 * Can't wait without a socket. Note that we don't want to close the libpq
+	 * connection yet, so callers can emit a useful error.
+	 */
+	if (PQstatus(conn) == CONNECTION_BAD)
+		return;
+
+	/*
+	 * WaitLatchOrSocket() can conceivably fail, handle this case here instead
+	 * of requiring all callers to do so.
+	 */
+	PG_TRY();
+	{
+		PostgresPollingStatusType status;
+
+		/*
+		 * Poll connection until we have OK or FAILED status.
+		 *
+		 * Per spec for PQconnectPoll, first wait till socket is write-ready.
+		 */
+		status = PGRES_POLLING_WRITING;
+		while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
+		{
+			int			io_flag;
+			int			rc;
+
+			if (status == PGRES_POLLING_READING)
+				io_flag = WL_SOCKET_READABLE;
+#ifdef WIN32
+
+			/*
+			 * Windows needs a different test while waiting for
+			 * connection-made
+			 */
+			else if (PQstatus(conn) == CONNECTION_STARTED)
+				io_flag = WL_SOCKET_CONNECTED;
+#endif
+			else
+				io_flag = WL_SOCKET_WRITEABLE;
+
+			rc = WaitLatchOrSocket(MyLatch,
+								   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+								   PQsocket(conn),
+								   0,
+								   wait_event_info);
+
+			/* Interrupted? */
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			/* If socket is ready, advance the libpq state machine */
+			if (rc & io_flag)
+				status = PQconnectPoll(conn);
+		}
+	}
+	PG_CATCH();
+	{
+		/*
+		 * If an error is thrown here, the callers won't call
+		 * libpqsrv_disconnect() with a conn, so release resources
+		 * immediately.
+		 */
+		ReleaseExternalFD();
+		PQfinish(conn);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+#endif							/* LIBPQ_BE_FE_HELPERS_H */
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 219cd73b7fc..bab2fe29562 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				 char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -172,52 +172,9 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	Assert(i < sizeof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-	{
-		*err = pchomp(PQerrorMessage(conn->streamConn));
-		return NULL;
-	}
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn = libpqsrv_connect_params(keys, vals,
+											   /* expand_dbname = */ true,
+											   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 	{
@@ -740,7 +697,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 8dd122042b4..130320db571 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -48,6 +48,7 @@
 #include "funcapi.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "parser/scansup.h"
@@ -59,6 +60,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/varlena.h"
+#include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
@@ -199,37 +201,14 @@ dblink_get_conn(char *conname_or_str,
 			connstr = conname_or_str;
 		dblink_connstr_check(connstr);
 
-		/*
-		 * We must obey fd.c's limit on non-virtual file descriptors. Assume
-		 * that a PGconn represents one long-lived FD. (Doing this here also
-		 * ensures that VFDs are closed if needed to make room.)
-		 */
-		if (!AcquireExternalFD())
-		{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not establish connection"),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not establish connection"),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-		}
-
 		/* OK to make connection */
-		conn = PQconnectdb(connstr);
+		conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 
 		if (PQstatus(conn) == CONNECTION_BAD)
 		{
 			char	   *msg = pchomp(PQerrorMessage(conn));
 
-			PQfinish(conn);
-			ReleaseExternalFD();
+			libpqsrv_disconnect(conn);
 			ereport(ERROR,
 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
 					 errmsg("could not establish connection"),
@@ -312,36 +291,13 @@ dblink_connect(PG_FUNCTION_ARGS)
 	/* check password in connection string if not superuser */
 	dblink_connstr_check(connstr);
 
-	/*
-	 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
-	 * a PGconn represents one long-lived FD. (Doing this here also ensures
-	 * that VFDs are closed if needed to make room.)
-	 */
-	if (!AcquireExternalFD())
-	{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-		ereport(ERROR,
-				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				 errmsg("could not establish connection"),
-				 errdetail("There are too many open files on the local server."),
-				 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-		ereport(ERROR,
-				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				 errmsg("could not establish connection"),
-				 errdetail("There are too many open files on the local server."),
-				 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-	}
-
 	/* OK to make connection */
-	conn = PQconnectdb(connstr);
+	conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 
 	if (PQstatus(conn) == CONNECTION_BAD)
 	{
 		msg = pchomp(PQerrorMessage(conn));
-		PQfinish(conn);
-		ReleaseExternalFD();
+		libpqsrv_disconnect(conn);
 		if (rconn)
 			pfree(rconn);
 
@@ -366,10 +322,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	else
 	{
 		if (pconn->conn)
-		{
-			PQfinish(pconn->conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(pconn->conn);
 		pconn->conn = conn;
 	}
 
@@ -402,8 +355,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 	if (!conn)
 		dblink_conn_not_avail(conname);
 
-	PQfinish(conn);
-	ReleaseExternalFD();
+	libpqsrv_disconnect(conn);
 	if (rconn)
 	{
 		deleteConnection(conname);
@@ -838,10 +790,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 	}
 	PG_END_TRY();
 
@@ -1516,10 +1465,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 	}
 	PG_END_TRY();
 
@@ -2606,8 +2552,7 @@ createNewConnection(const char *name, remoteConn *rconn)
 
 	if (found)
 	{
-		PQfinish(rconn->conn);
-		ReleaseExternalFD();
+		libpqsrv_disconnect(rconn->conn);
 		pfree(rconn);
 
 		ereport(ERROR,
@@ -2647,8 +2592,7 @@ dblink_security_check(PGconn *conn, remoteConn *rconn)
 	{
 		if (!PQconnectionUsedPassword(conn))
 		{
-			PQfinish(conn);
-			ReleaseExternalFD();
+			libpqsrv_disconnect(conn);
 			if (rconn)
 				pfree(rconn);
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ed75ce3f79c..7760380f00d 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "funcapi.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -446,35 +447,10 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify the set of connection parameters */
 		check_conn_params(keywords, values, user);
 
-		/*
-		 * We must obey fd.c's limit on non-virtual file descriptors. Assume
-		 * that a PGconn represents one long-lived FD. (Doing this here also
-		 * ensures that VFDs are closed if needed to make room.)
-		 */
-		if (!AcquireExternalFD())
-		{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not connect to server \"%s\"",
-							server->servername),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not connect to server \"%s\"",
-							server->servername),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-		}
-
 		/* OK to make connection */
-		conn = PQconnectdbParams(keywords, values, false);
-
-		if (!conn)
-			ReleaseExternalFD();	/* because the PG_CATCH block won't */
+		conn = libpqsrv_connect_params(keywords, values,
+									   /* expand_dbname = */ false,
+									   PG_WAIT_EXTENSION);
 
 		if (!conn || PQstatus(conn) != CONNECTION_OK)
 			ereport(ERROR,
@@ -507,12 +483,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	}
 	PG_CATCH();
 	{
-		/* Release PGconn data structure if we managed to create one */
-		if (conn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+		libpqsrv_disconnect(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -528,9 +499,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
 {
 	if (entry->conn != NULL)
 	{
-		PQfinish(entry->conn);
+		libpqsrv_disconnect(entry->conn);
 		entry->conn = NULL;
-		ReleaseExternalFD();
 	}
 }
 
-- 
2.38.0