Hi, I just re-discovered this issue, via https://www.postgresql.org/message-id/20221209003607.bz2zdznvfnkq4zz3%40awork3.anarazel.de
On 2022-09-25 16:22:37 -0700, Andres Freund wrote: > Maybe I am missing something, but I don't think it's OK for > connect_pg_server() to connect in a blocking manner, without accepting > interrupts? It's definitely not. This basically means network issues or such can lead to connections being unkillable... We know how to do better, c.f. libpqrcv_connect(). I hacked that up for postgres_fdw, and got through quite a few runs without related issues ([1]). The same problem is present in two places in dblink.c. Obviously we can copy and paste the code to dblink.c as well. But that means we have the same not quite trivial code in three different c files. There's already a fair bit of duplicated code around AcquireExternalFD(). It seems we should find a place to put backend specific libpq wrapper code somewhere. Unless we want to relax the rule about not linking libpq into the backend we can't just put it in the backend directly, though. The only alternative way to provide a wrapper that I can think of are to a) introduce a new static library that can be linked to by libpqwalreceiver, postgres_fdw, dblink b) add a header with static inline functions implementing interrupt-processing connection establishment for libpq Neither really has precedent. The attached quick patch just adds and uses libpq_connect_interruptible() in postgres_fdw. If we wanted to move this somewhere generic, at least part of the external FD handling should also be moved into it. dblink.c uses a lot of other blocking libpq functions, which obviously also isn't ok. Perhaps we could somehow make this easier from within libpq? My first thought was a connection parameter that'd provide a different implementation of pqSocketCheck() or such - but I don't think that'd work, because we might throw errors when processing interrupts, and that'd not be ok from deep within libpq. Greetings, Andres Freund [1] I eventually encountered a deadlock in REINDEX, but it didn't involve postgres_fw / ProcSignalBarrier
diff --git i/contrib/postgres_fdw/connection.c w/contrib/postgres_fdw/connection.c index f0c45b00db8..f9da564a539 100644 --- i/contrib/postgres_fdw/connection.c +++ w/contrib/postgres_fdw/connection.c @@ -347,6 +347,67 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->conn, server->servername, user->umid, user->userid); } +/* + * PQconnectStartParams() wrapper that processes interrupts. Backend code + * should *never* enter blocking libpq code as that would prevent + * cancellations, global barriers etc from being processed. + */ +static PGconn * +libpq_connect_interruptible(const char *const *keywords, + const char *const *values, + int expand_dbname, + uint32 wait_event_info) +{ + PGconn *conn; + PostgresPollingStatusType status; + + conn = PQconnectStartParams(keywords, values, false); + + if (!conn) + return NULL; + + /* + * Poll connection until we have OK or FAILED status. + * + * Per spec for PQconnectPoll, first wait till socket is write-ready. + */ + status = PGRES_POLLING_WRITING; + while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED) + { + int io_flag; + int rc; + + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + /* Windows needs a different test while waiting for connection-made */ + else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; + + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, + PQsocket(conn), + 0, + wait_event_info); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* If socket is ready, advance the libpq state machine */ + if (rc & io_flag) + status = PQconnectPoll(conn); + } + + return conn; +} + /* * Connect to remote server using specified server and user mapping properties. */ @@ -471,7 +532,8 @@ connect_pg_server(ForeignServer *server, UserMapping *user) } /* OK to make connection */ - conn = PQconnectdbParams(keywords, values, false); + conn = libpq_connect_interruptible(keywords, values, + false, PG_WAIT_EXTENSION); if (!conn) ReleaseExternalFD(); /* because the PG_CATCH block won't */