On Thu, 29 Sep 2016 23:45:52 +0530 Mithun Cy <mithun...@enterprisedb.com> wrote:
> This patch do not apply on latest code. it fails as follows

It's strange. I have no problems applying it to commit fd321a1dfd64d30.

OK, the patch really does contain some trailing whitespace and mixed tabs and spaces, which git apply doesn't like. I'm attaching a version here with these issues resolved.

> I am slightly confused. As per this target_server_type=master means
> user is expecting failover. What if user pass target_server_type=any
> and request sequential connection isn't this also a case for
> failover?. I think by default it should be "any" for any number of
> hosts. And parameter "sequential and random will decide whether we
> want just failover or with load-balance.

I don't agree with this. The first versions of the patch refused to connect to a read-only server even if it was the only one, because I think a read-write connection is what the user typically expects. When the user connects to a cluster (specifying many hosts in the connect string), it can be assumed by default that he wants the master. But backward compatibility is more important, so I now assume that if the user connects to just one node and this node is read-only, the user knows what he is doing.
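To make the defaulting rule described above concrete, here is a minimal standalone sketch. It is not code from the patch: the helper names count_hosts() and effective_target_server_type() are made up for illustration, and the host counting is simplified (it only counts commas and does not treat trailing commas or empty entries specially).

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: count the entries in a comma-separated host list
 * such as "node1,node2:5433,node3". Simplified: every comma is assumed
 * to separate two entries.
 */
static int
count_hosts(const char *hostlist)
{
	int			count;
	const char *p;

	if (hostlist == NULL || *hostlist == '\0')
		return 0;
	count = 1;
	for (p = hostlist; *p != '\0'; p++)
	{
		if (*p == ',')
			count++;
	}
	return count;
}

/*
 * Hypothetical helper: pick the effective target_server_type.
 * An explicit value always wins. Otherwise several hosts imply "master"
 * (failover is assumed), and a single host implies "any"
 * (backward-compatible behaviour).
 */
static const char *
effective_target_server_type(const char *hostlist, const char *explicit_type)
{
	if (explicit_type != NULL)
		return explicit_type;
	return (count_hosts(hostlist) > 1) ? "master" : "any";
}
```

Under this rule, host=node1,node2 without target_server_type is treated as a request for the master, host=localhost alone keeps the old behaviour, and an explicit target_server_type=any still allows load-balancing read-only queries across several hosts.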
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 4e34f00..f0df877 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -792,7 +792,7 @@ host=localhost port=5432 dbname=mydb connect_timeout=10 <para> The general form for a connection <acronym>URI</acronym> is: <synopsis> -postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] +postgresql://[user[:password]@][netloc][:port][,netloc[:port]...][/dbname][?param1=value1&...] </synopsis> </para> @@ -809,6 +809,7 @@ postgresql://localhost/mydb postgresql://user@localhost postgresql://user:secret@localhost postgresql://other@localhost/otherdb?connect_timeout=10&application_name=myapp +postgresql://node1,node2:5433,node3:4432,node4/mydb?hostorder=random&target_server_type=any </programlisting> Components of the hierarchical part of the <acronym>URI</acronym> can also be given as parameters. For example: @@ -831,7 +832,9 @@ postgresql:///mydb?host=localhost&port=5433 <para> For improved compatibility with JDBC connection <acronym>URI</acronym>s, instances of parameter <literal>ssl=true</literal> are translated into - <literal>sslmode=require</literal>. + <literal>sslmode=require</literal> and + <literal>loadBalanceHosts=true</literal> into + <literal>hostorder=random</literal>. </para> <para> @@ -841,6 +844,10 @@ postgresql:///mydb?host=localhost&port=5433 postgresql://[2001:db8::1234]/database </synopsis> </para> + <para> + There can be serveral host specifications, optionally accompanied + with port, separated by comma. + </para> <para> The host component is interpreted as described for the parameter <xref @@ -881,6 +888,20 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname when <productname>PostgreSQL</> was built). On machines without Unix-domain sockets, the default is to connect to <literal>localhost</>. </para> + <para> + There can be more than one <literal>host</literal> parameter in + the connect string. 
In this case these hosts would be considered + alternate entries into same database and if connect to first one + fails, library would try to connect second etc. This can be used + for high availability cluster or for load balancing. See + <xref linkend="libpq-connect-hostorder"> parameter. + </para> + <para> + Network host name can be accompanied with port number, separated by + colon. If so, this port number is used only when connected to + this host. If there is no port number, port specified in the + <xref linkend="libpq-connect-port"> parameter would be used. + </para> </listitem> </varlistentry> @@ -942,8 +963,44 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname </para> </listitem> </varlistentry> - + + <varlistentry id="libpq-connect-hostorder" xreflabel="hostorder"> + <term><literal>hostorder</literal></term> + <listitem> + <para> + Specifies how to choose host from list of alternate hosts, + specified in the <xref linkend="libpq-connect-host"> parameter. + </para> + <para> + If value of this argument is <literal>sequential</literal> (the + default) library connects to the hosts in order they specified, + and tries to connect second one only if connection to the first + fails. + </para> + <para> + If value is <literal>random</literal> host to connect is randomly + picked from the list. It allows to balance load between several + cluster nodes. However, currently PostgreSQL doesn't support + multimaster clusters. So, without use of third-party products, + only read-only connections can take advantage from the + load-balancing. See <xref linkend="libpq-connect-target-server-type"> + </para> + </listitem> + </varlistentry> + <varlistentry id="libpq-connect-target-server-type" xreflabel="target_server_type"> + <term><literal>target_server_type</literal></term> + <listitem> + <para> + If this parameter is <literal>master</literal> upon successful connection + library checks if host is in recovery state, and if it is so, + tries next host in the connect string. 
If this parameter is + <literaL>any</literal>, connection to standby nodes are considered + successful. + </para> + </listitem> + </varlistentry> <varlistentry id="libpq-connect-port" xreflabel="port"> + <term><literal>port</literal></term> <listitem> <para> @@ -985,7 +1042,6 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname </para> </listitem> </varlistentry> - <varlistentry id="libpq-connect-connect-timeout" xreflabel="connect_timeout"> <term><literal>connect_timeout</literal></term> <listitem> @@ -996,7 +1052,27 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname </para> </listitem> </varlistentry> - + <varlistentry id="libpq-connect-falover-timeout" xreflabel="failover_timeout"> + <term><literal>failover_timeout</literal></term> + <listitem> + <para> + Maximum time to cyclically retry all the hosts in connect string. + (as decimal integer number of seconds). If not specified, then + hosts are tried just once. + </para> + <para> + If we have replicating cluster, and master node fails, it might + take some time to promote one of standby nodes to the new master. + So clients which notice that connect to the master fails, can + already give up attempt to reestablish a connection when new master + became available. + </para> + <para> + Setting this parameter to reasonable time makes library try to + reconnect all the host in cyclically until new master appears. + </para> + </listitem> + </varlistentry> <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding"> <term><literal>client_encoding</literal></term> <listitem> @@ -7227,6 +7303,18 @@ user=admin An example file is provided at <filename>share/pg_service.conf.sample</filename>. </para> + <para> + If more than one <literal>host</literal> option present in the section of service file, it + is interpeted as alternate servers for failover or load-balancing. See + <xref linkend="libpq-connect-host"> option in the connect string. 
+ </para> + <para> + For all other options first value takes precedence over later ones. + </para> + <para> + Options, specified in the connect string along with service option + have precedence over values from the service file. + </para> </sect1> diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index b1789eb..815a827 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -36,7 +36,7 @@ OBJS= fe-auth.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \ libpq-events.o # libpgport C files we always use OBJS += chklocale.o inet_net_ntop.o noblock.o pgstrcasecmp.o pqsignal.o \ - thread.o + thread.o pgsleep.o # libpgport C files that are needed if identified by configure OBJS += $(filter crypt.o getaddrinfo.o getpeereid.o inet_aton.o open.o system.o snprintf.o strerror.o strlcpy.o win32error.o win32setlocale.o, $(LIBOBJS)) # src/backend/utils/mb @@ -129,6 +129,9 @@ install: all installdirs install-lib installcheck: $(MAKE) -C test $@ +check: + $(prove_check) + installdirs: installdirs-lib $(MKDIR_P) '$(DESTDIR)$(includedir)' '$(DESTDIR)$(includedir_internal)' '$(DESTDIR)$(datadir)' @@ -142,6 +145,7 @@ uninstall: uninstall-lib clean distclean: clean-lib $(MAKE) -C test $@ rm -f $(OBJS) pthread.h libpq.rc + rm -rf tmp_check # Might be left over from a Win32 client-only build rm -f pg_config_paths.h rm -f inet_net_ntop.c noblock.c pgstrcasecmp.c pqsignal.c thread.c diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index f3a9e5a..d9c18b7 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -299,7 +299,16 @@ static const internalPQconninfoOption PQconninfoOptions[] = { {"replication", NULL, NULL, NULL, "Replication", "D", 5, offsetof(struct pg_conn, replication)}, - + /* Parameters added by failover patch */ + {"hostorder", NULL, "sequential", NULL, + "Host order", "", 10, + offsetof(struct pg_conn, hostorder)}, + {"target_server_type", NULL, NULL, 
NULL, + "Target server type", "", 6, + offsetof(struct pg_conn, target_server_type)}, + {"failover_timeout", NULL, NULL, NULL, + "Failover Timeout", "", 10, + offsetof(struct pg_conn, failover_timeout)}, /* Terminating entry --- MUST BE LAST */ {NULL, NULL, NULL, NULL, NULL, NULL, 0} @@ -336,6 +345,7 @@ static PGconn *makeEmptyPGconn(void); static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions); static void freePGconn(PGconn *conn); static void closePGconn(PGconn *conn); +static void pqTerminateConn(PGconn *conn); static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage); static PQconninfoOption *parse_connection_string(const char *conninfo, PQExpBuffer errorMessage, bool use_defaults); @@ -380,6 +390,7 @@ static bool getPgPassFilename(char *pgpassfile); static void dot_pg_pass_warning(PGconn *conn); static void default_threadlock(int acquire); +static int try_next_address(PGconn *conn); /* global variable because fe-auth.c needs to access it */ pgthreadlock_t pg_g_threadlock = default_threadlock; @@ -806,7 +817,9 @@ connectOptions2(PGconn *conn) { if (conn->pgpass) free(conn->pgpass); - conn->pgpass = PasswordFromFile(conn->pghost, conn->pgport, + conn->pgpass = PasswordFromFile( + conn->actualhost ? conn->actualhost : conn->pghost, + conn->actualport ? conn->actualport : conn->pgport, conn->dbName, conn->pguser); if (conn->pgpass == NULL) { @@ -888,7 +901,21 @@ connectOptions2(PGconn *conn) if (!conn->client_encoding_initial) goto oom_error; } - + /* + * Validate target_server_mode option + */ + if (conn->target_server_type) + { + if (strcmp(conn->target_server_type, "any") != 0 + && strcmp(conn->target_server_type, "master") != 0) + { + conn->status = CONNECTION_BAD; + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("invalid target_server_type value: \"%s\" should be \"any\" or \"master\"\n"), + conn->target_server_type); + return false; + } + } /* * Only if we get this far is it appropriate to try to connect. 
(We need a * state flag, rather than just the boolean result of this function, in @@ -1173,6 +1200,8 @@ connectFailureMessage(PGconn *conn, int errorno) if (conn->pghostaddr && conn->pghostaddr[0] != '\0') displayed_host = conn->pghostaddr; + else if (conn->actualhost && conn->actualhost[0] != '\0') + displayed_host = conn->actualhost; else if (conn->pghost && conn->pghost[0] != '\0') displayed_host = conn->pghost; else @@ -1390,11 +1419,17 @@ setKeepalivesWin32(PGconn *conn) static int connectDBStart(PGconn *conn) { + struct nodeinfo + { + char *host; + char *port; + }; int portnum; char portstr[MAXPGPATH]; struct addrinfo *addrs = NULL; struct addrinfo hint; - const char *node; + struct nodeinfo *nodes, + *node; int ret; if (!conn) @@ -1436,21 +1471,162 @@ connectDBStart(PGconn *conn) if (conn->pghostaddr != NULL && conn->pghostaddr[0] != '\0') { /* Using pghostaddr avoids a hostname lookup */ - node = conn->pghostaddr; + + nodes = calloc(sizeof(struct nodeinfo), 2); + if (nodes == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + nodes->host = strdup(conn->pghostaddr); + if (nodes->host == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } hint.ai_family = AF_UNSPEC; hint.ai_flags = AI_NUMERICHOST; } else if (conn->pghost != NULL && conn->pghost[0] != '\0') { /* Using pghost, so we have to look-up the hostname */ - node = conn->pghost; + char *p = conn->pghost, + *q, + *r; + int nodecount = 0, + nodesallocated = 4; + + /* + * Parse comma-separated list of host-port pairs into function-local + * array of records + */ + nodes = malloc(sizeof(struct nodeinfo) * 4); + if (nodes == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + while (*p) + { + q = p; + r = NULL; + /* Scan 
for the comma or end of string */ + while (*q != ',' && *q != 0) + { + if (*q == ':') + r = q; + if (*q == ']') + r = NULL; /* if there is IPv6, colons before close + * bracket are part of address */ + q++; + } + if (r) + { + + /* Host has explicitely specified port */ + char *nptr; + /* Check if port is numeric */ + for (nptr=r+1;nptr<q;nptr++) { + if (*nptr<'0' ||*nptr >'9') { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Port is not numeric")); + conn->options_valid = false; + goto connect_errReturn; + } + } + /* Allocate memory for port string */ + nodes[nodecount].port = malloc(q - r); + if (nodes[nodecount].port == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + strncpy(nodes[nodecount].port, r + 1, q - r); + nodes[nodecount].port[q - r - 1] = 0; + + } + else + { + r = q; + nodes[nodecount].port = NULL; + } + if ((*p) == '[' && *(r - 1) == ']') + { + /* IPv6 address found. 
Strip brackets */ + p++; + r--; + } + /* Fill node record */ + nodes[nodecount].host = malloc(r - p + 1); + if (nodes[nodecount].host == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + strncpy(nodes[nodecount].host, p, r - p); + nodes[nodecount].host[r - p] = 0; + /* skip a comma */ + if (*q) + q++; + nodecount++; + if (nodecount == nodesallocated) + nodes = realloc(nodes, sizeof(struct nodeinfo) * (nodesallocated += 4)); + if (nodes == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + p = q; + } + /* Fill end-of-host list marker */ + nodes[nodecount].host = NULL; + nodes[nodecount].port = NULL; hint.ai_family = AF_UNSPEC; + if (nodecount > 1 && conn->target_server_type == NULL) + { + /* + * if there is more than one host in the connect string and + * target_server_type is not specified explicitely, set + * target_server_type to "master", because default mode of + * operation is failover, and so, we need to connect to RW + * host, and keep other nodes of the cluster in the connect + * string just in case master would fail and one of standbys + * would be promoted to master. + * + * If we want to loadbalance readonly queries, set + * target_server_type = "any" explicitely + * + * But global default is "any" because if there is only one + * host in the connect string, we want backward-compatible + * behavoir. 
+ */ + conn->target_server_type = strdup("master"); + if (conn->target_server_type == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + + } } else { #ifdef HAVE_UNIX_SOCKETS /* pghostaddr and pghost are NULL, so use Unix domain socket */ - node = NULL; + nodes = calloc(sizeof(struct nodeinfo), 2); + if (nodes == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } hint.ai_family = AF_UNIX; UNIXSOCK_PATH(portstr, portnum, conn->pgunixsocket); if (strlen(portstr) >= UNIXSOCK_PATH_BUFLEN) @@ -1460,33 +1636,104 @@ connectDBStart(PGconn *conn) portstr, (int) (UNIXSOCK_PATH_BUFLEN - 1)); conn->options_valid = false; + free(nodes); + goto connect_errReturn; + } + nodes->port = strdup(portstr); + if (nodes->port == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; goto connect_errReturn; } #else /* Without Unix sockets, default to localhost instead */ - node = DefaultHost; + nodes = calloc(sizeof(struct nodeinfo), 2); + if (nodes == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } hint.ai_family = AF_UNSPEC; + nodes->host = strdup(DefaultHost); + if (nodes->host == NULL) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } #endif /* HAVE_UNIX_SOCKETS */ } /* Use pg_getaddrinfo_all() to resolve the address */ - ret = pg_getaddrinfo_all(node, portstr, &hint, &addrs); - if (ret || !addrs) + /* loop over all the host specs in the node variable */ + for (node = nodes; node->host != NULL || node->port != NULL; node++) { - if (node) - appendPQExpBuffer(&conn->errorMessage, - libpq_gettext("could not translate host name \"%s\" to address: 
%s\n"), - node, gai_strerror(ret)); + struct addrinfo *this_node_addrs; + + /* Resolve each hostname into list of addrinfo structures */ + ret = pg_getaddrinfo_all(node->host, (node->port ? node->port : portstr), + &hint, &this_node_addrs); + if (ret || !this_node_addrs) + { + if (node->host) + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("could not translate host name \"%s\" to address: %s\n"), + node->host, gai_strerror(ret)); + else + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("could not translate Unix-domain socket path \"%s\" to address: %s\n"), + node->port, gai_strerror(ret)); + if (this_node_addrs) + pg_freeaddrinfo_all(hint.ai_family, this_node_addrs); + + + /* + * We shouldn't fail here unless there is no valid addrinfos left + */ + continue; + } + if (node->host) + { + struct addrinfo *n; + + for (n = this_node_addrs; n != NULL; n = n->ai_next) + { + n->ai_canonname = strdup(node->host); + } + } + /* add this host addrs to addrs field of PGconn structure */ + if (!addrs) + { + addrs = this_node_addrs; + } else - appendPQExpBuffer(&conn->errorMessage, - libpq_gettext("could not translate Unix-domain socket path \"%s\" to address: %s\n"), - portstr, gai_strerror(ret)); - if (addrs) - pg_freeaddrinfo_all(hint.ai_family, addrs); + { + struct addrinfo *p; + + /* This loop finds pointer to the last element of the list */ + for (p = addrs; p->ai_next != NULL; p = p->ai_next) + { + } + p->ai_next = this_node_addrs; + } + } + /* Free nodes array */ + for (node = nodes; node->host != NULL || node->port != NULL; node++) + { + if (node->host) + free(node->host); + if (node->port) + free(node->port); + } + free(nodes); + /* Check if we've found at least one usable address */ + if (!addrs) + { conn->options_valid = false; goto connect_errReturn; } - #ifdef USE_SSL /* setup values based on SSL mode */ if (conn->sslmode[0] == 'd') /* "disable" */ @@ -1499,11 +1746,27 @@ connectDBStart(PGconn *conn) * Set up to try to connect, with protocol 3.0 as 
the first attempt. */ conn->addrlist = addrs; - conn->addr_cur = addrs; + + /* + * We cannot just assign first addrs record to addr_cur, because host + * order may be random. So, use try_next_address + */ + conn->addr_cur = NULL; + try_next_address(conn); conn->addrlist_family = hint.ai_family; conn->pversion = PG_PROTOCOL(3, 0); conn->send_appname = true; conn->status = CONNECTION_NEEDED; + if (conn->failover_timeout) + { + conn->failover_finish_time = time(NULL) + + atoi(conn->failover_timeout); + } + else + { + conn->failover_finish_time = (time_t) 0; /* it is in past, so its + * ok */ + } /* * The code for processing CONNECTION_NEEDED state is in PQconnectPoll(), @@ -1521,6 +1784,22 @@ connect_errReturn: return 0; } +/* + * This function is used to convert integer port number from the + * addrinfo structure back into string representation, because function + * PQport needs to return string representation of port. + */ + +static char * +get_port_from_addrinfo(struct addrinfo * ai) +{ + char port[6]; + + sprintf(port, "%d", htons(((struct sockaddr_in *) ai->ai_addr)->sin_port)); + /* allocation failure must be cheked by caller */ + return strdup(port); +} + /* * connectDBComplete @@ -1603,6 +1882,117 @@ connectDBComplete(PGconn *conn) } } +/* + * Gets address of pointer to the list of addrinfo sturctures. + * If order is random, rearranges the list by moving random element to + * the beginning (and putting its addres into given pointer. + * Returns address of first list element + */ +static struct addrinfo * +get_next_element(struct addrinfo ** list, char + *order) +{ + struct addrinfo *choice = NULL, + *prev, + *current, + *prechoice = NULL; + int count = 0; + + if (*list == NULL) + return NULL; + if (strcmp(order, "random") == 0) + { + /* Peek random element from the list. 
*/ + for (current = *list, prev = NULL; current != NULL; + prev = current, current = current->ai_next) + { + count++; + if ((rand() & 0xffff) < 0x10000 / count) + { + choice = current; + prechoice = prev; + } + } + + /* + * If prechoice is not NULL, selected element is not first in the + * list. We have to move it to he head + */ + if (prechoice != NULL) + { + prechoice->ai_next = choice->ai_next; + choice->ai_next = *list; + *list = choice; + } + } + /* We always return first element of the list */ + return *list; +} + +/* ------------- + * try_next_address + * Attempts to set next address from the list of known ones. + * Returns 1 if address is choosen and 0 if there are no more addresses + * to try + * Takes into account hostorder parameter + * ------------ + */ + +static int +try_next_address(PGconn *conn) +{ + if (strcmp(conn->hostorder, "random") == 0) + { + /* + * Initialize random number generator in case if nobody have done it + * before. Use value from rand along with time in case random number + * have been initialized by application. Use address of conn structure + * to load-balance different connections in the same app + */ + srand((unsigned int) ((long int) conn ^ (long int) time(NULL) ^ + (long int) rand())); + } + if (conn->addr_cur == NULL) + { + + conn->addr_cur = get_next_element(&(conn->addrlist), + conn->hostorder); + conn->actualhost = conn->addr_cur->ai_canonname; + + return 1; + } + else + { + conn->addr_cur = get_next_element(&(conn->addr_cur->ai_next), + conn->hostorder); + } + if (conn->addr_cur == NULL && time(NULL) < conn->failover_finish_time) + { + /* + * If failover timeout is set, retry list of hosts from the beginning + */ + pg_usleep(1000000); + conn->addr_cur = get_next_element(&(conn->addrlist), + conn->hostorder); + } + + if (conn->addr_cur != NULL) + { + /* + * Clean up error message buffer. 
+ */ + resetPQExpBuffer(&conn->errorMessage); + conn->actualhost = conn->addr_cur->ai_canonname; + + return 1; + } + else + { + return 0; + } + +} + /* ---------------- * PQconnectPoll * @@ -1680,7 +2070,8 @@ PQconnectPoll(PGconn *conn) case CONNECTION_SSL_STARTUP: case CONNECTION_NEEDED: break; - + case CONNECTION_CHECK_RW: + break; default: appendPQExpBufferStr(&conn->errorMessage, libpq_gettext( @@ -1718,9 +2109,8 @@ keep_going: /* We will come back to here until there is * ignore socket() failure if we have more addresses * to try */ - if (addr_cur->ai_next != NULL) + if (try_next_address(conn)) { - conn->addr_cur = addr_cur->ai_next; continue; } appendPQExpBuffer(&conn->errorMessage, @@ -1739,7 +2129,7 @@ keep_going: /* We will come back to here until there is if (!connectNoDelay(conn)) { pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } } @@ -1749,7 +2139,7 @@ keep_going: /* We will come back to here until there is libpq_gettext("could not set socket to nonblocking mode: %s\n"), SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } @@ -1760,7 +2150,7 @@ keep_going: /* We will come back to here until there is libpq_gettext("could not set socket to close-on-exec mode: %s\n"), SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } #endif /* F_SETFD */ @@ -1807,7 +2197,7 @@ keep_going: /* We will come back to here until there is if (err) { pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } } @@ -1883,6 +2273,10 @@ keep_going: /* We will come back to here until there is * go do the next stuff. 
*/ conn->status = CONNECTION_STARTED; + + /* + * Save the name of the current host + */ goto keep_going; } @@ -1898,7 +2292,7 @@ keep_going: /* We will come back to here until there is /* * Try the next address, if any. */ - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); } /* loop over addresses */ /* @@ -1944,9 +2338,8 @@ keep_going: /* We will come back to here until there is * If more addresses remain, keep trying, just as in the * case where connect() returned failure immediately. */ - if (conn->addr_cur->ai_next != NULL) + if (try_next_address(conn)) { - conn->addr_cur = conn->addr_cur->ai_next; conn->status = CONNECTION_NEEDED; goto keep_going; } @@ -2596,13 +2989,21 @@ keep_going: /* We will come back to here until there is conn->errorMessage.data[conn->errorMessage.len - 1] != '\n') appendPQExpBufferChar(&conn->errorMessage, '\n'); PQclear(res); + + /* + * If we have more than one host in the connect string, + * fatal message from one of them is not really fatal + */ + if (try_next_address(conn)) + { + /* Must drop the old connection */ + pqDropConnection(conn, true); + conn->status = CONNECTION_NEEDED; + goto keep_going; + } goto error_return; } - /* We can release the address list now. */ - pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); - conn->addrlist = NULL; - conn->addr_cur = NULL; /* Fire up post-connection housekeeping if needed */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -2614,8 +3015,9 @@ keep_going: /* We will come back to here until there is } /* Otherwise, we are open for business! */ - conn->status = CONNECTION_OK; - return PGRES_POLLING_OK; + conn->status = CONNECTION_CHECK_RO; + goto keep_going; + } case CONNECTION_SETENV: @@ -2645,9 +3047,133 @@ keep_going: /* We will come back to here until there is goto error_return; } - /* We are open for business! 
*/ + /* + * check if connection is readonly if we need readwrite one + */ + conn->status = CONNECTION_CHECK_RO; + goto keep_going; + + case CONNECTION_CHECK_RO: + + /* + * Connection to readonly host is allowed if + * taget_server_type is set to 'any' or is not exlicitely + */ + if (conn->pghost == NULL || !conn->target_server_type || + strcmp(conn->target_server_type, "any") == 0) + + { + /* + * We can release the address list now. but first make a copy + * of name of host we are connected to or it would be freed + * with list + */ + if (conn->actualhost) + { + conn->actualhost = strdup(conn->actualhost); + conn->actualport = get_port_from_addrinfo(conn->addr_cur); + if (!conn->actualhost || !conn->actualport) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory")); + goto error_return; + } + } + pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); + conn->addrlist = NULL; + conn->addr_cur = NULL; + + conn->status = CONNECTION_OK; + return PGRES_POLLING_OK; + } + /* Otherwise request result pg_is_in_recovery() */ + /* pretend that status is OK for time of sending query */ conn->status = CONNECTION_OK; - return PGRES_POLLING_OK; + PQsendQuery(conn, "SELECT pg_catalog.pg_is_in_recovery()"); + conn->status = CONNECTION_CHECK_RW; + return PGRES_POLLING_READING; + case CONNECTION_CHECK_RW: + { + char *value; + PGresult *res; + + conn->status = CONNECTION_OK; + if (!PQconsumeInput(conn)) + { + conn->status = CONNECTION_BAD; + return PGRES_POLLING_FAILED; + } + if (PQisBusy(conn)) + { + /* Result is not ready yet */ + conn->status = CONNECTION_CHECK_RW; + return PGRES_POLLING_READING; + } + res = PQgetResult(conn); + + /* + * Call PQgetResult second time to clear connection state. + * Should return NULL, so result is ignored + */ + PQgetResult(conn); + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK || + PQntuples(res) != 1) + { + /* + * Something wrong happened with this host. 
skip to next + * one + */ + conn->status = CONNECTION_NEEDED; + } + else + { + value = PQgetvalue(res, 0, 0); + if (value[0] == 't') + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot make RW connection to hot " + "standby node %s"), conn->actualhost); + conn->status = CONNECTION_NEEDED; + } + } + if (res) + PQclear(res); + if (conn->status != CONNECTION_OK) + { + ConnStatusType save_status = conn->status; + + conn->status = CONNECTION_OK; + pqTerminateConn(conn); + pqDropConnection(conn, true); + conn->sock = PGINVALID_SOCKET; + if (try_next_address(conn)) + { + conn->status = save_status; + goto keep_going; + } + else + { + conn->status = CONNECTION_BAD; + return PGRES_POLLING_FAILED; + } + + } + /* We can release the address list now. */ + if (conn->actualhost) + { + conn->actualhost = strdup(conn->actualhost); + conn->actualport = get_port_from_addrinfo(conn->addr_cur); + if (!conn->actualhost || !conn->actualport) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory")); + goto error_return; + } + + } + pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); + conn->addrlist = NULL; + conn->addr_cur = NULL; + return PGRES_POLLING_OK; + } default: appendPQExpBuffer(&conn->errorMessage, @@ -2938,19 +3464,17 @@ freePGconn(PGconn *conn) } /* - * closePGconn - * - properly close a connection to the backend + * pqTerminateConn + * + * - send terminate message to the backend, but do not free any + * transient state of PGconn object, which can be needed to reconnect * - * This should reset or release all transient state, but NOT the connection - * parameters. On exit, the PGconn should be in condition to start a fresh - * connection with the same parameters (see PQreset()). */ + + static void -closePGconn(PGconn *conn) +pqTerminateConn(PGconn *conn) { - PGnotify *notify; - pgParameterStatus *pstatus; - /* * Note that the protocol doesn't allow us to send Terminate messages * during the startup phase. 
@@ -2966,6 +3490,26 @@ closePGconn(PGconn *conn) (void) pqFlush(conn); } + +} + +/* + * closePGconn + * - properly close a connection to the backend + * + * This should reset or release all transient state, but NOT the connection + * parameters. On exit, the PGconn should be in condition to start a fresh + * connection with the same parameters (see PQreset()). + */ +static void +closePGconn(PGconn *conn) +{ + PGnotify *notify; + pgParameterStatus *pstatus; + + /* Send terminate request to backend */ + pqTerminateConn(conn); + /* * Must reset the blocking status so a possible reconnect will work. * @@ -2978,11 +3522,27 @@ closePGconn(PGconn *conn) * Close the connection, reset all transient state, flush I/O buffers. */ pqDropConnection(conn, true); + conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just * absent */ conn->asyncStatus = PGASYNC_IDLE; pqClearAsyncResult(conn); /* deallocate result */ resetPQExpBuffer(&conn->errorMessage); + + /* + * If addrlist is not freed, actualhost points in there. 
Otherwice it is + * allocated and should be freed + */ + if (conn->addrlist == NULL && conn->actualhost != NULL) + { + free(conn->actualhost); + conn->actualhost = NULL; + } + if (conn->actualport) + { + free(conn->actualport); + conn->actualport = NULL; + } pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); conn->addrlist = NULL; conn->addr_cur = NULL; @@ -3969,6 +4529,8 @@ parseServiceFile(const char *serviceFile, { int linenr = 0, i; + bool hostflag = false; /* true if we already have seen 'host' + * parameter in the service file, and */ FILE *f; char buf[MAXBUFSIZE], *line; @@ -4088,7 +4650,33 @@ parseServiceFile(const char *serviceFile, if (strcmp(options[i].keyword, key) == 0) { if (options[i].val == NULL) + { options[i].val = strdup(val); + + /* + * Set flag that we get value of host option from + * this service file, so subsequent host lines + * should be appended to it, not ignored + */ + if (strcmp(key, "host") == 0) + hostflag = true; + } + else if (strcmp(key, "host") == 0 && hostflag) + { + /* + * Old host value is from same service file, so + * append new one to it + */ + char *old = options[i].val; + int oldlen = strlen(old); + + options[i].val = malloc(oldlen + 1 + strlen(val) + 1); + strncpy(options[i].val, old, oldlen); + options[i].val[oldlen] = ','; + strcpy(options[i].val + oldlen + 1, val); + free(old); + } + if (!options[i].val) { printfPQExpBuffer(errorMessage, @@ -4808,87 +5396,126 @@ conninfo_uri_parse_options(PQconninfoOption *options, const char *uri, */ p = start; } - - /* - * "p" has been incremented past optional URI credential information at - * this point and now points at the "netloc" part of the URI. - * - * Look for IPv6 address. 
- */ - if (*p == '[') + host = p; + if (*p == ':') { - host = ++p; - while (*p && *p != ']') - ++p; - if (!*p) + int portnum; + char *portstr; + + *(p++) = '\0'; + portstr = p; + portnum = 0; + while (*p >= '0' && *p <= '9') { - printfPQExpBuffer(errorMessage, - libpq_gettext("end of string reached when looking for matching \"]\" in IPv6 host address in URI: \"%s\"\n"), - uri); - goto cleanup; + portnum = portnum * 10 + (*(p++) - '0'); } - if (p == host) + if (portnum > 65535 || portnum < 1) { printfPQExpBuffer(errorMessage, - libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"), - uri); + libpq_gettext("invalid port number: \"%d\"\n"), + portnum); goto cleanup; } - - /* Cut off the bracket and advance */ - *(p++) = '\0'; - - /* - * The address may be followed by a port specifier or a slash or a - * query. - */ - if (*p && *p != ':' && *p != '/' && *p != '?') + prevchar = *p; + *p = '\0'; + if (*portstr && + !conninfo_storeval(options, "port", portstr, + errorMessage, false, true)) { - printfPQExpBuffer(errorMessage, - libpq_gettext("unexpected character \"%c\" at position %d in URI (expected \":\" or \"/\"): \"%s\"\n"), - *p, (int) (p - buf + 1), uri); goto cleanup; } } else { - /* not an IPv6 address: DNS-named or IPv4 netloc */ - host = p; + do + { + if (*p == ',') + p++; - /* - * Look for port specifier (colon) or end of host specifier (slash), - * or query (question mark). - */ - while (*p && *p != ':' && *p != '/' && *p != '?') - ++p; - } + /* + * "p" has been incremented past optional URI credential + * information at this point and now points at the "netloc" part + * of the URI. + * + * Look for IPv6 address. 
+ */ + if (*p == '[') + { + char *ipv6start = ++p; - /* Save the hostname terminator before we null it */ - prevchar = *p; - *p = '\0'; + while (*p && *p != ']') + ++p; + if (!*p) + { + printfPQExpBuffer(errorMessage, + libpq_gettext("end of string reached when looking for matching \"]\" in IPv6 host address in URI: \"%s\"\n"), + uri); + goto cleanup; + } + if (p == ipv6start) + { + printfPQExpBuffer(errorMessage, + libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"), + uri); + goto cleanup; + } - if (*host && - !conninfo_storeval(options, "host", host, - errorMessage, false, true)) - goto cleanup; + p++; + /* + * The address may be followed by a port specifier, a comma or + * a slash or a query. + */ + if (*p && *p != ',' && *p != ':' && *p != '/' && *p != '?') + { + printfPQExpBuffer(errorMessage, + libpq_gettext("unexpected character \"%c\" at position %d in URI (expected \":\" or \"/\"): \"%s\"\n"), + *p, (int) (p - buf + 1), uri); + goto cleanup; + } - if (prevchar == ':') - { - const char *port = ++p; /* advance past host terminator */ + } + else + { + /* not an IPv6 address: DNS-named or IPv4 netloc */ - while (*p && *p != '/' && *p != '?') - ++p; + /* + * Look for port specifier (colon) or end of host specifier + * (slash), or query (question mark). 
+ */ + while (*p && *p != ',' && *p != ':' && *p != '/' && *p != '?') + ++p; + } + /* Skip port specifier */ + if (*p == ':') + { + int portnum; + p++; + portnum = 0; + while (*p >= '0' && *p <= '9') + { + portnum = portnum * 10 + (*(p++) - '0'); + } + if (portnum > 65535 || portnum < 1) + { + printfPQExpBuffer(errorMessage, + libpq_gettext("invalid port number: \"%d\"\n"), + portnum); + goto cleanup; + } + } + } while (*p == ','); + /* Save the hostname terminator before we null it */ prevchar = *p; *p = '\0'; - if (*port && - !conninfo_storeval(options, "port", port, + if (*host && + !conninfo_storeval(options, "host", host, errorMessage, false, true)) goto cleanup; - } + } if (prevchar && prevchar != '?') { const char *dbname = ++p; /* advance past host terminator */ @@ -5018,6 +5645,22 @@ conninfo_uri_parse_params(char *params, keyword = "sslmode"; value = "require"; } + if ((strcmp(keyword, "loadBalanceHosts") == 0 || + strcmp(keyword, "load_balance_hosts") == 0) && + strcmp(value, "true") == 0) + { + free(keyword); + free(value); + malloced = false; + keyword = "hostorder"; + value = "random"; + } + if (strcmp(keyword, "targetServerType") == 0) + { + free(keyword); + keyword = strdup("target_server_type"); + + } /* * Store the value if the corresponding option exists; ignore @@ -5232,7 +5875,22 @@ conninfo_storeval(PQconninfoOption *connOptions, } if (option->val) + { + if (strcmp(option->keyword, "host") == 0) + { + /* Accumulate multiple hosts in the single string */ + int val_len = strlen(option->val), + new_len = strlen(value); + + free(value_copy); + value_copy = malloc(val_len + 1 + new_len + 1); + strncpy(value_copy, option->val, val_len + 1); + value_copy[val_len] = ','; + strncpy(value_copy + val_len + 1, value, new_len + 1); + } free(option->val); + + } option->val = value_copy; return option; @@ -5352,7 +6010,9 @@ PQhost(const PGconn *conn) { if (!conn) return NULL; - if (conn->pghost != NULL && conn->pghost[0] != '\0') + if (conn->actualhost != 
NULL && conn->actualhost[0] != '\0') + return conn->actualhost; + else if (conn->pghost != NULL && conn->pghost[0] != '\0') return conn->pghost; else { @@ -5372,6 +6032,8 @@ PQport(const PGconn *conn) { if (!conn) return NULL; + if (conn->actualport) + return conn->actualport; return conn->pgport; } diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 9ca0756..23560f4 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -62,7 +62,11 @@ typedef enum * backend startup. */ CONNECTION_SETENV, /* Negotiating environment. */ CONNECTION_SSL_STARTUP, /* Negotiating SSL. */ - CONNECTION_NEEDED /* Internal state: connect() needed */ + CONNECTION_NEEDED, /* Internal state: connect() needed */ + CONNECTION_CHECK_RO, /* Internal state: need to check is RO + * connection acceptable */ + CONNECTION_CHECK_RW, /* Internal state: waiting that server replies + * if it is in recovery */ } ConnStatusType; typedef enum diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index be6c370..d268453 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -334,7 +334,12 @@ struct pg_conn #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) char *krbsrvname; /* Kerberos service name */ #endif - + char *hostorder; /* How to handle multiple hosts */ + char *target_server_type; /* If "any" could work with readonly + * standby server. 
Otherwise should be + * "master" */ + char *failover_timeout; /* If no usable server found, how long + * to wait before retry */ /* Optional file to write trace info to */ FILE *Pfdebug; @@ -382,6 +387,11 @@ struct pg_conn struct addrinfo *addrlist; /* list of possible backend addresses */ struct addrinfo *addr_cur; /* the one currently being tried */ int addrlist_family; /* needed to know how to free addrlist */ + time_t failover_finish_time; /* how long to retry host list waiting + * for new master to appear */ + char *actualhost; /* Name of host we are actually connected to + * (if there is list in the pghost) */ + char *actualport; /* Port number we are actually connected to */ PGSetenvStatusType setenv_state; /* for 2.0 protocol only */ const PQEnvironmentOption *next_eo; bool send_appname; /* okay to send application_name? */ @@ -467,6 +477,7 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + }; /* PGcancel stores all data necessary to cancel a connection. 
A copy of this diff --git a/src/interfaces/libpq/test/expected.out b/src/interfaces/libpq/test/expected.out index d375e82..4832bdd 100644 --- a/src/interfaces/libpq/test/expected.out +++ b/src/interfaces/libpq/test/expected.out @@ -1,20 +1,20 @@ trying postgresql://uri-user:secret@host:12345/db -user='uri-user' password='secret' dbname='db' host='host' port='12345' (inet) +user='uri-user' password='secret' dbname='db' host='host:12345' (inet) trying postgresql://uri-user@host:12345/db -user='uri-user' dbname='db' host='host' port='12345' (inet) +user='uri-user' dbname='db' host='host:12345' (inet) trying postgresql://uri-user@host/db user='uri-user' dbname='db' host='host' (inet) trying postgresql://host:12345/db -dbname='db' host='host' port='12345' (inet) +dbname='db' host='host:12345' (inet) trying postgresql://host/db dbname='db' host='host' (inet) trying postgresql://uri-user@host:12345/ -user='uri-user' host='host' port='12345' (inet) +user='uri-user' host='host:12345' (inet) trying postgresql://uri-user@host/ user='uri-user' host='host' (inet) @@ -23,10 +23,10 @@ trying postgresql://uri-user@ user='uri-user' (local) trying postgresql://host:12345/ -host='host' port='12345' (inet) +host='host:12345' (inet) trying postgresql://host:12345 -host='host' port='12345' (inet) +host='host:12345' (inet) trying postgresql://host/db dbname='db' host='host' (inet) @@ -62,7 +62,7 @@ trying postgresql://host/db?u%7aer=someotheruser&port=12345 uri-regress: invalid URI query parameter: "uzer" trying postgresql://host:12345?user=uri-user -user='uri-user' host='host' port='12345' (inet) +user='uri-user' host='host:12345' (inet) trying postgresql://host?user=uri-user user='uri-user' host='host' (inet) @@ -71,19 +71,19 @@ trying postgresql://host? 
host='host' (inet) trying postgresql://[::1]:12345/db -dbname='db' host='::1' port='12345' (inet) +dbname='db' host='[::1]:12345' (inet) trying postgresql://[::1]/db -dbname='db' host='::1' (inet) +dbname='db' host='[::1]' (inet) trying postgresql://[2001:db8::1234]/ -host='2001:db8::1234' (inet) +host='[2001:db8::1234]' (inet) trying postgresql://[200z:db8::1234]/ -host='200z:db8::1234' (inet) +host='[200z:db8::1234]' (inet) trying postgresql://[::1] -host='::1' (inet) +host='[::1]' (inet) trying postgres:// (local) @@ -143,7 +143,7 @@ trying postgres://@host host='host' (inet) trying postgres://host:/ -host='host' (inet) +uri-regress: invalid port number: "0" trying postgres://:12345/ port='12345' (local) diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index afbdb63..77f94ef 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -398,6 +398,7 @@ sub init unless defined $params{hba_permit_replication}; $params{allows_streaming} = 0 unless defined $params{allows_streaming}; $params{has_archiving} = 0 unless defined $params{has_archiving}; + $params{use_tcp} = 0 unless defined $params{use_tcp}; mkdir $self->backup_dir; mkdir $self->archive_dir; @@ -431,7 +432,11 @@ sub init else { print $conf "unix_socket_directories = '$host'\n"; - print $conf "listen_addresses = ''\n"; + if ($params{use_tcp} ) { + print $conf "listen_addresses = '$test_localhost'\n"; + } else { + print $conf "listen_addresses = ''\n"; + } } close $conf;
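For readers following the URI changes above, here is a minimal standalone sketch of the host-list syntax the patch accepts in the netloc part: comma-separated entries, each a plain or bracketed IPv6 host with an optional port between 1 and 65535. It is not the code from the patch. `HostSpec` and `parse_host_list` are hypothetical names used only for illustration. Unlike the patched libpq parser, which keeps the whole list as one `host` string, this sketch splits each entry into a separate host and port so the rules are easier to see.

```c
#include <string.h>

/* Hypothetical illustration type, not part of libpq. */
typedef struct
{
	char		host[256];
	int			port;			/* 0 means "no port given" */
} HostSpec;

/*
 * Parse a list such as "node1,node2:5433,[::1]:4432".
 * Returns the number of entries stored in out[], or -1 on a syntax error.
 */
static int
parse_host_list(const char *s, HostSpec *out, int max)
{
	const char *p = s;
	int			n = 0;

	while (*p)
	{
		const char *start;
		size_t		len;
		long		port = 0;

		if (n >= max)
			return -1;

		if (*p == '[')
		{
			/* Bracketed IPv6 literal: take everything up to ']' */
			start = ++p;
			while (*p && *p != ']')
				p++;
			if (!*p || p == start)
				return -1;		/* unterminated or empty address */
			len = (size_t) (p - start);
			p++;				/* skip ']' */
		}
		else
		{
			/* DNS name or IPv4 address: stop at ',' or ':' */
			start = p;
			while (*p && *p != ',' && *p != ':')
				p++;
			len = (size_t) (p - start);
			if (len == 0)
				return -1;
		}

		if (len >= sizeof(out[n].host))
			return -1;
		memcpy(out[n].host, start, len);
		out[n].host[len] = '\0';

		if (*p == ':')
		{
			const char *digits = ++p;

			/* Range check inside the loop so long digit runs cannot overflow */
			while (*p >= '0' && *p <= '9')
			{
				port = port * 10 + (*p++ - '0');
				if (port > 65535)
					return -1;
			}
			if (p == digits || port < 1)
				return -1;		/* "host:" or ":0" is rejected */
		}
		out[n].port = (int) port;
		n++;

		if (*p == ',')
		{
			p++;
			if (!*p)
				return -1;		/* trailing comma */
		}
		else if (*p)
			return -1;			/* unexpected character after entry */
	}
	return n;
}
```

Calling `parse_host_list("node1,node2:5433,[::1]:4432", specs, 4)` fills three entries. An input like `"host:"` is rejected, which matches the new `invalid port number` result for `postgres://host:/` in expected.out.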
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)