Hi, http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php On win32, the blocking libpq functions such as PQconnectdb() and PQexec() are uninterruptible, because they use the vanilla select() instead of the select() that is compatible with our signal emulation layer. Nevertheless, walreceiver currently uses them to establish a connection, send a handshake message, and wait for the reply, so walreceiver also becomes uninterruptible for a while. This is a must-fix problem for 9.0.
I replaced the blocking libpq functions currently used with their asynchronous counterparts, and used the emulated version of select() to wait for the socket, which makes walreceiver interruptible. Here is the patch. Regards, -- Fujii Masao NIPPON TELEGRAPH AND TELEPHONE CORPORATION NTT Open Source Software Center
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c --- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c *************** *** 18,23 **** --- 18,24 ---- #include <unistd.h> #include <sys/time.h> + #include <time.h> #include "libpq-fe.h" #include "access/xlog.h" *************** *** 53,59 **** static bool libpqrcv_receive(int timeout, unsigned char *type, static void libpqrcv_disconnect(void); /* Prototypes for private functions */ ! static bool libpq_select(int timeout_ms); /* * Module load callback --- 54,61 ---- static void libpqrcv_disconnect(void); /* Prototypes for private functions */ ! static int libpq_select(bool forRead, bool forWrite, int timeout_ms); ! static PGresult *libpqrcv_PQexec(const char *query); /* * Module load callback *************** *** 83,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) TimeLineID standby_tli; PGresult *res; char cmd[64]; /* Connect */ snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo); ! streamConn = PQconnectdb(conninfo_repl); ! if (PQstatus(streamConn) != CONNECTION_OK) ereport(ERROR, (errmsg("could not connect to the primary server : %s", PQerrorMessage(streamConn)))); /* * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ ! 
res = PQexec(streamConn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); --- 85,198 ---- TimeLineID standby_tli; PGresult *res; char cmd[64]; + PQconninfoOption *options; + time_t finish_time = ((time_t) -1); + + /* + * Extract timeout from the connection string + */ + options = PQconninfoParse(conninfo, NULL); + if (options) + { + PQconninfoOption *option; + for (option = options; option->keyword != NULL; option++) + { + if (strcmp(option->keyword, "connect_timeout") == 0) + { + if (option->val != NULL && option->val[0] != '\0') + { + int timeout = atoi(option->val); + + if (timeout > 0) + { + /* + * Rounding could cause connection to fail; + * need at least 2 secs + */ + if (timeout < 2) + timeout = 2; + /* calculate the finish time based on start + timeout */ + finish_time = time(NULL) + timeout; + } + } + } + } + PQconninfoFree(options); + } /* Connect */ snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo); ! streamConn = PQconnectStart(conninfo_repl); ! 
if (PQstatus(streamConn) == CONNECTION_BAD) ereport(ERROR, (errmsg("could not connect to the primary server : %s", PQerrorMessage(streamConn)))); /* + * Wait for connection to be established + */ + for (;;) + { + PostgresPollingStatusType status; + bool established = false; + bool forRead = false; + bool forWrite = false; + int timeout_ms; + int ret; + + status = PQconnectPoll(streamConn); + switch (status) + { + case PGRES_POLLING_READING: + forRead = true; + break; + case PGRES_POLLING_WRITING: + forWrite = true; + break; + case PGRES_POLLING_OK: + established = true; + break; + case PGRES_POLLING_FAILED: + default: + ereport(ERROR, + (errmsg("could not connect to the primary server : %s", + PQerrorMessage(streamConn)))); + } + + if (established) + break; + + retry: + /* Compute appropriate timeout interval */ + if (finish_time == ((time_t) -1)) + timeout_ms = -1; + else + { + time_t now = time(NULL); + + if (finish_time > now) + timeout_ms = (finish_time - now) * 1000; + else + timeout_ms = 0; + } + + /* + * Wait until we can read or write the connection socket + */ + ret = libpq_select(forRead, forWrite, timeout_ms); + if (ret == 0) /* timeout */ + ereport(ERROR, + (errmsg("could not connect to the primary server : timeout expired"))); + if (ret < 0) /* interrupted */ + goto retry; + } + + /* * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ ! res = libpqrcv_PQexec("IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); *************** *** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) /* Start streaming from the point requested by startup process */ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); ! 
res = PQexec(streamConn, cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) ereport(ERROR, (errmsg("could not start XLOG streaming: %s", PQerrorMessage(streamConn)))); PQclear(res); justconnected = true; --- 244,257 ---- /* Start streaming from the point requested by startup process */ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); ! res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) + { + PQclear(res); ereport(ERROR, (errmsg("could not start XLOG streaming: %s", PQerrorMessage(streamConn)))); + } PQclear(res); justconnected = true; *************** *** 162,176 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) } /* ! * Wait until we can read WAL stream, or timeout. * ! * Returns true if data has become available for reading, false if timed out ! * or interrupted by signal. * * This is based on pqSocketCheck. */ ! static bool ! libpq_select(int timeout_ms) { int ret; --- 260,275 ---- } /* ! * Wait until we can read or write the connection socket * ! * Returns >0 if data has been ready to be read, written, ! * or both, 0 if timed out, -1 if interrupted by signal. ! * Throws an error if an error occurred. * * This is based on pqSocketCheck. */ ! static int ! libpq_select(bool forRead, bool forWrite, int timeout_ms) { int ret; *************** *** 180,203 **** libpq_select(int timeout_ms) (errcode_for_socket_access(), errmsg("socket not open"))); /* We use poll(2) if available, otherwise select(2) */ { #ifdef HAVE_POLL struct pollfd input_fd; input_fd.fd = PQsocket(streamConn); ! input_fd.events = POLLIN | POLLERR; input_fd.revents = 0; ret = poll(&input_fd, 1, timeout_ms); #else /* !HAVE_POLL */ fd_set input_mask; struct timeval timeout; struct timeval *ptr_timeout; FD_ZERO(&input_mask); ! 
FD_SET (PQsocket(streamConn), &input_mask); if (timeout_ms < 0) ptr_timeout = NULL; --- 279,314 ---- (errcode_for_socket_access(), errmsg("socket not open"))); + Assert(forRead || forWrite); + /* We use poll(2) if available, otherwise select(2) */ { #ifdef HAVE_POLL struct pollfd input_fd; input_fd.fd = PQsocket(streamConn); ! input_fd.events = POLLERR; input_fd.revents = 0; + if (forRead) + input_fd.events |= POLLIN; + if (forWrite) + input_fd.events |= POLLOUT; + ret = poll(&input_fd, 1, timeout_ms); #else /* !HAVE_POLL */ fd_set input_mask; + fd_set output_mask; struct timeval timeout; struct timeval *ptr_timeout; FD_ZERO(&input_mask); ! FD_ZERO(&output_mask); ! if (forRead) ! FD_SET (PQsocket(streamConn), &input_mask); ! if (forWrite) ! FD_SET (PQsocket(streamConn), &output_mask); if (timeout_ms < 0) ptr_timeout = NULL; *************** *** 209,225 **** libpq_select(int timeout_ms) } ret = select(PQsocket(streamConn) + 1, &input_mask, ! NULL, NULL, ptr_timeout); #endif /* HAVE_POLL */ } ! if (ret == 0 || (ret < 0 && errno == EINTR)) ! return false; if (ret < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("select() failed: %m"))); ! return true; } /* --- 320,397 ---- } ret = select(PQsocket(streamConn) + 1, &input_mask, ! &output_mask, NULL, ptr_timeout); #endif /* HAVE_POLL */ } ! if (ret == 0) /* timeout */ ! return 0; ! if (ret < 0 && errno == EINTR) /* interrupted */ ! return -1; if (ret < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("select() failed: %m"))); ! return ret; ! } ! ! /* ! * Send a query and wait for the result by using the asynchronous libpq ! * functions and the backend version of select(). ! * ! * On Windows, walreceiver must use this function instead of the blocking ! * libpq functions like PQexec() since they use the vanilla select() and ! * are uninterruptible by our emulated signals. On the other hand, this ! * function is interruptible since it uses the signal emulation layer ! * compatible select(). ! */ ! 
static PGresult * ! libpqrcv_PQexec(const char *query) ! { ! PGresult *res = NULL; ! ! /* ! * Submit a query. Since we don't use non-blocking mode, this also ! * can block. But its risk is relatively small, so we ignore that ! * for now. ! */ ! if (!PQsendQuery(streamConn, query)) ! return NULL; ! ! for (;;) ! { ! PGresult *next; ! ! /* ! * Receive data until PQgetResult has been ready to get the ! * result without blocking. ! */ ! while (PQisBusy(streamConn)) ! { ! if (libpq_select(true, false, -1) < 0) ! continue; /* interrupted */ ! if (PQconsumeInput(streamConn) == 0) ! return NULL; /* trouble */ ! } ! ! /* ! * Don't emulate the PQexec()'s behavior of returning the last ! * result, if there's many, since walreceiver never sends a query ! * returning multiple results. ! */ ! if ((next = PQgetResult(streamConn)) == NULL) ! break; /* query is complete */ ! if (PQresultStatus(next) == PGRES_FATAL_ERROR) ! return next; ! PQclear(res); ! res = next; ! if (PQresultStatus(res) == PGRES_COPY_IN || ! PQresultStatus(res) == PGRES_COPY_OUT || ! PQstatus(streamConn) == CONNECTION_BAD) ! break; ! } ! ! return res; } /* *************** *** 268,274 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) */ if (timeout > 0 && !justconnected) { ! if (!libpq_select(timeout)) return false; if (PQconsumeInput(streamConn) == 0) --- 440,446 ---- */ if (timeout > 0 && !justconnected) { ! if (libpq_select(true, false, timeout) <= 0) return false; if (PQconsumeInput(streamConn) == 0)
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers