On 09.09.2019 18:12, Konstantin Knizhnik wrote:
On 06.09.2019 19:41, Konstantin Knizhnik wrote:
On 06.09.2019 1:01, Jaime Casanova wrote:
Sadly I got a lot of FAILED tests; I'm attaching the diffs on
regression with installcheck and installcheck-parallel.
By the way, after make installcheck-parallel I wanted to run a new test but
wasn't able to drop the regression database because there was still a
subscription, so I tried to drop it and got a core file (I was
connected through the pool_worker). I'm attaching the backtrace of the
crash too.
Sorry, I failed to reproduce the crash.
If you are able to find a scenario that reproduces it, I
will be very pleased to receive it.
I was able to reproduce the crash.
Patch is attached. I also added proxying of the RESET command.
Unfortunately it is still not enough to pass regression tests with
"proxying_gucs=on".
Mostly because error messages don't match after prepending "set
local" commands.
I have implemented passing startup options to the pooler backend.
Now "make installcheck" passes without manually setting
datestyle/timezone/intervalstyle in postgresql.conf.
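
For reviewers who have not worked with descriptor passing: the non-Windows branch of pg_send_sock()/pg_recv_sock() in src/backend/port/send_sock.c (in the patch below) relies on SCM_RIGHTS ancillary messages over a Unix socket. The following is only a minimal standalone sketch of that technique, not code from the patch. The names send_fd, recv_fd and fd_passing_roundtrip are made up for illustration, and everything runs in a single process over a socketpair, whereas the patch passes the client socket between processes.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Control buffer sized and aligned for exactly one descriptor. */
typedef union
{
	struct cmsghdr align;
	char		buf[CMSG_SPACE(sizeof(int))];
} FdControl;

/* Hypothetical helper: send descriptor "fd" over Unix socket "chan". */
static int
send_fd(int chan, int fd)
{
	struct msghdr msg;
	struct iovec io;
	struct cmsghdr *cmsg;
	FdControl	ctl;
	char		byte = 0;

	memset(&msg, 0, sizeof(msg));
	memset(&ctl, 0, sizeof(ctl));
	io.iov_base = &byte;		/* at least one byte of regular payload */
	io.iov_len = 1;
	msg.msg_iov = &io;
	msg.msg_iovlen = 1;
	msg.msg_control = ctl.buf;
	msg.msg_controllen = sizeof(ctl.buf);

	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

	while (sendmsg(chan, &msg, 0) < 0)
	{
		if (errno != EINTR)
			return -1;
	}
	return 0;
}

/* Hypothetical helper: receive a descriptor from "chan", or -1 on error. */
static int
recv_fd(int chan)
{
	struct msghdr msg;
	struct iovec io;
	struct cmsghdr *cmsg;
	FdControl	ctl;
	char		byte;
	int			fd;

	memset(&msg, 0, sizeof(msg));
	io.iov_base = &byte;
	io.iov_len = 1;
	msg.msg_iov = &io;
	msg.msg_iovlen = 1;
	msg.msg_control = ctl.buf;
	msg.msg_controllen = sizeof(ctl.buf);

	while (recvmsg(chan, &msg, 0) < 0)
	{
		if (errno != EINTR)
			return -1;
	}

	cmsg = CMSG_FIRSTHDR(&msg);
	if (cmsg == NULL || cmsg->cmsg_level != SOL_SOCKET ||
		cmsg->cmsg_type != SCM_RIGHTS)
		return -1;
	memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
	return fd;
}

/* Pass the write end of a pipe through a socketpair, then write through it. */
static int
fd_passing_roundtrip(void)
{
	int			chan[2];
	int			pipefd[2];
	int			received;
	char		c = 0;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, chan) < 0 || pipe(pipefd) < 0)
		return -1;
	if (send_fd(chan[0], pipefd[1]) < 0)
		return -1;
	received = recv_fd(chan[1]);
	if (received < 0)
		return -1;
	if (write(received, "x", 1) != 1 || read(pipefd[0], &c, 1) != 1)
		return -1;
	close(received);
	close(pipefd[0]);
	close(pipefd[1]);
	close(chan[0]);
	close(chan[1]);
	return (c == 'x') ? 0 : -1;
}
```

Calling fd_passing_roundtrip() from a small main() should return 0 on Linux or macOS. The received descriptor is a new number referring to the same open pipe, which is the property that lets a connection accepted in one process be served by another.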
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/contrib/spi/refint.c b/contrib/spi/refint.c
index adf0490..5c2095f 100644
--- a/contrib/spi/refint.c
+++ b/contrib/spi/refint.c
@@ -11,6 +11,7 @@
#include "commands/trigger.h"
#include "executor/spi.h"
+#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/rel.h"
@@ -93,6 +94,8 @@ check_primary_key(PG_FUNCTION_ARGS)
else
tuple = trigdata->tg_newtuple;
+ MyProc->is_tainted = true;
+
trigger = trigdata->tg_trigger;
nargs = trigger->tgnargs;
args = trigger->tgargs;
@@ -284,6 +287,8 @@ check_foreign_key(PG_FUNCTION_ARGS)
/* internal error */
elog(ERROR, "check_foreign_key: cannot process INSERT events");
+ MyProc->is_tainted = true;
+
/* Have to check tg_trigtuple - tuple being deleted */
trigtuple = trigdata->tg_trigtuple;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c91e3e1..df0bcaf 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -719,6 +719,169 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
+ <varlistentry id="guc-max-sessions" xreflabel="max_sessions">
+ <term><varname>max_sessions</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_sessions</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ The maximum number of client sessions that can be handled by
+ one connection proxy when session pooling is enabled.
+ This parameter does not add any memory or CPU overhead, so
+ specifying a large <varname>max_sessions</varname> value
+ does not affect performance.
+ If the <varname>max_sessions</varname> limit is reached, new connections are not accepted.
+ </para>
+ <para>
+ The default value is 1000. This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-session-pool-size" xreflabel="session_pool_size">
+ <term><varname>session_pool_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>session_pool_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables session pooling and defines the maximum number of
+ backends that can be used by client sessions for each database/user combination.
+ Launched non-tainted backends are never terminated even if there are no active sessions.
+ A backend is considered tainted if the client updates GUCs, creates temporary tables or prepares statements.
+ A tainted backend can serve only one client.
+ </para>
+ <para>
+ The default value is 10, so up to 10 backends will serve each database/user combination.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-proxy-port" xreflabel="proxy_port">
+ <term><varname>proxy_port</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>proxy_port</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the TCP port for the connection pooler.
+ Clients connected to the main "port" are assigned dedicated backends,
+ while clients connected to the proxy port are connected to backends through a proxy which
+ performs transaction-level scheduling.
+ </para>
+ <para>
+ The default value is 6543.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-connection-proxies" xreflabel="connection_proxies">
+ <term><varname>connection_proxies</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>connection_proxies</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the number of connection proxies.
+ The postmaster spawns a separate worker process for each proxy and distributes connections between proxies using one of the scheduling policies (round-robin, random, load-balancing).
+ Each proxy launches its own subset of backends.
+ So the maximal number of non-tainted backends is <varname>session_pool_size*connection_proxies*databases*roles</varname>.
+ </para>
+ <para>
+ The default value is 0, so session pooling is disabled.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-session-schedule" xreflabel="session_schedule">
+ <term><varname>session_schedule</varname> (<type>enum</type>)
+ <indexterm>
+ <primary><varname>session_schedule</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies the scheduling policy for assigning sessions to proxies when
+ connection pooling is used. The default policy is <literal>round-robin</literal>.
+ </para>
+ <para>
+ With the <literal>round-robin</literal> policy the postmaster distributes sessions between proxies cyclically.
+ </para>
+ <para>
+ With the <literal>random</literal> policy the postmaster chooses a random proxy for each new session.
+ </para>
+ <para>
+ With the <literal>load-balancing</literal> policy the postmaster chooses the proxy with the lowest load average.
+ The load average of a proxy is estimated by the number of client connections assigned to it, with extra weight for SSL connections.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-idle-pool-worker-timeout" xreflabel="idle_pool_worker_timeout">
+ <term><varname>idle_pool_worker_timeout</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>idle_pool_worker_timeout</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Terminate an idle connection pool worker after the specified number of milliseconds.
+ The default value is 0, so pool workers are never terminated.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-restart-pooler-on-reload" xreflabel="restart_pooler_on_reload">
+ <term><varname>restart_pooler_on_reload</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>restart_pooler_on_reload</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Restart session pool workers once <function>pg_reload_conf()</function> is called.
+ The default value is <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-proxying-gucs" xreflabel="proxying_gucs">
+ <term><varname>proxying_gucs</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>proxying_gucs</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Support setting parameters in connection pooler sessions.
+ When this parameter is switched on, commands setting session parameters are replaced with commands setting local (transaction) parameters,
+ which are prepended to each transaction or standalone statement. This makes it possible to avoid marking the backend as tainted.
+ The default value is <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-multitenant-proxy" xreflabel="multitenant_proxy">
+ <term><varname>multitenant_proxy</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>multitenant_proxy</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ One pool worker can serve clients with different roles.
+ When this parameter is switched on, each transaction or standalone statement
+ is prepended with a "set role" command.
+ The default value is <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-unix-socket-directories" xreflabel="unix_socket_directories">
<term><varname>unix_socket_directories</varname> (<type>string</type>)
<indexterm>
diff --git a/doc/src/sgml/connpool.sgml b/doc/src/sgml/connpool.sgml
new file mode 100644
index 0000000..8dc9594
--- /dev/null
+++ b/doc/src/sgml/connpool.sgml
@@ -0,0 +1,182 @@
+<!-- doc/src/sgml/connpool.sgml -->
+
+ <chapter id="connection-pooling">
+ <title>Connection pooling</title>
+
+ <indexterm zone="connection-pooling">
+ <primary>built-in connection pool proxy</primary>
+ </indexterm>
+
+ <para>
+ <productname>PostgreSQL</productname> spawns a separate process (backend) for each client.
+ For a large number of clients this model can consume a large amount of system
+ resources and lead to significant performance degradation, especially on computers with a large
+ number of CPU cores. The reason is high contention between backends for Postgres resources.
+ Also, the size of many Postgres internal data structures is proportional to the number of
+ active backends, as is the complexity of the algorithms operating on these data structures.
+ </para>
+
+ <para>
+ This is why many production Postgres installations use some kind of connection pooling, such as
+ pgbouncer, J2EE, and odyssey. Using an external connection pooler requires additional effort for installation,
+ configuration and maintenance. Also, pgbouncer (the most popular connection pooler for Postgres) is
+ single-threaded and so can be a bottleneck on highly loaded systems, so multiple instances of pgbouncer have to be launched.
+ </para>
+
+ <para>
+ Starting with version 12, <productname>PostgreSQL</productname> provides a built-in connection pooler.
+ This chapter describes the architecture and usage of the built-in connection pooler.
+ </para>
+
+ <sect1 id="how-connection-pooler-works">
+ <title>How Built-in Connection Pooler Works</title>
+
+ <para>
+ The built-in connection pooler spawns one or more proxy processes which connect clients and backends.
+ The number of proxy processes is controlled by the <varname>connection_proxies</varname> configuration parameter.
+ To avoid substantial changes in the Postgres locking mechanism, only a transaction-level pooling policy is implemented.
+ This means that the pooler is able to reschedule a backend to another session only after it has completed the current transaction.
+ </para>
+
+ <para>
+ Because each Postgres backend is able to work with only a single database, each proxy process maintains
+ a hash table of connection pools, one for each pair of <literal>dbname,role</literal>.
+ The maximal number of backends which can be spawned by a connection pool is limited by the
+ <varname>session_pool_size</varname> configuration variable.
+ So the maximal number of non-dedicated backends in pooling mode is limited by
+ <varname>connection_proxies</varname>*<varname>session_pool_size</varname>*<literal>#databases</literal>*<literal>#roles</literal>.
+ </para>
+
+ <para>
+ As mentioned above, a separate connection pool is created for each <literal>dbname,role</literal> pair. A Postgres backend is not able to work with more than one database, but it is possible to change the current user (role) inside one connection.
+ If the <varname>multitenant_proxy</varname> option is switched on, then a separate pool
+ is created only for each database, and the current user is explicitly specified for each transaction/standalone statement using the <literal>set role</literal> command.
+ To support this mode you need to grant all roles permission to switch between each other.
+ </para>
+
+ <para>
+ To minimize the number of changes in the Postgres core, the built-in connection pooler does not try to save/restore
+ session context. If session context is modified by the client application
+ (changing values of session variables (GUCs), creating temporary tables, preparing statements, advisory locking),
+ then the backend executing this session is considered to be <emphasis>tainted</emphasis>.
+ It is now dedicated to this session and cannot be rescheduled to another session.
+ Once this session is terminated, the backend is terminated as well.
+ Non-tainted backends are not terminated even if there are no more connected sessions.
+ Switching on the <varname>proxying_gucs</varname> configuration option allows session parameters to be set without marking the backend as <emphasis>tainted</emphasis>.
+ </para>
+
+ <para>
+ The built-in connection pooler accepts connections on a separate port (<varname>proxy_port</varname> configuration option, default value is 6543).
+ If a client connects to Postgres through the standard port (<varname>port</varname> configuration option, default value is 5432), then a normal (<emphasis>dedicated</emphasis>) backend is created. It works only
+ with this client and is terminated when the client disconnects. The standard port is also used by the proxy itself to
+ launch new worker backends. This means that to enable the connection pooler, Postgres must be configured
+ to accept local connections (<literal>pg_hba.conf</literal> file).
+ </para>
+
+ <para>
+ If a client application is connected through the proxy port, then its communication with the backend is always
+ performed through the proxy. Even if it changes session context and the backend becomes <emphasis>tainted</emphasis>,
+ all traffic between this client and the backend still goes through the proxy.
+ </para>
+
+ <para>
+ The postmaster accepts connections on the proxy port and redirects them to one of the connection proxies.
+ Right now sessions are bound to a proxy and cannot migrate between proxies.
+ To provide uniform load balancing of proxies, the postmaster uses one of three scheduling policies:
+ <literal>round-robin</literal>, <literal>random</literal> and <literal>load-balancing</literal>.
+ In the last case the postmaster will choose the proxy with the smallest number of already attached clients, with
+ extra weight added to SSL connections (which consume more CPU).
+ </para>
+
+ </sect1>
+
+ <sect1 id="connection-pooler-configuration">
+ <title>Built-in Connection Pooler Configuration</title>
+
+ <para>
+ There are four main configuration variables controlling the connection pooler:
+ <varname>session_pool_size</varname>, <varname>connection_proxies</varname>, <varname>max_sessions</varname> and <varname>proxy_port</varname>.
+ The connection pooler is enabled if all of them are non-zero.
+ </para>
+
+ <para>
+ <varname>connection_proxies</varname> specifies the number of connection proxy processes to be spawned.
+ Default value is zero, so connection pooling is disabled by default.
+ </para>
+
+ <para>
+ <varname>session_pool_size</varname> specifies the maximal number of backends per connection pool. The maximal number of launched non-dedicated backends in pooling mode is limited by
+ <varname>connection_proxies</varname>*<varname>session_pool_size</varname>*<literal>#databases</literal>*<literal>#roles</literal>.
+ If the number of backends is too small, the server will not be able to utilize all system resources.
+ But too large a value can degrade performance because of large snapshots and lock contention.
+ </para>
+
+ <para>
+ The <varname>max_sessions</varname> parameter specifies the maximal number of sessions which can be handled by one connection proxy.
+ It affects only the size of the wait event set, so it can be made large without any significant impact on system resource consumption.
+ The default value is 1000. So the maximal number of connections to one database/role is limited by <varname>connection_proxies</varname>*<varname>session_pool_size</varname>*<varname>max_sessions</varname>.
+ </para>
+
+ <para>
+ The connection proxy accepts connections on a special port, defined by <varname>proxy_port</varname>.
+ The default value is 6543, but it can be changed to the standard Postgres port 5432, so that all connections to the databases are pooled by default.
+ It is still necessary to have a port for direct connections to the database (dedicated backends).
+ It is needed by the connection pooler itself to launch worker backends.
+ </para>
+
+ <para>
+ The postmaster distributes sessions between proxies using one of three available scheduling policies:
+ <literal>round-robin</literal>, <literal>random</literal> and <literal>load-balancing</literal>.
+ The policy can be set using the <varname>session_schedule</varname> configuration variable. The default policy is
+ <literal>round-robin</literal>, which causes cyclic distribution of sessions between proxies.
+ It should work well in case of a more or less uniform workload.
+ The smartest policy is <literal>load-balancing</literal>, which tries to choose the least loaded proxy
+ based on the available statistics. It is possible to monitor the state of the proxies using the <function>pg_pooler_state()</function> function, which returns information about the number of clients, backends and pools for each proxy as well
+ as some statistics about the number of processed transactions and the amount of data
+ sent from clients to backends (<varname>rx_bytes</varname>) and from backends to clients (<varname>tx_bytes</varname>).
+ </para>
+
+ <para>
+ Because pooled backends are not terminated on client exit, it will not
+ be possible to drop a database to which they are connected. This can be achieved without a server restart using the <varname>restart_pooler_on_reload</varname> variable. Setting this variable to <literal>true</literal> causes shutdown of all pooled backends after execution of the <function>pg_reload_conf()</function> function. Then it will be possible to drop the database. Alternatively you can specify <varname>idle_pool_worker_timeout</varname>, which
+ forces termination of workers not used for the specified time. If a database is not accessed for a long time, then all its pool workers are terminated.
+ </para>
+
+ </sect1>
+
+ <sect1 id="connection-pooler-constraints">
+ <title>Built-in Connection Pooler Pros and Cons</title>
+
+ <para>
+ Unlike pgbouncer and other external connection poolers, the built-in connection pooler doesn't require installation and configuration of any other components.
+ It also does not introduce any limitations for clients: existing clients can work through the proxy and don't notice any difference.
+ If a client application requires session context, then it will be served by a dedicated backend. Such a connection will not participate in
+ connection pooling, but it will work correctly. This is the main difference from pgbouncer,
+ which may cause incorrect behavior of the client application when a pooling policy other than session-level is used.
+ And if an application does not change session context, then it can be implicitly pooled, reducing the number of active backends.
+ </para>
+
+ <para>
+ The main limitation of the current built-in connection pooler implementation is that it is not able to save/resume session context.
+ Although this is not so difficult to do, it requires more changes in the Postgres core. Developers of client applications have
+ the choice to either avoid using session-specific operations, or not use built-in pooling. For example, using prepared statements can improve the speed of simple queries
+ by up to two times. But prepared statements cannot be handled by a pooled backend, so if all clients are using prepared statements, then there will be no connection pooling
+ even if connection pooling is enabled.
+ </para>
+
+ <para>
+ Redirecting connections through the connection proxy has a negative effect on total system performance, especially latency.
+ The overhead of the connection proxy depends on many factors, such as characteristics of external and internal networks, complexity of queries and size of the returned result set.
+ With a small number of connections (10), the pgbench benchmark in select-only mode shows almost two times worse performance for local connections through the connection pooler compared with direct local connections. For a much larger number of connections (when pooling is actually required), pooling mode outperforms direct connection mode.
+ </para>
+
+ <para>
+ Another obvious limitation of transaction-level pooling is that a long-living transaction can cause starvation of
+ other clients. It greatly depends on application design. If an application opens a database transaction and then waits for user input or some other external event, then the backend can stay in the <emphasis>idle-in-transaction</emphasis>
+ state for a long time. An <emphasis>idle-in-transaction</emphasis> backend cannot be rescheduled for another session.
+ The obvious recommendation is to avoid long-living transactions and to set <varname>idle_in_transaction_session_timeout</varname> to implicitly abort such transactions.
+ </para>
+
+ </sect1>
+
+ </chapter>
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 3da2365..b82637e 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -29,6 +29,7 @@
<!ENTITY syntax SYSTEM "syntax.sgml">
<!ENTITY textsearch SYSTEM "textsearch.sgml">
<!ENTITY typeconv SYSTEM "typeconv.sgml">
+<!ENTITY connpool SYSTEM "connpool.sgml">
<!-- administrator's guide -->
<!ENTITY backup SYSTEM "backup.sgml">
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 3e115f1..ee6e2bd 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -158,6 +158,7 @@
&maintenance;
&backup;
&high-availability;
+ &connpool;
&monitoring;
&diskusage;
&wal;
diff --git a/src/Makefile b/src/Makefile
index bcdbd95..196ca8c 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
interfaces \
backend/replication/libpqwalreceiver \
backend/replication/pgoutput \
+ backend/postmaster/libpqconn \
fe_utils \
bin \
pl \
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 83f9959..cf7d1dd 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -28,6 +28,7 @@
#include "executor/executor.h"
#include "executor/tstoreReceiver.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/proc.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
@@ -57,6 +58,8 @@ PerformCursorOpen(DeclareCursorStmt *cstmt, ParamListInfo params,
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("invalid cursor name: must not be empty")));
+ MyProc->is_tainted = true; /* cursors are not compatible with builtin connection pooler */
+
/*
* If this is a non-holdable cursor, we require that this statement has
* been executed inside a transaction block (or else, it would have no
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index c12b613..7d60c9b 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -30,6 +30,7 @@
#include "parser/parse_expr.h"
#include "parser/parse_type.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/proc.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@@ -457,6 +458,7 @@ StorePreparedStatement(const char *stmt_name,
stmt_name,
HASH_ENTER,
&found);
+ MyProc->is_tainted = true;
/* Shouldn't get a duplicate entry */
if (found)
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 0960b33..ac51dc4 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -251,6 +251,19 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
heap_freetuple(tuple);
table_close(rel, RowExclusiveLock);
+ /*
+ * TODO:
+ * Using currval() may cause incorrect behaviour with the connection pooler.
+ * Unfortunately marking the backend as tainted in currval() is too late.
+ * This is why it is done in nextval(), although it is not strictly required, because
+ * nextval() may not be followed by currval().
+ * But currval() may not be preceded by nextval().
+ * To make regression tests pass, the backend is also marked as tainted when it creates
+ * a sequence. Certainly it is just a temporary workaround, because a sequence may be created
+ * in one backend and accessed in another.
+ */
+ MyProc->is_tainted = true; /* in case of using currval() */
+
return address;
}
@@ -564,6 +577,8 @@ nextval(PG_FUNCTION_ARGS)
*/
relid = RangeVarGetRelid(sequence, NoLock, false);
+ MyProc->is_tainted = true; /* in case of using currval() */
+
PG_RETURN_INT64(nextval_internal(relid, true));
}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index fb2be10..b0af84b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -591,6 +591,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("ON COMMIT can only be used on temporary tables")));
+ if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP
+ && stmt->oncommit != ONCOMMIT_DROP)
+ MyProc->is_tainted = true;
+
if (stmt->partspec != NULL)
{
if (relkind != RELKIND_RELATION)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 384887e..ebff20a 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -195,15 +195,13 @@ pq_init(void)
{
/* initialize state variables */
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
- PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+ if (!PqSendBuffer)
+ PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
PqCommReadingMsg = false;
DoingCopyOut = false;
- /* set up process-exit hook to close the socket */
- on_proc_exit(socket_close, 0);
-
/*
* In backends (as soon as forked) we operate the underlying socket in
* nonblocking mode and use latches to implement blocking semantics if
@@ -220,6 +218,11 @@ pq_init(void)
(errmsg("could not set socket to nonblocking mode: %m")));
#endif
+ if (FeBeWaitSet)
+ FreeWaitEventSet(FeBeWaitSet);
+ else
+ on_proc_exit(socket_close, 0);
+
FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
NULL, NULL);
@@ -227,6 +230,7 @@ pq_init(void)
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
}
+
/* --------------------------------
* socket_comm_reset - reset libpq during error recovery
*
@@ -329,7 +333,7 @@ socket_close(int code, Datum arg)
int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
char *unixSocketDir,
- pgsocket ListenSocket[], int MaxListen)
+ pgsocket ListenSocket[], int ListenPort[], int MaxListen)
{
pgsocket fd;
int err;
@@ -593,6 +597,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
familyDesc, addrDesc, (int) portNumber)));
ListenSocket[listen_index] = fd;
+ ListenPort[listen_index] = portNumber;
added++;
}
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index f4120be..e0cdd9e 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..6ea4f35
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,165 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+#ifdef WIN32
+typedef struct
+{
+ SOCKET origsocket;
+ WSAPROTOCOL_INFO wsainfo;
+} InheritableSocket;
+#endif
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int
+pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid)
+{
+#ifdef WIN32
+ InheritableSocket dst;
+ size_t rc;
+ dst.origsocket = sock;
+ if (WSADuplicateSocket(sock, pid, &dst.wsainfo) != 0)
+ {
+ ereport(FATAL,
+ (errmsg("could not duplicate socket %d for use in backend: error code %d",
+ (int)sock, WSAGetLastError())));
+ return -1;
+ }
+ rc = send(chan, (char*)&dst, sizeof(dst), 0);
+ if (rc != sizeof(dst))
+ {
+ ereport(FATAL,
+ (errmsg("Failed to send inheritable socket: rc=%d, error code %d",
+ (int)rc, WSAGetLastError())));
+ return -1;
+ }
+ return 0;
+#else
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (!cmsg)
+ return PGINVALID_SOCKET;
+
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ while (sendmsg(chan, &msg, 0) < 0)
+ {
+ if (errno != EINTR)
+ return PGINVALID_SOCKET;
+ }
+ return 0;
+#endif
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket
+pg_recv_sock(pgsocket chan)
+{
+#ifdef WIN32
+ InheritableSocket src;
+ SOCKET s;
+ size_t rc = recv(chan, (char*)&src, sizeof(src), 0);
+ if (rc != sizeof(src))
+ {
+ ereport(FATAL,
+ (errmsg("Failed to receive inheritable socket: rc=%d, error code %d",
+ (int)rc, WSAGetLastError())));
+ }
+ s = WSASocket(FROM_PROTOCOL_INFO,
+ FROM_PROTOCOL_INFO,
+ FROM_PROTOCOL_INFO,
+ &src.wsainfo,
+ 0,
+ 0);
+ if (s == INVALID_SOCKET)
+ {
+ ereport(FATAL,
+ (errmsg("could not create inherited socket: error code %d",
+ WSAGetLastError())));
+ }
+
+ /*
+ * To make sure we don't get two references to the same socket, close
+ * the original one. (This would happen when inheritance actually
+ * works.)
+ */
+ closesocket(src.origsocket);
+ return s;
+#else
+ pgsocket sock;
+ char c_buffer[CMSG_LEN(sizeof(sock))];
+ char m_buffer[1];
+ struct msghdr msg = {0};
+ struct iovec io;
+ struct cmsghdr * cmsg;
+
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ while (recvmsg(chan, &msg, 0) < 0)
+ {
+ if (errno != EINTR)
+ return PGINVALID_SOCKET;
+ }
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (!cmsg)
+ {
+ elog(WARNING, "Failed to transfer socket");
+ return PGINVALID_SOCKET;
+ }
+
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+ pg_set_noblock(sock);
+
+ return sock;
+#endif
+}
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index d5b5e77..1564c8c 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -690,3 +690,65 @@ pgwin32_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, c
memcpy(writefds, &outwritefds, sizeof(fd_set));
return nummatches;
}
+
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2])
+{
+ union {
+ struct sockaddr_in inaddr;
+ struct sockaddr addr;
+ } a;
+ SOCKET listener;
+ int e;
+ socklen_t addrlen = sizeof(a.inaddr);
+ DWORD flags = 0;
+ int reuse = 1;
+
+ socks[0] = socks[1] = -1;
+
+ listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (listener == -1)
+ return SOCKET_ERROR;
+
+ memset(&a, 0, sizeof(a));
+ a.inaddr.sin_family = AF_INET;
+ a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ a.inaddr.sin_port = 0;
+
+ for (;;) {
+ if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
+ (char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
+ break;
+ if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+ break;
+
+ memset(&a, 0, sizeof(a));
+ if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
+ break;
+ a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ a.inaddr.sin_family = AF_INET;
+
+ if (listen(listener, 1) == SOCKET_ERROR)
+ break;
+
+ socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags);
+ if (socks[0] == -1)
+ break;
+ if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+ break;
+
+ socks[1] = accept(listener, NULL, NULL);
+ if (socks[1] == -1)
+ break;
+
+ closesocket(listener);
+ return 0;
+ }
+
+ e = WSAGetLastError();
+ closesocket(listener);
+ closesocket(socks[0]);
+ closesocket(socks[1]);
+ WSASetLastError(e);
+ socks[0] = socks[1] = -1;
+ return SOCKET_ERROR;
+}
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c2321..9622ee7 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -13,6 +13,6 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
- pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+ pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o proxy.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/libpqconn/Makefile b/src/backend/postmaster/libpqconn/Makefile
new file mode 100644
index 0000000..f05b727
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/Makefile
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/postmaster/libpqconn
+#
+# IDENTIFICATION
+# src/backend/postmaster/libpqconn/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/postmaster/libpqconn
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = libpqconn.o $(WIN32RES)
+SHLIB_LINK_INTERNAL = $(libpq)
+SHLIB_LINK = $(filter -lintl, $(LIBS))
+SHLIB_PREREQS = submake-libpq
+PGFILEDESC = "libpqconn - open libpq connection"
+NAME = libpqconn
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+ rm -f $(OBJS)
diff --git a/src/backend/postmaster/libpqconn/libpqconn.c b/src/backend/postmaster/libpqconn/libpqconn.c
new file mode 100644
index 0000000..d950a8c
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/libpqconn.c
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqconn.c
+ *
+ * This file provides a way to establish a connection to a postgres instance from a backend.
+ *
+ * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/libpqconn/libpqconn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+
+#include "fmgr.h"
+#include "libpq-fe.h"
+#include "postmaster/postmaster.h"
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+static void*
+libpq_connectdb(char const* keywords[], char const* values[], char** error)
+{
+ PGconn* conn = PQconnectdbParams(keywords, values, false);
+ if (conn && PQstatus(conn) != CONNECTION_OK)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not set up local connection to server"),
+ errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
+ *error = strdup(PQerrorMessage(conn));
+ PQfinish(conn);
+ return NULL;
+ }
+ *error = NULL;
+ return conn;
+}
+
+void _PG_init(void)
+{
+ LibpqConnectdbParams = libpq_connectdb;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3339804..739b8fd 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -114,6 +114,7 @@
#include "postmaster/fork_process.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
#include "postmaster/syslogger.h"
#include "replication/logicallauncher.h"
#include "replication/walsender.h"
@@ -196,6 +197,9 @@ BackgroundWorker *MyBgworkerEntry = NULL;
/* The socket number we are listening for connections on */
int PostPortNumber;
+/* The socket number we are listening for pooled connections on */
+int ProxyPortNumber;
+
/* The directory names for Unix socket(s) */
char *Unix_socket_directories;
@@ -216,6 +220,7 @@ int ReservedBackends;
/* The socket(s) we're listening to. */
#define MAXLISTEN 64
static pgsocket ListenSocket[MAXLISTEN];
+static int ListenPort[MAXLISTEN];
/*
* Set by the -o option
@@ -246,6 +251,18 @@ bool enable_bonjour = false;
char *bonjour_name;
bool restart_after_crash = true;
+typedef struct ConnectionProxy
+{
+ int pid;
+ pgsocket socks[2];
+} ConnectionProxy;
+
+ConnectionProxy* ConnectionProxies;
+static bool ConnectionProxiesStarted;
+static int CurrentConnectionProxy; /* index used for round-robin distribution of connections between proxies */
+
+void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[], char** error);
+
/* PIDs of special child processes; 0 when not running */
static pid_t StartupPID = 0,
BgWriterPID = 0,
@@ -403,7 +420,6 @@ static void BackendInitialize(Port *port);
static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
-static int BackendStartup(Port *port);
static int ProcessStartupPacket(Port *port, bool secure_done);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
@@ -425,6 +441,7 @@ static pid_t StartChildProcess(AuxProcType type);
static void StartAutovacuumWorker(void);
static void MaybeStartWalReceiver(void);
static void InitPostmasterDeathWatchHandle(void);
+static void StartProxyWorker(int id);
/*
* Archiver is allowed to start up at the current postmaster state?
@@ -477,6 +494,8 @@ typedef struct
{
Port port;
InheritableSocket portsocket;
+ InheritableSocket proxySocket;
+ int proxyId;
char DataDir[MAXPGPATH];
pgsocket ListenSocket[MAXLISTEN];
int32 MyCancelKey;
@@ -560,6 +579,48 @@ int postmaster_alive_fds[2] = {-1, -1};
HANDLE PostmasterHandle;
#endif
+static void
+StartConnectionProxies(void)
+{
+ if (SessionPoolSize > 0 && ConnectionProxiesNumber > 0 && !ConnectionProxiesStarted)
+ {
+ int i;
+ if (ConnectionProxies == NULL)
+ {
+ ConnectionProxies = malloc(sizeof(ConnectionProxy)*ConnectionProxiesNumber);
+ for (i = 0; i < ConnectionProxiesNumber; i++)
+ {
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, ConnectionProxies[i].socks) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+ }
+ }
+ for (i = 0; i < ConnectionProxiesNumber; i++)
+ {
+ StartProxyWorker(i);
+ }
+ ConnectionProxiesStarted = true;
+ }
+}
+
+/*
+ * Send signal to connection proxies
+ */
+static void
+StopConnectionProxies(int signal)
+{
+ if (ConnectionProxiesStarted)
+ {
+ int i;
+ for (i = 0; i < ConnectionProxiesNumber; i++)
+ {
+ signal_child(ConnectionProxies[i].pid, signal);
+ }
+ ConnectionProxiesStarted = false;
+ }
+}
+
/*
* Postmaster main entry point
*/
@@ -572,6 +633,9 @@ PostmasterMain(int argc, char *argv[])
bool listen_addr_saved = false;
int i;
char *output_config_variable = NULL;
+ bool contains_localhost = false;
+ int ports[2];
+ int n_ports = 0;
InitProcessGlobals();
@@ -1008,6 +1072,11 @@ PostmasterMain(int argc, char *argv[])
on_proc_exit(CloseServerPorts, 0);
+ /* Listen on proxy socket only if session pooling is enabled */
+ if (ProxyPortNumber > 0 && ConnectionProxiesNumber > 0 && SessionPoolSize > 0)
+ ports[n_ports++] = ProxyPortNumber;
+ ports[n_ports++] = PostPortNumber;
+
if (ListenAddresses)
{
char *rawstring;
@@ -1031,32 +1100,36 @@ PostmasterMain(int argc, char *argv[])
foreach(l, elemlist)
{
char *curhost = (char *) lfirst(l);
-
- if (strcmp(curhost, "*") == 0)
- status = StreamServerPort(AF_UNSPEC, NULL,
- (unsigned short) PostPortNumber,
- NULL,
- ListenSocket, MAXLISTEN);
- else
- status = StreamServerPort(AF_UNSPEC, curhost,
- (unsigned short) PostPortNumber,
- NULL,
- ListenSocket, MAXLISTEN);
-
- if (status == STATUS_OK)
+ for (i = 0; i < n_ports; i++)
{
- success++;
- /* record the first successful host addr in lockfile */
- if (!listen_addr_saved)
+ int port = ports[i];
+ if (strcmp(curhost, "*") == 0)
+ status = StreamServerPort(AF_UNSPEC, NULL,
+ (unsigned short) port,
+ NULL,
+ ListenSocket, ListenPort, MAXLISTEN);
+ else
+ status = StreamServerPort(AF_UNSPEC, curhost,
+ (unsigned short) port,
+ NULL,
+ ListenSocket, ListenPort, MAXLISTEN);
+ contains_localhost |= strcmp(curhost, "localhost") == 0;
+
+ if (status == STATUS_OK)
{
- AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
- listen_addr_saved = true;
+ success++;
+ /* record the first successful host addr in lockfile */
+ if (!listen_addr_saved)
+ {
+ AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
+ listen_addr_saved = true;
+ }
}
+ else
+ ereport(WARNING,
+ (errmsg("could not create listen socket for \"%s\"",
+ curhost)));
}
- else
- ereport(WARNING,
- (errmsg("could not create listen socket for \"%s\"",
- curhost)));
}
if (!success && elemlist != NIL)
@@ -1125,29 +1198,32 @@ PostmasterMain(int argc, char *argv[])
errmsg("invalid list syntax in parameter \"%s\"",
"unix_socket_directories")));
}
-
+ contains_localhost = true;
foreach(l, elemlist)
{
char *socketdir = (char *) lfirst(l);
+ for (i = 0; i < n_ports; i++)
+ {
+ int port = ports[i];
- status = StreamServerPort(AF_UNIX, NULL,
- (unsigned short) PostPortNumber,
- socketdir,
- ListenSocket, MAXLISTEN);
+ status = StreamServerPort(AF_UNIX, NULL,
+ (unsigned short) port,
+ socketdir,
+ ListenSocket, ListenPort, MAXLISTEN);
- if (status == STATUS_OK)
- {
- success++;
- /* record the first successful Unix socket in lockfile */
- if (success == 1)
- AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+ if (status == STATUS_OK)
+ {
+ success++;
+ /* record the first successful Unix socket in lockfile */
+ if (success == 1)
+ AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+ }
+ else
+ ereport(WARNING,
+ (errmsg("could not create Unix-domain socket in directory \"%s\"",
+ socketdir)));
}
- else
- ereport(WARNING,
- (errmsg("could not create Unix-domain socket in directory \"%s\"",
- socketdir)));
}
-
if (!success && elemlist != NIL)
ereport(FATAL,
(errmsg("could not create any Unix-domain sockets")));
@@ -1157,6 +1233,20 @@ PostmasterMain(int argc, char *argv[])
}
#endif
+ if (!contains_localhost && ProxyPortNumber > 0)
+ {
+ /* we need to accept local connections from proxy */
+ status = StreamServerPort(AF_UNSPEC, "localhost",
+ (unsigned short) PostPortNumber,
+ NULL,
+ ListenSocket, ListenPort, MAXLISTEN);
+ if (status != STATUS_OK)
+ {
+ ereport(WARNING,
+ (errmsg("could not create listen socket for localhost")));
+ }
+ }
+
/*
* check that we have some socket to listen on
*/
@@ -1374,6 +1464,8 @@ PostmasterMain(int argc, char *argv[])
/* Some workers may be scheduled to start now */
maybe_start_bgworkers();
+ StartConnectionProxies();
+
status = ServerLoop();
/*
@@ -1611,6 +1703,57 @@ DetermineSleepTime(struct timeval *timeout)
}
}
+/**
+ * Estimate the workload of a proxy.
+ * The ProxyState array holds a lot of information about each proxy:
+ * total number of clients, SSL clients, backends, traffic, number of transactions, ...
+ * In principle a much more sophisticated evaluation function could be implemented,
+ * but right now we take into account only the number of clients and SSL connections (which require much more CPU).
+ */
+static uint64
+GetConnectionProxyWorkload(int id)
+{
+ return ProxyState[id].n_clients + ProxyState[id].n_ssl_clients*3;
+}
+
+/**
+ * Choose connection pool for this session.
+ * Right now sessions cannot be moved between pools (in principle it is not so difficult to implement),
+ * so to support load balancing we should do some smart work here.
+ */
+static ConnectionProxy*
+SelectConnectionProxy(void)
+{
+ int i;
+ uint64 min_workload;
+ int least_loaded_proxy;
+
+ switch (SessionSchedule)
+ {
+ case SESSION_SCHED_ROUND_ROBIN:
+ return &ConnectionProxies[CurrentConnectionProxy++ % ConnectionProxiesNumber];
+ case SESSION_SCHED_RANDOM:
+ return &ConnectionProxies[random() % ConnectionProxiesNumber];
+ case SESSION_SCHED_LOAD_BALANCING:
+ min_workload = GetConnectionProxyWorkload(0);
+ least_loaded_proxy = 0;
+ for (i = 1; i < ConnectionProxiesNumber; i++)
+ {
+ uint64 workload = GetConnectionProxyWorkload(i);
+ if (workload < min_workload)
+ {
+ min_workload = workload;
+ least_loaded_proxy = i;
+ }
+ }
+ return &ConnectionProxies[least_loaded_proxy];
+ default:
+ Assert(false);
+ }
+ return NULL;
+}
+
+
/*
* Main idle loop of postmaster
*
@@ -1701,8 +1844,18 @@ ServerLoop(void)
port = ConnCreate(ListenSocket[i]);
if (port)
{
- BackendStartup(port);
-
+ if (ConnectionProxies && ListenPort[i] == ProxyPortNumber)
+ {
+ ConnectionProxy* proxy = SelectConnectionProxy();
+ if (pg_send_sock(proxy->socks[0], port->sock, proxy->pid) < 0)
+ {
+ elog(LOG, "could not send socket to connection pool: %m");
+ }
+ }
+ else
+ {
+ BackendStartup(port, NULL);
+ }
/*
* We no longer need the open socket or port structure
* in this process
@@ -1899,8 +2052,6 @@ ProcessStartupPacket(Port *port, bool secure_done)
{
int32 len;
void *buf;
- ProtocolVersion proto;
- MemoryContext oldcontext;
pq_startmsgread();
@@ -1967,6 +2118,18 @@ ProcessStartupPacket(Port *port, bool secure_done)
}
pq_endmsgread();
+ return ParseStartupPacket(port, TopMemoryContext, buf, len, secure_done);
+}
+
+int
+ParseStartupPacket(Port *port, MemoryContext memctx, void* buf, int len, bool secure_done)
+{
+ ProtocolVersion proto;
+ MemoryContext oldcontext;
+
+ am_walsender = false;
+ am_db_walsender = false;
+
/*
* The first field is either a protocol version number or a special
* request code.
@@ -2067,7 +2230,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2739,6 +2902,8 @@ pmdie(SIGNAL_ARGS)
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
+ StopConnectionProxies(SIGTERM);
+
/*
* If we're in recovery, we can't kill the startup process
* right away, because at present doing so does not release
@@ -2816,6 +2981,9 @@ pmdie(SIGNAL_ARGS)
/* and the walwriter too */
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
+
+ StopConnectionProxies(SIGTERM);
+
pmState = PM_WAIT_BACKENDS;
}
@@ -4041,6 +4209,7 @@ TerminateChildren(int signal)
signal_child(PgArchPID, signal);
if (PgStatPID != 0)
signal_child(PgStatPID, signal);
+ StopConnectionProxies(signal);
}
/*
@@ -4050,8 +4219,8 @@ TerminateChildren(int signal)
*
* Note: if you change this code, also consider StartAutovacuumWorker.
*/
-static int
-BackendStartup(Port *port)
+int
+BackendStartup(Port *port, int* backend_pid)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
@@ -4155,6 +4324,8 @@ BackendStartup(Port *port)
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
#endif
+ if (backend_pid)
+ *backend_pid = pid;
return STATUS_OK;
}
@@ -4851,6 +5022,7 @@ SubPostmasterMain(int argc, char *argv[])
if (strcmp(argv[1], "--forkbackend") == 0 ||
strcmp(argv[1], "--forkavlauncher") == 0 ||
strcmp(argv[1], "--forkavworker") == 0 ||
+ strcmp(argv[1], "--forkproxy") == 0 ||
strcmp(argv[1], "--forkboot") == 0 ||
strncmp(argv[1], "--forkbgworker=", 15) == 0)
PGSharedMemoryReAttach();
@@ -4991,6 +5163,19 @@ SubPostmasterMain(int argc, char *argv[])
AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */
}
+ if (strcmp(argv[1], "--forkproxy") == 0)
+ {
+ /* Restore basic shared memory pointers */
+ InitShmemAccess(UsedShmemSegAddr);
+
+ /* Need a PGPROC to run CreateSharedMemoryAndSemaphores */
+ InitProcess();
+
+ /* Attach process to shared data structures */
+ CreateSharedMemoryAndSemaphores(0);
+
+ ConnectionProxyMain(argc - 2, argv + 2); /* does not return */
+ }
if (strncmp(argv[1], "--forkbgworker=", 15) == 0)
{
int shmem_slot;
@@ -5526,6 +5711,74 @@ StartAutovacuumWorker(void)
}
/*
+ * StartProxyWorker
+ * Start a proxy worker process.
+ *
+ * This function is here because it enters the resulting PID into the
+ * postmaster's private backends list.
+ *
+ * NB -- this code very roughly matches BackendStartup.
+ */
+static void
+StartProxyWorker(int id)
+{
+ Backend *bn;
+ int pid;
+
+ /*
+ * Compute the cancel key that will be assigned to this session. We
+ * probably don't need cancel keys for proxy workers, but we'd
+ * better have something random in the field to prevent unfriendly
+ * people from sending cancels to them.
+ */
+ if (!RandomCancelKey(&MyCancelKey))
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("could not generate random cancel key")));
+ return;
+ }
+ bn = (Backend *) malloc(sizeof(Backend));
+ if (bn)
+ {
+ bn->cancel_key = MyCancelKey;
+
+ /* Proxy workers are not dead_end and need a child slot */
+ bn->dead_end = false;
+ bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
+ bn->bgworker_notify = false;
+
+ MyProxyId = id;
+ MyProxySocket = ConnectionProxies[id].socks[1];
+ pid = ConnectionProxyStart();
+ if (pid > 0)
+ {
+ bn->pid = pid;
+ bn->bkend_type = BACKEND_TYPE_BGWORKER;
+ dlist_push_head(&BackendList, &bn->elem);
+#ifdef EXEC_BACKEND
+ ShmemBackendArrayAdd(bn);
+#endif
+ /* all OK */
+ ConnectionProxies[id].pid = pid;
+ ProxyState[id].pid = pid;
+ return;
+ }
+
+ /*
+ * fork failed, fall through to report -- actual error message was
+ * logged by ConnectionProxyStart
+ */
+ (void) ReleasePostmasterChildSlot(bn->child_slot);
+ free(bn);
+ }
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+}
+
+/*
* MaybeStartWalReceiver
* Start the WAL receiver process, if not running and our state allows.
*
@@ -6116,6 +6369,10 @@ save_backend_variables(BackendParameters *param, Port *port,
strlcpy(param->ExtraOptions, ExtraOptions, MAXPGPATH);
+ if (!write_inheritable_socket(&param->proxySocket, MyProxySocket, childPid))
+ return false;
+ param->proxyId = MyProxyId;
+
return true;
}
@@ -6347,6 +6604,9 @@ restore_backend_variables(BackendParameters *param, Port *port)
strlcpy(pkglib_path, param->pkglib_path, MAXPGPATH);
strlcpy(ExtraOptions, param->ExtraOptions, MAXPGPATH);
+
+ read_inheritable_socket(&MyProxySocket, &param->proxySocket);
+ MyProxyId = param->proxyId;
}
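
The SESSION_SCHED_LOAD_BALANCING branch of SelectConnectionProxy in the postmaster.c changes above picks the proxy with the smallest weighted client count. Here is a minimal standalone sketch of that selection, assuming the same weighting as GetConnectionProxyWorkload (each SSL client counts three extra). ProxyLoad, proxy_workload and pick_least_loaded are hypothetical names used only for illustration. The comparison stays in 64-bit unsigned arithmetic so that large counters are not truncated.

```c
#include <stdint.h>

/* Hypothetical stand-in for the per-proxy counters kept in ProxyState. */
typedef struct ProxyLoad
{
	uint64_t	n_clients;		/* all clients attached to the proxy */
	uint64_t	n_ssl_clients;	/* clients using SSL (more CPU per client) */
} ProxyLoad;

/* Weighted load: same formula as GetConnectionProxyWorkload in the patch. */
static uint64_t
proxy_workload(const ProxyLoad *p)
{
	return p->n_clients + p->n_ssl_clients * 3;
}

/*
 * Return the index of the least loaded proxy, or -1 if there are none.
 * Ties are resolved in favour of the lowest index.
 */
static int
pick_least_loaded(const ProxyLoad *loads, int n)
{
	int			best = 0;
	int			i;
	uint64_t	min_workload;

	if (n <= 0)
		return -1;

	min_workload = proxy_workload(&loads[0]);
	for (i = 1; i < n; i++)
	{
		uint64_t	workload = proxy_workload(&loads[i]);

		if (workload < min_workload)
		{
			min_workload = workload;
			best = i;
		}
	}
	return best;
}
```

Because the choice is made once when the connection is accepted and sessions stay on their proxy afterwards, this only balances new connections. A proxy whose clients later become busy is not relieved.
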
diff --git a/src/backend/postmaster/proxy.c b/src/backend/postmaster/proxy.c
new file mode 100644
index 0000000..eb8dcad
--- /dev/null
+++ b/src/backend/postmaster/proxy.c
@@ -0,0 +1,1482 @@
+#include "postgres.h"
+
+#include <unistd.h>
+#include <errno.h>
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
+#include "postmaster/fork_process.h"
+#include "access/htup_details.h"
+#include "replication/walsender.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/timestamp.h"
+#include "libpq/libpq.h"
+#include "libpq/libpq-be.h"
+#include "libpq/pqsignal.h"
+#include "libpq/pqformat.h"
+#include "tcop/tcopprot.h"
+#include "utils/timeout.h"
+#include "utils/ps_status.h"
+#include "../interfaces/libpq/libpq-fe.h"
+#include "../interfaces/libpq/libpq-int.h"
+
+#define INIT_BUF_SIZE (64*1024)
+#define MAX_READY_EVENTS 128
+#define DB_HASH_SIZE 101
+#define PROXY_WAIT_TIMEOUT 1000 /* 1 second */
+
+struct SessionPool;
+struct Proxy;
+
+typedef struct
+{
+ char database[NAMEDATALEN];
+ char username[NAMEDATALEN];
+}
+SessionPoolKey;
+
+#define NULLSTR(s) ((s) ? (s) : "?")
+
+/*
+ * Channels represent both clients and backends
+ */
+typedef struct Channel
+{
+ int magic;
+ char* buf;
+ int rx_pos;
+ int tx_pos;
+ int tx_size;
+ int buf_size;
+ int event_pos; /* Position of wait event returned by AddWaitEventToSet */
+
+ Port* client_port; /* Not null for client, null for server */
+
+ pgsocket backend_socket;
+ PGPROC* backend_proc;
+ int backend_pid;
+ bool backend_is_tainted; /* client changes session context */
+ bool backend_is_ready; /* ready for query */
+ bool is_interrupted; /* client interrupts query execution */
+ bool is_disconnected; /* connection is lost */
+ bool is_idle; /* no activity on this channel */
+ bool in_transaction; /* inside transaction body */
+ bool edge_triggered; /* emulate epoll EPOLLET (edge-triggered) flag */
+ /* We need to save startup packet response to be able to send it to new connection */
+ int handshake_response_size;
+ char* handshake_response;
+ TimestampTz backend_last_activity; /* time of last backend activity */
+ char* gucs; /* concatenated "SET var=" commands for this session */
+ char* prev_gucs; /* previous value of "gucs" to perform rollback in case of error */
+ struct Channel* peer;
+ struct Channel* next;
+ struct Proxy* proxy;
+ struct SessionPool* pool;
+}
+Channel;
+
+#define ACTIVE_CHANNEL_MAGIC 0xDEFA1234U
+#define REMOVED_CHANNEL_MAGIC 0xDEADDEEDU
+
+/*
+ * Control structure for connection proxies (several proxy workers can be launched and each has its own proxy instance).
+ * Proxy contains a hash of session pools for each role/dbname combination.
+ */
+typedef struct Proxy
+{
+ MemoryContext parse_ctx; /* Temporary memory context used for parsing startup packet */
+ WaitEventSet* wait_events; /* Set of socket descriptors of backends and clients */
+ HTAB* pools; /* Session pool map with dbname/role used as a key */
+ int n_accepted_connections; /* Number of accepted, but not yet established connections
+ * (startup packet is not received and db/role are not known) */
+ int max_backends; /* Maximal number of backends per database */
+ bool shutdown; /* Shutdown flag */
+ Channel* hangout; /* List of disconnected backends */
+ ConnectionProxyState* state; /* State of proxy */
+ TimestampTz last_idle_timeout_check; /* Time of last check for idle worker timeout expiration */
+} Proxy;
+
+/*
+ * Connection pool to particular role/dbname
+ */
+typedef struct SessionPool
+{
+ SessionPoolKey key;
+ Channel* idle_backends; /* List of idle backends */
+ Channel* pending_clients; /* List of clients waiting for free backend */
+ Proxy* proxy; /* Owner of this pool */
+ int n_launched_backends; /* Total number of launched backends */
+ int n_idle_backends; /* Number of backends in idle state */
+ int n_connected_clients; /* Total number of connected clients */
+ int n_idle_clients; /* Number of clients in idle state */
+ int n_pending_clients; /* Number of clients waiting for free backend */
+ List* startup_gucs; /* List of startup options specified in startup packet */
+ char* cmdline_options; /* Command line options passed to backend */
+}
+SessionPool;
+
+static void channel_remove(Channel* chan);
+static Channel* backend_start(SessionPool* pool, char** error);
+static bool channel_read(Channel* chan);
+static bool channel_write(Channel* chan, bool synchronous);
+static void channel_hangout(Channel* chan, char const* op);
+static ssize_t socket_write(Channel* chan, char const* buf, size_t size);
+
+/*
+ * #define ELOG(severity, fmt,...) elog(severity, "PROXY: " fmt, ## __VA_ARGS__)
+ */
+#define ELOG(severity,fmt,...)
+
+
+static Proxy* proxy;
+int MyProxyId;
+pgsocket MyProxySocket;
+ConnectionProxyState* ProxyState;
+
+/**
+ * Backend is ready for next command outside transaction block (idle state).
+ * Now if backend is not tainted it is possible to schedule some other client to this backend.
+ */
+static bool
+backend_reschedule(Channel* chan, bool is_new)
+{
+ chan->backend_is_ready = false;
+ if (chan->backend_proc == NULL) /* Lazy resolving of PGPROC entry */
+ {
+ Assert(chan->backend_pid != 0);
+ chan->backend_proc = BackendPidGetProc(chan->backend_pid);
+ Assert(chan->backend_proc); /* If backend completes execution of some query, then it has definitely registered itself in procarray */
+ }
+ if (is_new || !chan->backend_proc->is_tainted) /* If backend is not storing some session context */
+ {
+ Channel* pending = chan->pool->pending_clients;
+ Assert(!chan->backend_is_tainted);
+ if (chan->peer)
+ {
+ chan->peer->peer = NULL;
+ chan->pool->n_idle_clients += 1;
+ chan->pool->proxy->state->n_idle_clients += 1;
+ chan->peer->is_idle = true;
+ }
+ if (pending)
+ {
+ /* Has pending clients: serve one of them */
+ ELOG(LOG, "Backend %d is reassigned to client %p", chan->backend_pid, pending);
+ chan->pool->pending_clients = pending->next;
+ Assert(chan != pending);
+ chan->peer = pending;
+ pending->peer = chan;
+ chan->pool->n_pending_clients -= 1;
+ if (pending->tx_size == 0) /* new client has sent startup packet and we now need to send handshake response */
+ {
+ Assert(chan->handshake_response != NULL); /* backend already sent handshake response */
+ Assert(chan->handshake_response_size < chan->buf_size);
+ memcpy(chan->buf, chan->handshake_response, chan->handshake_response_size);
+ chan->rx_pos = chan->tx_size = chan->handshake_response_size;
+ ELOG(LOG, "Simulate response for startup packet to client %p", pending);
+ chan->backend_is_ready = true;
+ return channel_write(pending, false);
+ }
+ else
+ {
+ ELOG(LOG, "Try to send pending request from client %p to backend %p (pid %d)", pending, chan, chan->backend_pid);
+ Assert(pending->tx_pos == 0 && pending->rx_pos >= pending->tx_size);
+ return channel_write(chan, false); /* Send pending request to backend */
+ }
+ }
+ else /* return backend to the list of idle backends */
+ {
+ ELOG(LOG, "Backend %d is idle", chan->backend_pid);
+ Assert(!chan->client_port);
+ chan->next = chan->pool->idle_backends;
+ chan->pool->idle_backends = chan;
+ chan->pool->n_idle_backends += 1;
+ chan->pool->proxy->state->n_idle_backends += 1;
+ chan->is_idle = true;
+ chan->peer = NULL;
+ }
+ }
+ else if (!chan->backend_is_tainted) /* if it was not marked as tainted before... */
+ {
+ ELOG(LOG, "Backend %d is tainted", chan->backend_pid);
+ chan->backend_is_tainted = true;
+ chan->proxy->state->n_dedicated_backends += 1;
+ }
+ return true;
+}
+
+static size_t
+string_length(char const* str)
+{
+ size_t spaces = 0;
+ char const* p = str;
+ if (p == NULL)
+ return 0;
+ while (*p != '\0')
+ spaces += (*p++ == ' ');
+ return (p - str) + spaces;
+}
+
+static size_t
+string_list_length(List* list)
+{
+ ListCell *cell;
+ size_t length = 0;
+ foreach (cell, list)
+ {
+ length += strlen((char*)lfirst(cell));
+ }
+ return length;
+}
+
+static List*
+string_list_copy(List* orig)
+{
+ List* copy = list_copy(orig);
+ ListCell *cell;
+ foreach (cell, copy)
+ {
+ lfirst(cell) = pstrdup((char*)lfirst(cell));
+ }
+ return copy;
+}
+
+static bool
+string_list_equal(List* a, List* b)
+{
+ const ListCell *ca, *cb;
+ if (list_length(a) != list_length(b))
+ return false;
+ forboth(ca, a, cb, b)
+ if (strcmp(lfirst(ca), lfirst(cb)) != 0)
+ return false;
+ return true;
+}
+
+static char*
+string_append(char* dst, char const* src)
+{
+ while (*src)
+ {
+ if (*src == ' ')
+ *dst++ = '\\';
+ *dst++ = *src++;
+ }
+ return dst;
+}
+
+static bool
+string_equal(char const* a, char const* b)
+{
+ return a == b ? true : a == NULL || b == NULL ? false : strcmp(a, b) == 0;
+}
+
+/**
+ * Parse client's startup packet and assign client to proper connection pool based on dbname/role
+ */
+static bool
+client_connect(Channel* chan, int startup_packet_size)
+{
+ bool found;
+ SessionPoolKey key;
+ char* startup_packet = chan->buf;
+ MemoryContext proxy_ctx;
+
+ Assert(chan->client_port);
+
+ /* parse startup packet in parse_ctx memory context and reset it when it is not needed any more */
+ MemoryContextReset(chan->proxy->parse_ctx);
+ proxy_ctx = MemoryContextSwitchTo(chan->proxy->parse_ctx);
+
+ /* Associate libpq with client's port */
+ MyProcPort = chan->client_port;
+ pq_init();
+
+ if (ParseStartupPacket(chan->client_port, chan->proxy->parse_ctx, startup_packet+4, startup_packet_size-4, false) != STATUS_OK) /* skip packet size */
+ {
+ MyProcPort = NULL;
+ MemoryContextSwitchTo(proxy_ctx);
+ elog(WARNING, "Failed to parse startup packet for client %p", chan);
+ return false;
+ }
+ MyProcPort = NULL;
+ MemoryContextSwitchTo(proxy_ctx);
+ if (am_walsender)
+ {
+ elog(WARNING, "WAL sender should not be connected through proxy");
+ return false;
+ }
+
+ chan->proxy->state->n_ssl_clients += chan->client_port->ssl_in_use;
+ pg_set_noblock(chan->client_port->sock); /* SSL handshake may switch socket to blocking mode */
+ memset(&key, 0, sizeof(key));
+ strlcpy(key.database, chan->client_port->database_name, NAMEDATALEN);
+ if (MultitenantProxy)
+ chan->gucs = psprintf("set local role %s;", chan->client_port->user_name);
+ else
+ strlcpy(key.username, chan->client_port->user_name, NAMEDATALEN);
+
+ ELOG(LOG, "Client %p connects to %s/%s", chan, key.database, key.username);
+
+ chan->pool = (SessionPool*)hash_search(chan->proxy->pools, &key, HASH_ENTER, &found);
+ if (!found)
+ {
+ /* First connection to this role/dbname */
+ chan->proxy->state->n_pools += 1;
+ chan->pool->startup_gucs = NULL;
+ chan->pool->cmdline_options = NULL;
+ memset((char*)chan->pool + sizeof(SessionPoolKey), 0, sizeof(SessionPool) - sizeof(SessionPoolKey));
+ }
+ if (ProxyingGUCs)
+ {
+ ListCell *gucopts = list_head(chan->client_port->guc_options);
+ while (gucopts)
+ {
+ char *name;
+ char *value;
+
+ name = lfirst(gucopts);
+ gucopts = lnext(chan->client_port->guc_options, gucopts);
+
+ value = lfirst(gucopts);
+ gucopts = lnext(chan->client_port->guc_options, gucopts);
+
+ chan->gucs = psprintf("%sset local %s='%s';", chan->gucs ? chan->gucs : "", name, value);
+ }
+ }
+ else
+ {
+ /* Assume that all clients are using the same set of GUCs.
+ * Use them for launching pooler worker backends and report error
+ * if GUCs in startup packets are different.
+ */
+ if (chan->pool->n_launched_backends == 0)
+ {
+ list_free(chan->pool->startup_gucs);
+ if (chan->pool->cmdline_options)
+ pfree(chan->pool->cmdline_options);
+
+ chan->pool->startup_gucs = string_list_copy(chan->client_port->guc_options);
+ if (chan->client_port->cmdline_options)
+ chan->pool->cmdline_options = pstrdup(chan->client_port->cmdline_options);
+ }
+ else
+ {
+ if (!string_list_equal(chan->pool->startup_gucs, chan->client_port->guc_options) ||
+ !string_equal(chan->pool->cmdline_options, chan->client_port->cmdline_options))
+ {
+ elog(LOG, "Ignoring GUCs of client %s",
+ NULLSTR(chan->client_port->application_name));
+ }
+ }
+ }
+ chan->pool->proxy = chan->proxy;
+ chan->pool->n_connected_clients += 1;
+ chan->proxy->n_accepted_connections -= 1;
+ chan->pool->n_idle_clients += 1;
+ chan->pool->proxy->state->n_idle_clients += 1;
+ chan->is_idle = true;
+ return true;
+}
+
+/*
+ * Send error message to the client. This function is called when a new backend cannot be started
+ * or the client cannot be assigned to a backend because of configuration limitations.
+ */
+static void
+report_error_to_client(Channel* chan, char const* error)
+{
+ StringInfoData msgbuf;
+ initStringInfo(&msgbuf);
+ pq_sendbyte(&msgbuf, 'E');
+ pq_sendint32(&msgbuf, 7 + strlen(error));
+ pq_sendbyte(&msgbuf, PG_DIAG_MESSAGE_PRIMARY);
+ pq_sendstring(&msgbuf, error);
+ pq_sendbyte(&msgbuf, '\0');
+ socket_write(chan, msgbuf.data, msgbuf.len);
+ pfree(msgbuf.data);
+}
+
+/*
+ * Attach client to backend. Return true if new backend is attached, false otherwise.
+ */
+static bool
+client_attach(Channel* chan)
+{
+ Channel* idle_backend = chan->pool->idle_backends;
+ chan->is_idle = false;
+ chan->pool->n_idle_clients -= 1;
+ chan->pool->proxy->state->n_idle_clients -= 1;
+ if (idle_backend)
+ {
+ /* has some idle backend */
+ Assert(!idle_backend->backend_is_tainted && !idle_backend->client_port);
+ Assert(chan != idle_backend);
+ chan->peer = idle_backend;
+ idle_backend->peer = chan;
+ chan->pool->idle_backends = idle_backend->next;
+ chan->pool->n_idle_backends -= 1;
+ chan->pool->proxy->state->n_idle_backends -= 1;
+ idle_backend->is_idle = false;
+ if (IdlePoolWorkerTimeout)
+ idle_backend->backend_last_activity = GetCurrentTimestamp();
+ ELOG(LOG, "Attach client %p to backend %p (pid %d)", chan, idle_backend, idle_backend->backend_pid);
+ }
+ else /* all backends are busy */
+ {
+ if (chan->pool->n_launched_backends < chan->proxy->max_backends)
+ {
+ char* error = NULL;
+ /* Try to start new backend */
+ idle_backend = backend_start(chan->pool, &error);
+ if (idle_backend != NULL)
+ {
+ ELOG(LOG, "Start new backend %p (pid %d) for client %p",
+ idle_backend, idle_backend->backend_pid, chan);
+ Assert(chan != idle_backend);
+ chan->peer = idle_backend;
+ idle_backend->peer = chan;
+ if (IdlePoolWorkerTimeout)
+ idle_backend->backend_last_activity = GetCurrentTimestamp();
+ return true;
+ }
+ else
+ {
+ if (error)
+ {
+ report_error_to_client(chan, error);
+ free(error);
+ }
+ channel_hangout(chan, "connect");
+ return false;
+ }
+ }
+ /* Postpone handshake until some backend is available */
+ ELOG(LOG, "Client %p is waiting for available backends", chan);
+ chan->next = chan->pool->pending_clients;
+ chan->pool->pending_clients = chan;
+ chan->pool->n_pending_clients += 1;
+ }
+ return false;
+}
+
+/*
+ * Handle communication failure for this channel.
+ * It is not possible to remove the channel immediately because other pending epoll events may still refer to it.
+ * So link all such channels into a singly linked list for delayed deletion.
+ */
+static void
+channel_hangout(Channel* chan, char const* op)
+{
+ Channel** ipp;
+ Channel* peer = chan->peer;
+ if (chan->is_disconnected || chan->pool == NULL)
+ return;
+
+ if (chan->client_port) {
+ ELOG(LOG, "Hangout client %p due to %s error: %m", chan, op);
+ for (ipp = &chan->pool->pending_clients; *ipp != NULL; ipp = &(*ipp)->next)
+ {
+ if (*ipp == chan)
+ {
+ *ipp = chan->next;
+ chan->pool->n_pending_clients -= 1;
+ break;
+ }
+ }
+ if (chan->is_idle)
+ {
+ chan->pool->n_idle_clients -= 1;
+ chan->pool->proxy->state->n_idle_clients -= 1;
+ chan->is_idle = false;
+ }
+ }
+ else
+ {
+ ELOG(LOG, "Hangout backend %p (pid %d) due to %s error: %m", chan, chan->backend_pid, op);
+ for (ipp = &chan->pool->idle_backends; *ipp != NULL; ipp = &(*ipp)->next)
+ {
+ if (*ipp == chan)
+ {
+ Assert (chan->is_idle);
+ *ipp = chan->next;
+ chan->pool->n_idle_backends -= 1;
+ chan->pool->proxy->state->n_idle_backends -= 1;
+ chan->is_idle = false;
+ break;
+ }
+ }
+ }
+ if (peer)
+ {
+ peer->peer = NULL;
+ chan->peer = NULL;
+ }
+ chan->backend_is_ready = false;
+
+ if (chan->client_port && peer) /* If it is client connected to backend. */
+ {
+ if (!chan->is_interrupted) /* Client didn't send 'X' command, so send it on its behalf. */
+ {
+ ELOG(LOG, "Send terminate command to backend %p (pid %d)", peer, peer->backend_pid);
+ peer->is_interrupted = true; /* the interrupted flag makes channel_write send an 'X' message */
+ channel_write(peer, false);
+ return;
+ }
+ else if (!peer->is_interrupted)
+ {
+ /* Client already sent 'X' command, so we can safely reschedule backend to some other client session */
+ backend_reschedule(peer, false);
+ }
+ }
+ chan->next = chan->proxy->hangout;
+ chan->proxy->hangout = chan;
+ chan->is_disconnected = true;
+}
+
+/*
+ * Try to write data to the socket.
+ */
+static ssize_t
+socket_write(Channel* chan, char const* buf, size_t size)
+{
+ ssize_t rc;
+#ifdef USE_SSL
+ int waitfor = 0;
+ if (chan->client_port && chan->client_port->ssl_in_use)
+ rc = be_tls_write(chan->client_port, (char*)buf, size, &waitfor);
+ else
+#endif
+ rc = chan->client_port
+ ? secure_raw_write(chan->client_port, buf, size)
+ : send(chan->backend_socket, buf, size, 0);
+ if (rc == 0 || (rc < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)))
+ {
+ channel_hangout(chan, "write");
+ }
+ return rc;
+}
+
+
+/*
+ * Try to send some data to the channel.
+ * Data is located in the peer buffer. Because of using edge-triggered mode we have to use non-blocking IO
+ * and try to write all available data. Once write is completed we should try to read more data from source socket.
+ * "synchronous" flag is used to avoid infinite recursion between reads and writes.
+ * Returns true if there is nothing to do or operation is successfully completed, false in case of error
+ * or socket buffer is full.
+ */
+static bool
+channel_write(Channel* chan, bool synchronous)
+{
+ Channel* peer = chan->peer;
+ if (!chan->client_port && chan->is_interrupted)
+ {
+ /* Send terminate command to the backend. */
+ char const terminate[] = {'X', 0, 0, 0, 4};
+ if (socket_write(chan, terminate, sizeof(terminate)) <= 0)
+ return false;
+ channel_hangout(chan, "terminate");
+ return true;
+ }
+ if (peer == NULL)
+ return false;
+
+ while (peer->tx_pos < peer->tx_size) /* has something to write */
+ {
+ ssize_t rc = socket_write(chan, peer->buf + peer->tx_pos, peer->tx_size - peer->tx_pos);
+ ELOG(LOG, "%p: write %d tx_pos=%d, tx_size=%d: %m", chan, (int)rc, peer->tx_pos, peer->tx_size);
+ if (rc <= 0)
+ return false;
+ if (chan->client_port)
+ chan->proxy->state->tx_bytes += rc;
+ else
+ chan->proxy->state->rx_bytes += rc;
+ if (rc > 0 && chan->edge_triggered)
+ {
+ /* resume accepting all events */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_WRITEABLE|WL_SOCKET_READABLE|WL_SOCKET_EDGE, NULL);
+ chan->edge_triggered = false;
+ }
+ peer->tx_pos += rc;
+ }
+ if (peer->tx_size != 0)
+ {
+ /* Copy rest of received data to the beginning of the buffer */
+ chan->backend_is_ready = false;
+ Assert(peer->rx_pos >= peer->tx_size);
+ memmove(peer->buf, peer->buf + peer->tx_size, peer->rx_pos - peer->tx_size);
+ peer->rx_pos -= peer->tx_size;
+ peer->tx_pos = peer->tx_size = 0;
+ if (peer->backend_is_ready) {
+ Assert(peer->rx_pos == 0);
+ backend_reschedule(peer, false);
+ return true;
+ }
+ }
+ return synchronous || channel_read(peer); /* write is not invoked from read */
+}
+
+static bool
+is_transaction_start(char* stmt)
+{
+ return pg_strncasecmp(stmt, "begin", 5) == 0 || pg_strncasecmp(stmt, "start", 5) == 0;
+}
+
+static bool
+is_transactional_statement(char* stmt)
+{
+ static char const* const non_tx_stmts[] = {
+ "create tablespace",
+ "create database",
+ "cluster",
+ "drop",
+ "discard",
+ "reindex",
+ "rollback",
+ "vacuum",
+ NULL
+ };
+ int i;
+ for (i = 0; non_tx_stmts[i]; i++)
+ {
+ if (pg_strncasecmp(stmt, non_tx_stmts[i], strlen(non_tx_stmts[i])) == 0)
+ return false;
+ }
+ return true;
+}
+
+/*
+ * Try to read more data from the channel and send it to the peer.
+ */
+static bool
+channel_read(Channel* chan)
+{
+ int msg_start;
+ while (chan->tx_size == 0) /* there is no pending write op */
+ {
+ ssize_t rc;
+#ifdef USE_SSL
+ int waitfor = 0;
+ if (chan->client_port && chan->client_port->ssl_in_use)
+ rc = be_tls_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, &waitfor);
+ else
+#endif
+ rc = chan->client_port
+ ? secure_raw_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos)
+ : recv(chan->backend_socket, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, 0);
+
+ ELOG(LOG, "%p: read %d: %m", chan, (int)rc);
+ if (rc <= 0)
+ {
+ if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
+ channel_hangout(chan, "read");
+ return false; /* wait for more data */
+ }
+ else if (chan->edge_triggered)
+ {
+ /* resume accepting all events */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL);
+ chan->edge_triggered = false;
+ }
+ chan->rx_pos += rc;
+ msg_start = 0;
+
+ /* Loop through all received messages */
+ while (chan->rx_pos - msg_start >= 5) /* has message code + length */
+ {
+ int msg_len;
+ uint32 new_msg_len;
+ bool handshake = false;
+ if (chan->pool == NULL) /* process startup packet */
+ {
+ Assert(msg_start == 0);
+ memcpy(&msg_len, chan->buf + msg_start, sizeof(msg_len));
+ msg_len = ntohl(msg_len);
+ handshake = true;
+ }
+ else
+ {
+ ELOG(LOG, "%p receive message %c", chan, chan->buf[msg_start]);
+ memcpy(&msg_len, chan->buf + msg_start + 1, sizeof(msg_len));
+ msg_len = ntohl(msg_len) + 1;
+ }
+ if (msg_start + msg_len > chan->buf_size)
+ {
+ /* Reallocate buffer to fit complete message body */
+ chan->buf_size = msg_start + msg_len;
+ chan->buf = repalloc(chan->buf, chan->buf_size);
+ }
+ if (chan->rx_pos - msg_start >= msg_len) /* Message is completely fetched */
+ {
+ int response_size = msg_start + msg_len;
+ if (chan->pool == NULL) /* receive startup packet */
+ {
+ Assert(chan->client_port);
+ if (!client_connect(chan, msg_len))
+ {
+ /* Some trouble with processing startup packet */
+ chan->is_disconnected = true;
+ channel_remove(chan);
+ return false;
+ }
+ }
+ else if (!chan->client_port) /* Message from backend */
+ {
+ if (chan->buf[msg_start] == 'Z' /* Ready for query */
+ && chan->buf[msg_start+5] == 'I') /* Transaction block status is idle */
+ {
+ Assert(chan->rx_pos - msg_start == msg_len); /* Should be last message */
+ chan->backend_is_ready = true; /* Backend is ready for query */
+ chan->proxy->state->n_transactions += 1;
+ if (chan->peer)
+ chan->peer->in_transaction = false;
+ }
+ else if (chan->buf[msg_start] == 'E') /* Error */
+ {
+ if (chan->peer && chan->peer->prev_gucs)
+ {
+ /* Undo GUC assignment */
+ pfree(chan->peer->gucs);
+ chan->peer->gucs = chan->peer->prev_gucs;
+ chan->peer->prev_gucs = NULL;
+ }
+ }
+ }
+ else if (chan->client_port) /* Message from client */
+ {
+ if (chan->buf[msg_start] == 'X') /* Terminate message */
+ {
+ chan->is_interrupted = true;
+ if (chan->peer == NULL || !chan->peer->backend_is_tainted)
+ {
+ /* Skip terminate message to idle and non-tainted backends */
+ channel_hangout(chan, "terminate");
+ return false;
+ }
+ }
+ else if ((ProxyingGUCs || MultitenantProxy)
+ && chan->buf[msg_start] == 'Q' && !chan->in_transaction)
+ {
+ char* stmt = &chan->buf[msg_start+5];
+ if (chan->prev_gucs)
+ {
+ pfree(chan->prev_gucs);
+ chan->prev_gucs = NULL;
+ }
+ if (ProxyingGUCs
+ && ((pg_strncasecmp(stmt, "set", 3) == 0
+ && pg_strncasecmp(stmt+3, " local", 6) != 0)
+ || pg_strncasecmp(stmt, "reset", 5) == 0))
+ {
+ char* new_msg;
+ chan->prev_gucs = chan->gucs ? chan->gucs : pstrdup("");
+ if (pg_strncasecmp(stmt, "reset", 5) == 0)
+ {
+ char* semi = strchr(stmt+5, ';');
+ if (semi)
+ *semi = '\0';
+ chan->gucs = psprintf("%sset local%s=default;",
+ chan->prev_gucs, stmt+5);
+ }
+ else
+ {
+ char* param = stmt + 3;
+ if (pg_strncasecmp(param, " session", 8) == 0)
+ param += 8;
+ chan->gucs = psprintf("%sset local%s%c", chan->prev_gucs, param,
+ chan->buf[chan->rx_pos-2] == ';' ? ' ' : ';');
+ }
+ new_msg = chan->gucs + strlen(chan->prev_gucs);
+ Assert(msg_start + strlen(new_msg)*2 + 6 < chan->buf_size);
+ /*
+ * We need to send SET command to check if it is correct.
+ * To avoid "SET LOCAL can only be used in transaction blocks"
+ * error we need to construct block. Let's just double the command.
+ */
+ msg_len = sprintf(stmt, "%s%s", new_msg, new_msg) + 6;
+ new_msg_len = pg_hton32(msg_len - 1);
+ memcpy(&chan->buf[msg_start+1], &new_msg_len, sizeof(new_msg_len));
+ chan->rx_pos = msg_start + msg_len;
+ }
+ else if (chan->gucs && is_transactional_statement(stmt))
+ {
+ size_t gucs_len = strlen(chan->gucs);
+ if (chan->rx_pos + gucs_len > chan->buf_size)
+ {
+ /* Reallocate buffer to fit concatenated GUCs */
+ chan->buf_size = chan->rx_pos + gucs_len;
+ chan->buf = repalloc(chan->buf, chan->buf_size);
+ }
+ if (is_transaction_start(stmt))
+ {
+ /* Append GUCs after BEGIN command to include them in transaction body */
+ memcpy(&chan->buf[chan->rx_pos-1], chan->gucs, gucs_len+1);
+ chan->in_transaction = true;
+ }
+ else
+ {
+ /* Prepend standalone command with GUCs */
+ memmove(stmt + gucs_len, stmt, msg_len);
+ memcpy(stmt, chan->gucs, gucs_len);
+ }
+ chan->rx_pos += gucs_len;
+ msg_len += gucs_len;
+ new_msg_len = pg_hton32(msg_len - 1);
+ memcpy(&chan->buf[msg_start+1], &new_msg_len, sizeof(new_msg_len));
+ }
+ else if (is_transaction_start(stmt))
+ chan->in_transaction = true;
+ }
+ }
+ if (chan->peer == NULL) /* client is not yet connected to backend */
+ {
+ if (!chan->client_port)
+ {
+ /* We are not expecting messages from an idle backend. Assume that it is some error or shutdown. */
+ channel_hangout(chan, "idle");
+ return false;
+ }
+ client_attach(chan);
+ if (handshake) /* Send handshake response to the client */
+ {
+ /* If we attach a new client to an existing backend, then we need to send the handshake response to the client */
+ Channel* backend = chan->peer;
+ Assert(chan->rx_pos == msg_len && msg_start == 0);
+ chan->rx_pos = 0; /* Skip startup packet */
+ if (backend != NULL) /* Backend was assigned */
+ {
+ Assert(backend->handshake_response != NULL); /* backend has already sent handshake responses */
+ Assert(backend->handshake_response_size < backend->buf_size);
+ memcpy(backend->buf, backend->handshake_response, backend->handshake_response_size);
+ backend->rx_pos = backend->tx_size = backend->handshake_response_size;
+ backend->backend_is_ready = true;
+ return channel_write(chan, false);
+ }
+ else
+ {
+ /* Handshake response will be sent to the client later, when a backend is assigned */
+ return false;
+ }
+ }
+ else if (chan->peer == NULL) /* Backend was not assigned */
+ {
+ chan->tx_size = response_size; /* query will be sent later, once a backend is assigned */
+ return false;
+ }
+ }
+ msg_start += msg_len;
+ }
+ else break; /* Incomplete message. */
+ }
+ if (msg_start != 0)
+ {
+ /* Has some complete messages to send to peer */
+ Assert(chan->tx_pos == 0);
+ Assert(chan->rx_pos >= msg_start);
+ chan->tx_size = msg_start;
+ if (!channel_write(chan->peer, true))
+ return false;
+ }
+ /* If backend is out of transaction, then reschedule it */
+ if (chan->backend_is_ready)
+ return backend_reschedule(chan, false);
+
+ /* Do not try to read more data if edge-triggered mode is not supported */
+ if (!WaitEventUseEpoll)
+ break;
+ }
+ return true;
+}
+
+/*
+ * Create new channel.
+ */
+static Channel*
+channel_create(Proxy* proxy)
+{
+ Channel* chan = (Channel*)palloc0(sizeof(Channel));
+ chan->magic = ACTIVE_CHANNEL_MAGIC;
+ chan->proxy = proxy;
+ chan->buf = palloc(INIT_BUF_SIZE);
+ chan->buf_size = INIT_BUF_SIZE;
+ chan->tx_pos = chan->rx_pos = chan->tx_size = 0;
+ return chan;
+}
+
+/*
+ * Register new channel in wait event set.
+ */
+static bool
+channel_register(Proxy* proxy, Channel* chan)
+{
+ pgsocket sock = chan->client_port ? chan->client_port->sock : chan->backend_socket;
+ /* Using edge epoll mode requires non-blocking sockets */
+ pg_set_noblock(sock);
+ chan->event_pos =
+ AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE,
+ sock, NULL, chan);
+ if (chan->event_pos < 0)
+ {
+ elog(WARNING, "PROXY: Failed to add new client - too many sessions: %d clients, %d backends. "
+ "Try to increase 'max_sessions' configuration parameter.",
+ proxy->state->n_clients, proxy->state->n_backends);
+ return false;
+ }
+ return true;
+}
+
+/*
+ * Start new backend for particular pool associated with dbname/role combination.
+ * Backend is forked using BackendStartup function.
+ */
+static Channel*
+backend_start(SessionPool* pool, char** error)
+{
+ Channel* chan;
+ char postmaster_port[12];
+ char* options = (char*)palloc(string_length(pool->cmdline_options) + string_list_length(pool->startup_gucs) + list_length(pool->startup_gucs)/2*5 + 1);
+ char const* keywords[] = {"port","dbname","user","sslmode","application_name","options",NULL};
+ char const* values[] = {postmaster_port,pool->key.database,pool->key.username,"disable","pool_worker",options,NULL};
+ PGconn* conn;
+ char* msg;
+ int int32_buf;
+ int msg_len;
+ static bool libpqconn_loaded;
+ ListCell *gucopts;
+ char* dst = options;
+
+ if (!libpqconn_loaded)
+ {
+ /* We need libpq library to be able to establish connections to pool workers.
+ * This library can not be linked statically, so load it on demand. */
+ load_file("libpqconn", false);
+ libpqconn_loaded = true;
+ }
+ pg_ltoa(PostPortNumber, postmaster_port); /* pg_itoa takes int16, too narrow for ports above 32767 */
+
+ gucopts = list_head(pool->startup_gucs);
+ if (pool->cmdline_options)
+ dst += sprintf(dst, "%s", pool->cmdline_options);
+ while (gucopts)
+ {
+ char *name;
+ char *value;
+
+ name = lfirst(gucopts);
+ gucopts = lnext(pool->startup_gucs, gucopts);
+
+ value = lfirst(gucopts);
+ gucopts = lnext(pool->startup_gucs, gucopts);
+
+ if (strcmp(name, "application_name") != 0)
+ {
+ dst += sprintf(dst, " -c %s=", name);
+ dst = string_append(dst, value);
+ }
+ }
+ *dst = '\0';
+ conn = LibpqConnectdbParams(keywords, values, error);
+ pfree(options);
+ if (!conn)
+ return NULL;
+
+ chan = channel_create(pool->proxy);
+ chan->pool = pool;
+ chan->backend_socket = conn->sock;
+ /* Using edge epoll mode requires non-blocking sockets */
+ pg_set_noblock(conn->sock);
+
+ /* Save handshake response */
+ chan->handshake_response_size = conn->inEnd;
+ chan->handshake_response = palloc(chan->handshake_response_size);
+ memcpy(chan->handshake_response, conn->inBuffer, chan->handshake_response_size);
+
+ /* Extract backend pid */
+ msg = chan->handshake_response;
+ while (*msg != 'K') /* Scan handshake response until we reach PID message */
+ {
+ memcpy(&int32_buf, ++msg, sizeof(int32_buf));
+ msg_len = ntohl(int32_buf);
+ msg += msg_len;
+ Assert(msg < chan->handshake_response + chan->handshake_response_size);
+ }
+ memcpy(&int32_buf, msg+5, sizeof(int32_buf));
+ chan->backend_pid = ntohl(int32_buf);
+
+ if (channel_register(pool->proxy, chan))
+ {
+ pool->proxy->state->n_backends += 1;
+ pool->n_launched_backends += 1;
+ }
+ else
+ {
+ *error = strdup("Too many sessions: try to increase 'max_sessions' configuration parameter");
+ /* Too many sessions, error report was already logged */
+ closesocket(chan->backend_socket);
+ chan->magic = REMOVED_CHANNEL_MAGIC;
+ pfree(chan->buf);
+ pfree(chan);
+ chan = NULL;
+ }
+ return chan;
+}
+
+/*
+ * Add new client accepted by postmaster. This client will be assigned to a concrete session pool
+ * when its startup packet is received.
+ */
+static void
+proxy_add_client(Proxy* proxy, Port* port)
+{
+ Channel* chan = channel_create(proxy);
+ chan->client_port = port;
+ chan->backend_socket = PGINVALID_SOCKET;
+ if (channel_register(proxy, chan))
+ {
+ ELOG(LOG, "Add new client %p", chan);
+ proxy->n_accepted_connections += 1;
+ proxy->state->n_clients += 1;
+ }
+ else
+ {
+ report_error_to_client(chan, "Too many sessions. Try to increase 'max_sessions' configuration parameter");
+ /* Too many sessions, error report was already logged */
+ closesocket(port->sock);
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+ pfree(port->gss);
+#endif
+ chan->magic = REMOVED_CHANNEL_MAGIC;
+ pfree(port);
+ pfree(chan->buf);
+ pfree(chan);
+ }
+}
+
+/*
+ * Perform delayed deletion of channel
+ */
+static void
+channel_remove(Channel* chan)
+{
+ Assert(chan->is_disconnected); /* should be marked as disconnected by channel_hangout */
+ DeleteWaitEventFromSet(chan->proxy->wait_events, chan->event_pos);
+ if (chan->client_port)
+ {
+ if (chan->pool)
+ chan->pool->n_connected_clients -= 1;
+ else
+ chan->proxy->n_accepted_connections -= 1;
+ chan->proxy->state->n_clients -= 1;
+ chan->proxy->state->n_ssl_clients -= chan->client_port->ssl_in_use;
+ closesocket(chan->client_port->sock);
+ pfree(chan->client_port);
+ if (chan->gucs)
+ pfree(chan->gucs);
+ if (chan->prev_gucs)
+ pfree(chan->prev_gucs);
+ }
+ else
+ {
+ chan->proxy->state->n_backends -= 1;
+ chan->proxy->state->n_dedicated_backends -= chan->backend_is_tainted;
+ chan->pool->n_launched_backends -= 1;
+ closesocket(chan->backend_socket);
+ pfree(chan->handshake_response);
+
+ if (chan->pool->pending_clients)
+ {
+ char* error = NULL;
+ /* Try to start new backend instead of terminated */
+ Channel* new_backend = backend_start(chan->pool, &error);
+ if (new_backend != NULL)
+ {
+ ELOG(LOG, "Spawn new backend %p instead of terminated %p", new_backend, chan);
+ backend_reschedule(new_backend, true);
+ }
+ else
+ free(error);
+ }
+ }
+ chan->magic = REMOVED_CHANNEL_MAGIC;
+ pfree(chan->buf);
+ pfree(chan);
+}
+
+
+
+/*
+ * Create new proxy.
+ */
+static Proxy*
+proxy_create(pgsocket postmaster_socket, ConnectionProxyState* state, int max_backends)
+{
+ HASHCTL ctl;
+ Proxy* proxy;
+ MemoryContext proxy_memctx = AllocSetContextCreate(TopMemoryContext,
+ "Proxy",
+ ALLOCSET_DEFAULT_SIZES);
+ MemoryContextSwitchTo(proxy_memctx);
+ proxy = palloc0(sizeof(Proxy));
+ proxy->parse_ctx = AllocSetContextCreate(proxy_memctx,
+ "Startup packet parsing context",
+ ALLOCSET_DEFAULT_SIZES);
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(SessionPoolKey);
+ ctl.entrysize = sizeof(SessionPool);
+ ctl.hcxt = proxy_memctx;
+ proxy->pools = hash_create("Pool by database and user", DB_HASH_SIZE,
+ &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* We need events both for clients and backends so multiply MaxSessions by two */
+ proxy->wait_events = CreateWaitEventSet(TopMemoryContext, MaxSessions*2);
+ AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE,
+ postmaster_socket, NULL, NULL);
+ proxy->max_backends = max_backends;
+ proxy->state = state;
+ return proxy;
+}
+
+/*
+ * Main proxy loop
+ */
+static void
+proxy_loop(Proxy* proxy)
+{
+ int i, n_ready;
+ WaitEvent ready[MAX_READY_EVENTS];
+ Channel *chan, *next;
+
+ /* Main loop */
+ while (!proxy->shutdown)
+ {
+ /* Use timeout to allow normal proxy shutdown */
+ int wait_timeout = IdlePoolWorkerTimeout ? IdlePoolWorkerTimeout : PROXY_WAIT_TIMEOUT;
+ n_ready = WaitEventSetWait(proxy->wait_events, wait_timeout, ready, MAX_READY_EVENTS, PG_WAIT_CLIENT);
+ for (i = 0; i < n_ready; i++) {
+ chan = (Channel*)ready[i].user_data;
+ if (chan == NULL) /* new connection from postmaster */
+ {
+ Port* port = (Port*)palloc0(sizeof(Port));
+ port->sock = pg_recv_sock(ready[i].fd);
+ if (port->sock == PGINVALID_SOCKET)
+ {
+ elog(WARNING, "Failed to receive session socket: %m");
+ pfree(port);
+ }
+ else
+ {
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+ port->gss = (pg_gssinfo *)palloc0(sizeof(pg_gssinfo));
+ if (!port->gss)
+ ereport(FATAL,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+#endif
+ proxy_add_client(proxy, port);
+ }
+ }
+ /*
+ * epoll may return an event for an already closed session if
+ * the socket is still open. From the epoll documentation: Q6
+ * Will closing a file descriptor cause it to be removed
+ * from all epoll sets automatically?
+ *
+ * A6 Yes, but be aware of the following point. A file
+ * descriptor is a reference to an open file description
+ * (see open(2)). Whenever a descriptor is duplicated via
+ * dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2), a new
+ * file descriptor referring to the same open file
+ * description is created. An open file description
+ * continues to exist until all file descriptors
+ * referring to it have been closed. A file descriptor is
+ * removed from an epoll set only after all the file
+ * descriptors referring to the underlying open file
+ * description have been closed (or before if the
+ * descriptor is explicitly removed using epoll_ctl(2)
+ * EPOLL_CTL_DEL). This means that even after a file
+ * descriptor that is part of an epoll set has been
+ * closed, events may be reported for that file
+ * descriptor if other file descriptors referring to
+ * the same underlying file description remain open.
+ *
+ * Using this check for valid magic field we try to ignore
+ * such events.
+ */
+ else if (chan->magic == ACTIVE_CHANNEL_MAGIC)
+ {
+ if (ready[i].events & WL_SOCKET_WRITEABLE) {
+ ELOG(LOG, "Channel %p is writable", chan);
+ channel_write(chan, false);
+ if (chan->magic == ACTIVE_CHANNEL_MAGIC && (chan->peer == NULL || chan->peer->tx_size == 0)) /* nothing to write */
+ {
+ /* On systems not supporting epoll edge triggering (Win32, FreeBSD, MacOS), we need to disable the writable event to avoid a busy loop */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE | WL_SOCKET_EDGE, NULL);
+ chan->edge_triggered = true;
+ }
+ }
+ if (ready[i].events & WL_SOCKET_READABLE) {
+ ELOG(LOG, "Channel %p is readable", chan);
+ channel_read(chan);
+ if (chan->magic == ACTIVE_CHANNEL_MAGIC && chan->tx_size != 0) /* pending write: reading is suspended until it completes */
+ {
+ /* On systems not supporting epoll edge triggering (Win32, FreeBSD, MacOS), we need to disable the readable event to avoid a busy loop */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_WRITEABLE | WL_SOCKET_EDGE, NULL);
+ chan->edge_triggered = true;
+ }
+ }
+ }
+ }
+ if (IdlePoolWorkerTimeout)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz timeout_usec = (TimestampTz) IdlePoolWorkerTimeout * 1000; /* widen before multiplying to avoid int overflow */
+ if (proxy->last_idle_timeout_check + timeout_usec < now)
+ {
+ HASH_SEQ_STATUS seq;
+ struct SessionPool* pool;
+ proxy->last_idle_timeout_check = now;
+ hash_seq_init(&seq, proxy->pools);
+ while ((pool = hash_seq_search(&seq)) != NULL)
+ {
+ for (chan = pool->idle_backends; chan != NULL; chan = chan->next)
+ {
+ if (chan->backend_last_activity + timeout_usec < now)
+ {
+ chan->is_interrupted = true; /* the interrupted flag makes channel_write send an 'X' message */
+ channel_write(chan, false);
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * Delayed deallocation of disconnected channels.
+ * We can not delete channels immediately because of presence of peer events.
+ */
+ for (chan = proxy->hangout; chan != NULL; chan = next)
+ {
+ next = chan->next;
+ channel_remove(chan);
+ }
+ proxy->hangout = NULL;
+ }
+}
+
+/*
+ * Handle normal shutdown of Postgres instance
+ */
+static void
+proxy_handle_sigterm(SIGNAL_ARGS)
+{
+ if (proxy)
+ proxy->shutdown = true;
+}
+
+#ifdef EXEC_BACKEND
+static pid_t
+proxy_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkproxy";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+#endif
+
+NON_EXEC_STATIC void
+ConnectionProxyMain(int argc, char *argv[])
+{
+ sigjmp_buf local_sigjmp_buf;
+
+ /* Identify myself via ps */
+ init_ps_display("connection proxy", "", "", "");
+
+ SetProcessingMode(InitProcessing);
+
+ pqsignal(SIGTERM, proxy_handle_sigterm);
+ pqsignal(SIGQUIT, quickdie);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
+
+ /* Early initialization */
+ BaseInit();
+
+ /*
+ * Create a per-backend PGPROC struct in shared memory, except in the
+ * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+ * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+ * had to do some stuff with LWLocks).
+ */
+#ifndef EXEC_BACKEND
+ InitProcess();
+#endif
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * See notes in postgres.c about the design of this coding.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /*
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
+ */
+ proc_exit(0);
+ }
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ PG_SETMASK(&UnBlockSig);
+
+ proxy = proxy_create(MyProxySocket, &ProxyState[MyProxyId], SessionPoolSize);
+ proxy_loop(proxy);
+
+ proc_exit(0);
+}
+
+/*
+ * Function for launching the proxy from the postmaster.
+ * This "boilerplate" code is taken from other auxiliary workers.
+ * In the future it may be replaced with a background worker.
+ * The main problem with a background worker is how to pass the socket to it and obtain its PID.
+ */
+int
+ConnectionProxyStart()
+{
+ pid_t worker_pid;
+
+#ifdef EXEC_BACKEND
+ switch ((worker_pid = proxy_forkexec()))
+#else
+ switch ((worker_pid = fork_process()))
+#endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork proxy worker process: %m")));
+ return 0;
+
+#ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ ConnectionProxyMain(0, NULL);
+ break;
+#endif
+ default:
+ elog(LOG, "Start proxy process %d", (int) worker_pid);
+ return (int) worker_pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+/*
+ * We need some place in shared memory to provide information about proxies state.
+ */
+int ConnectionProxyShmemSize(void)
+{
+ return ConnectionProxiesNumber*sizeof(ConnectionProxyState);
+}
+
+void ConnectionProxyShmemInit(void)
+{
+ bool found;
+ ProxyState = (ConnectionProxyState*)ShmemInitStruct("connection proxy contexts",
+ ConnectionProxyShmemSize(), &found);
+ if (!found)
+ memset(ProxyState, 0, ConnectionProxyShmemSize());
+}
+
+PG_FUNCTION_INFO_V1(pg_pooler_state);
+
+typedef struct
+{
+ int proxy_id;
+ TupleDesc ret_desc;
+} PoolerStateContext;
+
+/**
+ * Return information about proxy state as a set with the following columns:
+ * pid - proxy process identifier
+ * n_clients - number of clients connected to proxy
+ * n_ssl_clients - number of clients using SSL protocol
+ * n_pools - number of pools (role/dbname combinations) maintained by proxy
+ * n_backends - total number of backends spawned by this proxy (including tainted)
+ * n_dedicated_backends - number of tainted backends
+ * n_idle_backends - number of idle backends
+ * n_idle_clients - number of idle clients
+ * tx_bytes - amount of data sent from backends to clients
+ * rx_bytes - amount of data sent from clients to backends
+ * n_transactions - number of transactions processed by all backends of this proxy
+ */
+Datum pg_pooler_state(PG_FUNCTION_ARGS)
+{
+ FuncCallContext* srf_ctx;
+ MemoryContext old_context;
+ PoolerStateContext* ps_ctx;
+ HeapTuple tuple;
+ Datum values[11];
+ bool nulls[11];
+ int id;
+ int i;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ srf_ctx = SRF_FIRSTCALL_INIT();
+ old_context = MemoryContextSwitchTo(srf_ctx->multi_call_memory_ctx);
+ ps_ctx = (PoolerStateContext*)palloc(sizeof(PoolerStateContext));
+ get_call_result_type(fcinfo, NULL, &ps_ctx->ret_desc);
+ ps_ctx->proxy_id = 0;
+ srf_ctx->user_fctx = ps_ctx;
+ MemoryContextSwitchTo(old_context);
+ }
+ srf_ctx = SRF_PERCALL_SETUP();
+ ps_ctx = srf_ctx->user_fctx;
+ id = ps_ctx->proxy_id;
+ if (id == ConnectionProxiesNumber)
+ SRF_RETURN_DONE(srf_ctx);
+
+ values[0] = Int32GetDatum(ProxyState[id].pid);
+ values[1] = Int32GetDatum(ProxyState[id].n_clients);
+ values[2] = Int32GetDatum(ProxyState[id].n_ssl_clients);
+ values[3] = Int32GetDatum(ProxyState[id].n_pools);
+ values[4] = Int32GetDatum(ProxyState[id].n_backends);
+ values[5] = Int32GetDatum(ProxyState[id].n_dedicated_backends);
+ values[6] = Int32GetDatum(ProxyState[id].n_idle_backends);
+ values[7] = Int32GetDatum(ProxyState[id].n_idle_clients);
+ values[8] = Int64GetDatum(ProxyState[id].tx_bytes);
+ values[9] = Int64GetDatum(ProxyState[id].rx_bytes);
+ values[10] = Int64GetDatum(ProxyState[id].n_transactions);
+
+ for (i = 0; i < 11; i++)
+ nulls[i] = false;
+
+ ps_ctx->proxy_id += 1;
+ tuple = heap_form_tuple(ps_ctx->ret_desc, values, nulls);
+ SRF_RETURN_NEXT(srf_ctx, HeapTupleGetDatum(tuple));
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index d7d7335..6d32d8f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -28,6 +28,7 @@
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(int port)
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
+ size = add_size(size, ConnectionProxyShmemSize());
/* freeze the addin request size and include it */
addin_request_allowed = false;
@@ -255,6 +257,7 @@ CreateSharedMemoryAndSemaphores(int port)
WalSndShmemInit();
WalRcvShmemInit();
ApplyLauncherShmemInit();
+ ConnectionProxyShmemInit();
/*
* Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbc..287fb19 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -72,11 +72,29 @@
#error "no wait set implementation available"
#endif
+#if defined(WAIT_USE_EPOLL)
+bool WaitEventUseEpoll = true;
+#else
+bool WaitEventUseEpoll = false;
+#endif
+
+/*
+ * The connection pooler needs to delete events from an event set.
+ * Since we have to preserve the positions of all other events,
+ * we cannot move events, so we have to maintain a list of free events.
+ * But poll/WaitForMultipleObjects operate on an array of listened events.
+ * That is why elements in the pollfds and handles arrays must be stored without holes,
+ * and we need to maintain a mapping between them and the WaitEventSet events.
+ * This mapping is stored in the "permutation" array. We also need the backward mapping
+ * (from event to descriptors array), which is implemented using the "index" field of WaitEvent.
+ */
+
/* typedef in latch.h */
struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* head of the singly linked list of free events, linked by "pos" and terminated by -1 */
/*
* Array, of nevents_space length, storing the definition of events this
@@ -84,6 +102,8 @@ struct WaitEventSet
*/
WaitEvent *events;
+ int *permutation; /* indexes of used events (see comment above) */
+
/*
* If WL_LATCH_SET is specified in any wait event, latch is a pointer to
* said latch, and latch_pos the offset in the ->events array. This is
@@ -137,9 +157,9 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
-static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
#endif
static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -553,6 +573,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
*/
sz += MAXALIGN(sizeof(WaitEventSet));
sz += MAXALIGN(sizeof(WaitEvent) * nevents);
+ sz += MAXALIGN(sizeof(int) * nevents);
#if defined(WAIT_USE_EPOLL)
sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
@@ -571,20 +592,21 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->events = (WaitEvent *) data;
data += MAXALIGN(sizeof(WaitEvent) * nevents);
+ set->permutation = (int *) data;
+ data += MAXALIGN(sizeof(int) * nevents);
+
#if defined(WAIT_USE_EPOLL)
set->epoll_ret_events = (struct epoll_event *) data;
- data += MAXALIGN(sizeof(struct epoll_event) * nevents);
#elif defined(WAIT_USE_POLL)
set->pollfds = (struct pollfd *) data;
- data += MAXALIGN(sizeof(struct pollfd) * nevents);
#elif defined(WAIT_USE_WIN32)
- set->handles = (HANDLE) data;
- data += MAXALIGN(sizeof(HANDLE) * nevents);
+ set->handles = (HANDLE*) data;
#endif
set->latch = NULL;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -632,12 +654,11 @@ FreeWaitEventSet(WaitEventSet *set)
#if defined(WAIT_USE_EPOLL)
close(set->epoll_fd);
#elif defined(WAIT_USE_WIN32)
- WaitEvent *cur_event;
+ int i;
- for (cur_event = set->events;
- cur_event < (set->events + set->nevents);
- cur_event++)
+ for (i = 0; i < set->nevents; i++)
{
+ WaitEvent* cur_event = &set->events[set->permutation[i]];
if (cur_event->events & WL_LATCH_SET)
{
/* uses the latch's HANDLE */
@@ -650,7 +671,7 @@ FreeWaitEventSet(WaitEventSet *set)
{
/* Clean up the event object we created for the socket */
WSAEventSelect(cur_event->fd, NULL, 0);
- WSACloseEvent(set->handles[cur_event->pos + 1]);
+ WSACloseEvent(set->handles[cur_event->index + 1]);
}
}
#endif
@@ -691,9 +712,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
- Assert(set->nevents < set->nevents_space);
+ if (set->nevents == set->nevents_space)
+ return -1;
if (events == WL_EXIT_ON_PM_DEATH)
{
@@ -720,8 +743,20 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->permutation[set->nevents] = event->pos;
+ event->index = set->nevents++;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -748,15 +783,41 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
return event->pos;
}
/*
+ * Remove the event at position event_pos (as returned by AddWaitEventToSet)
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos)
+{
+ WaitEvent *event = &set->events[event_pos];
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#elif defined(WAIT_USE_WIN32)
+ WaitEventAdjustWin32(set, event, true);
+#endif
+ if (--set->nevents != 0)
+ {
+ set->permutation[event->index] = set->permutation[set->nevents];
+ set->events[set->permutation[set->nevents]].index = event->index;
+ }
+ event->fd = PGINVALID_SOCKET;
+ event->events = 0;
+ event->index = -1;
+ event->pos = set->free_events;
+ set->free_events = event_pos;
+}
+
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -767,10 +828,16 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
{
WaitEvent *event;
- Assert(pos < set->nevents);
+ Assert(pos < set->nevents_space);
event = &set->events[pos];
+#if defined(WAIT_USE_EPOLL)
+ /* epoll supports EPOLLET (edge-triggered) natively; ModifyWaitEvent is only used to emulate it elsewhere */
+ if (events & WL_SOCKET_EDGE)
+ return;
+#endif
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -804,9 +871,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
}
@@ -844,6 +911,8 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
epoll_ev.events |= EPOLLIN;
if (event->events & WL_SOCKET_WRITEABLE)
epoll_ev.events |= EPOLLOUT;
+ if (event->events & WL_SOCKET_EDGE)
+ epoll_ev.events |= EPOLLET;
}
/*
@@ -852,11 +921,10 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
* requiring that, and actually it makes the code simpler...
*/
rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
-
if (rc < 0)
ereport(ERROR,
(errcode_for_socket_access(),
- /* translator: %s is a syscall name, such as "poll()" */
+ /* translator: %s is a syscall name, such as "poll()" */
errmsg("%s failed: %m",
"epoll_ctl()")));
}
@@ -864,11 +932,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ struct pollfd *pollfd = &set->pollfds[event->index];
+
+ if (remove)
+ {
+ *pollfd = set->pollfds[set->nevents - 1]; /* nevents is not decremented yet */
+ return;
+ }
- pollfd->revents = 0;
pollfd->fd = event->fd;
/* prepare pollfd entry once */
@@ -897,9 +970,21 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
#if defined(WAIT_USE_WIN32)
static void
-WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove)
{
- HANDLE *handle = &set->handles[event->pos + 1];
+ HANDLE *handle = &set->handles[event->index + 1];
+
+ if (remove)
+ {
+ Assert(event->fd != PGINVALID_SOCKET);
+
+ if (*handle != WSA_INVALID_EVENT)
+ WSACloseEvent(*handle);
+
+ *handle = set->handles[set->nevents]; /* nevents is not decremented yet but we need to add 1 to the index */
+ set->handles[set->nevents] = WSA_INVALID_EVENT;
+ return;
+ }
if (event->events == WL_LATCH_SET)
{
@@ -1200,11 +1285,12 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
{
int returned_events = 0;
int rc;
- WaitEvent *cur_event;
- struct pollfd *cur_pollfd;
+ int i;
+ struct pollfd *cur_pollfd = set->pollfds;
+ WaitEvent* cur_event;
/* Sleep */
- rc = poll(set->pollfds, set->nevents, (int) cur_timeout);
+ rc = poll(cur_pollfd, set->nevents, (int) cur_timeout);
/* Check return code */
if (rc < 0)
@@ -1227,15 +1313,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
return -1;
}
- for (cur_event = set->events, cur_pollfd = set->pollfds;
- cur_event < (set->events + set->nevents) &&
- returned_events < nevents;
- cur_event++, cur_pollfd++)
+ for (i = 0; i < set->nevents && returned_events < nevents; i++, cur_pollfd++)
{
/* no activity on this FD, skip */
if (cur_pollfd->revents == 0)
continue;
+ cur_event = &set->events[set->permutation[i]];
occurred_events->pos = cur_event->pos;
occurred_events->user_data = cur_event->user_data;
occurred_events->events = 0;
@@ -1326,17 +1410,25 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
WaitEvent *occurred_events, int nevents)
{
int returned_events = 0;
+ int i;
DWORD rc;
- WaitEvent *cur_event;
+ WaitEvent* cur_event;
/* Reset any wait events that need it */
- for (cur_event = set->events;
- cur_event < (set->events + set->nevents);
- cur_event++)
+ for (i = 0; i < set->nevents; i++)
{
- if (cur_event->reset)
+ cur_event = &set->events[set->permutation[i]];
+
+ /*
+ * On Windows, SSPI connections could hang in WaitForMultipleObjects, which
+ * did not signal the presence of input data (although the data could be read from the socket).
+ * It looks like the "reset" logic is not completely correct (resetting the event just after
+ * receiving the previous read event). Resetting all read events fixes this problem.
+ */
+ if (cur_event->events & WL_SOCKET_READABLE)
+ /* if (cur_event->reset) */
{
- WaitEventAdjustWin32(set, cur_event);
+ WaitEventAdjustWin32(set, cur_event, false);
cur_event->reset = false;
}
@@ -1402,7 +1494,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
* With an offset of one, due to the always present pgwin32_signal_event,
* the handle offset directly corresponds to a wait event.
*/
- cur_event = (WaitEvent *) &set->events[rc - WAIT_OBJECT_0 - 1];
+ cur_event = (WaitEvent *) &set->events[set->permutation[rc - WAIT_OBJECT_0 - 1]];
occurred_events->pos = cur_event->pos;
occurred_events->user_data = cur_event->user_data;
@@ -1443,7 +1535,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
else if (cur_event->events & WL_SOCKET_MASK)
{
WSANETWORKEVENTS resEvents;
- HANDLE handle = set->handles[cur_event->pos + 1];
+ HANDLE handle = set->handles[cur_event->index + 1];
Assert(cur_event->fd);
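The bookkeeping described in the latch.c comment above (stable event positions, a free list threaded through "pos", and a hole-free descriptor array kept in sync through "permutation" and "index") can be sketched in isolation. This is only a minimal illustration, not the patch's code: SlotSet, Slot, SLOT_SPACE, dense_fds and the slot_set_* functions are hypothetical names, and a plain int array stands in for the pollfds/handles array.

```c
#include <assert.h>

#define SLOT_SPACE 8    /* hypothetical fixed capacity (nevents_space in the patch) */

typedef struct Slot
{
    int pos;    /* own position while in use; next free slot while on the free list */
    int index;  /* position in the dense array, or -1 while free */
    int fd;     /* payload: the descriptor being watched */
} Slot;

typedef struct SlotSet
{
    int  nused;                     /* number of slots currently in use */
    int  free_head;                 /* head of the free list, -1 if empty */
    Slot slots[SLOT_SPACE];         /* stable positions, may contain holes */
    int  permutation[SLOT_SPACE];   /* dense index -> slot position */
    int  dense_fds[SLOT_SPACE];     /* hole-free array, as poll() would require */
} SlotSet;

void slot_set_init(SlotSet *set)
{
    set->nused = 0;
    set->free_head = -1;
}

/* Returns the stable position of the new slot, or -1 if the set is full. */
int slot_set_add(SlotSet *set, int fd)
{
    Slot *slot;
    int   pos;

    if (set->nused == SLOT_SPACE)
        return -1;

    if (set->free_head >= 0)
    {
        /* Reuse a previously deleted slot. */
        pos = set->free_head;
        set->free_head = set->slots[pos].pos;
    }
    else
        pos = set->nused;   /* no holes: slots 0..nused-1 are all in use */

    slot = &set->slots[pos];
    slot->pos = pos;
    slot->fd = fd;
    slot->index = set->nused;
    set->permutation[set->nused] = pos;
    set->dense_fds[set->nused] = fd;
    set->nused++;
    return pos;
}

/* Deletes the slot at a stable position; no other slot changes position. */
void slot_set_delete(SlotSet *set, int pos)
{
    Slot *slot = &set->slots[pos];
    int   last;

    assert(slot->index >= 0);   /* slot must be in use */

    last = --set->nused;
    if (slot->index != last)
    {
        /* Move the last dense entry into the hole and fix both mappings. */
        int moved = set->permutation[last];

        set->dense_fds[slot->index] = set->dense_fds[last];
        set->permutation[slot->index] = moved;
        set->slots[moved].index = slot->index;
    }

    /* Push the slot onto the free list, reusing "pos" as the link. */
    slot->fd = -1;
    slot->index = -1;
    slot->pos = set->free_head;
    set->free_head = pos;
}
```

Deleting a slot only rewrites one entry of the dense array, so positions already handed out to callers stay valid, which is the property the pooler relies on when it removes a client socket from the wait set.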
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 1b7053c..b7c1ed7 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -774,7 +774,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
/* Identify owner for lock */
if (sessionLock)
+ {
owner = NULL;
+ MyProc->is_tainted = true;
+ }
else
owner = CurrentResourceOwner;
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 498373f..3e530e7 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -397,6 +397,7 @@ InitProcess(void)
MyProc->roleId = InvalidOid;
MyProc->tempNamespaceId = InvalidOid;
MyProc->isBackgroundWorker = IsBackgroundWorker;
+ MyProc->is_tainted = false;
MyPgXact->delayChkpt = false;
MyPgXact->vacuumFlags = 0;
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a6505c7..e07f540 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4237,6 +4237,8 @@ PostgresMain(int argc, char *argv[],
*/
if (ConfigReloadPending)
{
+ if (RestartPoolerOnReload && strcmp(application_name, "pool_worker") == 0)
+ proc_exit(0);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index bc62c6e..6f1bb75 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -18,6 +18,7 @@
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/predicate_internals.h"
+#include "storage/proc.h"
#include "utils/array.h"
#include "utils/builtins.h"
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 3bf96de..6036703 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -130,9 +130,15 @@ int max_parallel_maintenance_workers = 2;
*/
int NBuffers = 1000;
int MaxConnections = 90;
+int SessionPoolSize = 0;
+int IdlePoolWorkerTimeout = 0;
+int ConnectionProxiesNumber = 0;
+int SessionSchedule = SESSION_SCHED_ROUND_ROBIN;
+
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
+int MaxSessions = 1000;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
@@ -148,3 +154,6 @@ int VacuumCostBalance = 0; /* working state for vacuum */
bool VacuumCostActive = false;
double vacuum_cleanup_index_scale_factor;
+bool RestartPoolerOnReload = false;
+bool ProxyingGUCs = false;
+bool MultitenantProxy = false;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index fc46360..06cbae3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -457,6 +457,14 @@ const struct config_enum_entry ssl_protocol_versions_info[] = {
{NULL, 0, false}
};
+static const struct config_enum_entry session_schedule_options[] = {
+ {"round-robin", SESSION_SCHED_ROUND_ROBIN, false},
+ {"random", SESSION_SCHED_RANDOM, false},
+ {"load-balancing", SESSION_SCHED_LOAD_BALANCING, false},
+ {NULL, 0, false}
+};
+
+
static struct config_enum_entry shared_memory_options[] = {
#ifndef WIN32
{"sysv", SHMEM_TYPE_SYSV, false},
@@ -1286,6 +1294,36 @@ static struct config_bool ConfigureNamesBool[] =
},
{
+ {"restart_pooler_on_reload", PGC_SIGHUP, CONN_POOLING,
+ gettext_noop("Restart session pool workers on pg_reload_conf()."),
+ NULL,
+ },
+ &RestartPoolerOnReload,
+ false,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"proxying_gucs", PGC_USERSET, CONN_POOLING,
+ gettext_noop("Support setting parameters in connection pooler sessions."),
+ NULL,
+ },
+ &ProxyingGUCs,
+ false,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"multitenant_proxy", PGC_USERSET, CONN_POOLING,
+ gettext_noop("One pool worker can serve clients with different roles"),
+ NULL,
+ },
+ &MultitenantProxy,
+ false,
+ NULL, NULL, NULL
+ },
+
+ {
{"log_duration", PGC_SUSET, LOGGING_WHAT,
gettext_noop("Logs the duration of each completed SQL statement."),
NULL
@@ -2138,6 +2176,53 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ /* see max_connections and max_wal_senders */
+ {"session_pool_size", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Sets number of backends serving client sessions."),
+ gettext_noop("If non-zero then session pooling will be used: "
+ "client connections will be redirected to one of the backends and the maximal number of backends is determined by this parameter. "
+ "Launched backends are never terminated, even when there are no active sessions.")
+ },
+ &SessionPoolSize,
+ 10, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"idle_pool_worker_timeout", PGC_USERSET, CONN_POOLING,
+ gettext_noop("Sets the maximum allowed duration of any idling connection pool worker."),
+ gettext_noop("A value of 0 turns off the timeout."),
+ GUC_UNIT_MS
+ },
+ &IdlePoolWorkerTimeout,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"connection_proxies", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Sets number of connection proxies."),
+ gettext_noop("The postmaster spawns a separate worker process for each proxy and scatters connections between proxies using one of the scheduling policies (round-robin, random, load-balancing). "
+ "Each proxy launches its own subset of backends, so the maximal number of non-tainted backends is "
+ "session_pool_size*connection_proxies*databases*roles.")
+ },
+ &ConnectionProxiesNumber,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"max_sessions", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Sets the maximum number of client sessions."),
+ gettext_noop("Maximal number of client sessions which can be handled by one connection proxy. "
+ "It can be greater than max_connections and can actually be arbitrarily large.")
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
/* see max_connections */
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
@@ -2185,6 +2270,16 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"proxy_port", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the TCP port for the connection pooler."),
+ NULL
+ },
+ &ProxyPortNumber,
+ 6543, 1, 65535,
+ NULL, NULL, NULL
+ },
+
+ {
{"unix_socket_permissions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the access permissions of the Unix-domain socket."),
gettext_noop("Unix-domain sockets use the usual Unix file system "
@@ -4550,6 +4645,16 @@ static struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},
+ {
+ {"session_schedule", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Session schedule policy for connection pool."),
+ NULL
+ },
+ &SessionSchedule,
+ SESSION_SCHED_ROUND_ROBIN, session_schedule_options,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
@@ -8146,6 +8251,9 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot set parameters during a parallel operation")));
+ if (!stmt->is_local)
+ MyProc->is_tainted = true;
+
switch (stmt->kind)
{
case VAR_SET_VALUE:
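The connection_proxies description in guc.c above names three scheduling policies (round-robin, random, load-balancing) but the code that applies them is not part of this chunk. As a rough illustration of what such a choice could look like, here is a sketch under stated assumptions: ProxySchedPolicy, choose_proxy, the n_clients array and the rr_cursor argument are hypothetical names invented for this example, and "load-balancing" is assumed to mean "pick the proxy with the fewest clients". It is not the patch's implementation.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical mirror of the three policies listed in session_schedule_options. */
typedef enum ProxySchedPolicy
{
    PROXY_SCHED_ROUND_ROBIN,
    PROXY_SCHED_RANDOM,
    PROXY_SCHED_LOAD_BALANCING
} ProxySchedPolicy;

/*
 * Pick the proxy that should receive the next client connection.
 *
 * n_clients[i] is the current number of clients of proxy i (an assumption:
 * something like ProxyState[i].n_clients), and *rr_cursor keeps the
 * round-robin position between calls.
 */
int choose_proxy(ProxySchedPolicy policy, const int *n_clients,
                 int n_proxies, int *rr_cursor)
{
    int best = 0;
    int i;

    assert(n_proxies > 0);

    switch (policy)
    {
        case PROXY_SCHED_ROUND_ROBIN:
            /* Hand out proxies in turn, wrapping around at the end. */
            best = *rr_cursor % n_proxies;
            *rr_cursor = (best + 1) % n_proxies;
            return best;

        case PROXY_SCHED_RANDOM:
            /* Uniform enough for a sketch; rand() is not seeded here. */
            return rand() % n_proxies;

        case PROXY_SCHED_LOAD_BALANCING:
            /* Assumed meaning: the proxy with the fewest clients wins. */
            for (i = 1; i < n_proxies; i++)
                if (n_clients[i] < n_clients[best])
                    best = i;
            return best;
    }
    return 0;
}
```

Round-robin needs no knowledge of proxy state, while the load-balancing variant depends on per-proxy counters being readable by the chooser, which is presumably one reason the proxy state is kept in shared memory.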
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b88e886..812c469 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10704,4 +10704,11 @@
proname => 'pg_partition_root', prorettype => 'regclass',
proargtypes => 'regclass', prosrc => 'pg_partition_root' },
+# builtin connection pool
+{ oid => '3435', descr => 'information about connection pooler proxies workload',
+ proname => 'pg_pooler_state', prorows => '1000', proretset => 't',
+ provolatile => 'v', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{int4,int4,int4,int4,int4,int4,int4,int4,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,n_clients,n_ssl_clients,n_pools,n_backends,n_dedicated_backends,n_idle_backends,n_idle_clients,tx_bytes,rx_bytes,n_transactions}', prosrc => 'pg_pooler_state' },
+
]
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 541f970..d739dc3 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -51,7 +51,7 @@
#include <security.h>
#undef SECURITY_WIN32
-#ifndef ENABLE_GSS
+#if !defined(ENABLE_GSS) && !defined(GSS_BUFFER_STUB_DEFINED)
/*
* Define a fake structure compatible with GSSAPI on Unix.
*/
@@ -60,6 +60,7 @@ typedef struct
void *value;
int length;
} gss_buffer_desc;
+#define GSS_BUFFER_STUB_DEFINED
#endif
#endif /* ENABLE_SSPI */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 08a2576..1e12ee1 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -54,10 +54,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
* prototypes for functions in pqcomm.c
*/
extern WaitEventSet *FeBeWaitSet;
-
-extern int StreamServerPort(int family, char *hostName,
- unsigned short portNumber, char *unixSocketDir,
- pgsocket ListenSocket[], int MaxListen);
+extern int StreamServerPort(int family, char *hostName,
+ unsigned short portNumber, char *unixSocketDir,
+ pgsocket ListenSocket[], int ListenPort[], int MaxListen);
extern int StreamConnection(pgsocket server_fd, Port *port);
extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 61a24c2..8a31f4e 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -159,6 +159,22 @@ extern PGDLLIMPORT int data_directory_mode;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+
+enum SessionSchedulePolicy
+{
+ SESSION_SCHED_ROUND_ROBIN,
+ SESSION_SCHED_RANDOM,
+ SESSION_SCHED_LOAD_BALANCING
+};
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
+extern PGDLLIMPORT int IdlePoolWorkerTimeout;
+extern PGDLLIMPORT int ConnectionProxiesNumber;
+extern PGDLLIMPORT int SessionSchedule;
+extern PGDLLIMPORT bool RestartPoolerOnReload;
+extern PGDLLIMPORT bool ProxyingGUCs;
+extern PGDLLIMPORT bool MultitenantProxy;
+
extern PGDLLIMPORT int max_worker_processes;
extern PGDLLIMPORT int max_parallel_workers;
diff --git a/src/include/port.h b/src/include/port.h
index b5c03d9..3ea24a3 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h
index f4841fb..fbc31d6 100644
--- a/src/include/port/win32_port.h
+++ b/src/include/port/win32_port.h
@@ -445,6 +445,7 @@ extern int pgkill(int pid, int sig);
#define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
#define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
#define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks)
SOCKET pgwin32_socket(int af, int type, int protocol);
int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen);
@@ -455,6 +456,7 @@ int pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptf
int pgwin32_recv(SOCKET s, char *buf, int len, int flags);
int pgwin32_send(SOCKET s, const void *buf, int len, int flags);
int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout);
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]);
extern int pgwin32_noblock;
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index b692d8b..d301f8c 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -17,6 +17,7 @@
extern bool EnableSSL;
extern int ReservedBackends;
extern PGDLLIMPORT int PostPortNumber;
+extern PGDLLIMPORT int ProxyPortNumber;
extern int Unix_socket_permissions;
extern char *Unix_socket_group;
extern char *Unix_socket_directories;
@@ -46,6 +47,11 @@ extern int postmaster_alive_fds[2];
extern PGDLLIMPORT const char *progname;
+extern PGDLLIMPORT void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[], char** errmsg);
+
+struct Proxy;
+struct Port;
+
extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn();
extern void ClosePostmasterPorts(bool am_syslogger);
extern void InitProcessGlobals(void);
@@ -62,6 +68,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+extern int ParseStartupPacket(struct Port* port, MemoryContext memctx, void* pkg_body, int pkg_size, bool SSLdone);
+extern int BackendStartup(struct Port* port, int* backend_pid);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/postmaster/proxy.h b/src/include/postmaster/proxy.h
new file mode 100644
index 0000000..254d0f0
--- /dev/null
+++ b/src/include/postmaster/proxy.h
@@ -0,0 +1,45 @@
+/*-------------------------------------------------------------------------
+ *
+ * proxy.h
+ * Exports from postmaster/proxy.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/proxy.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _PROXY_H
+#define _PROXY_H
+
+/*
+ * Information in shared memory about connection proxy state (used for session scheduling and monitoring)
+ */
+typedef struct ConnectionProxyState
+{
+ int pid; /* proxy worker pid */
+ int n_clients; /* total number of clients */
+ int n_ssl_clients; /* number of clients using SSL connection */
+ int n_pools; /* number of dbname/role combinations */
+ int n_backends; /* total number of launched backends */
+ int n_dedicated_backends; /* number of tainted backends */
+ int n_idle_backends; /* number of idle backends */
+ int n_idle_clients; /* number of idle clients */
+ uint64 tx_bytes; /* amount of data sent to client */
+ uint64 rx_bytes; /* amount of data sent to server */
+ uint64 n_transactions; /* total number of processed transactions */
+} ConnectionProxyState;
+
+extern ConnectionProxyState* ProxyState;
+extern PGDLLIMPORT int MyProxyId;
+extern PGDLLIMPORT pgsocket MyProxySocket;
+
+extern int ConnectionProxyStart(void);
+extern int ConnectionProxyShmemSize(void);
+extern void ConnectionProxyShmemInit(void);
+#ifdef EXEC_BACKEND
+extern void ConnectionProxyMain(int argc, char *argv[]);
+#endif
+
+#endif
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11..1dfac95 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -133,9 +133,11 @@ typedef struct Latch
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
#endif
+#define WL_SOCKET_EDGE (1 << 7)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
+ WL_SOCKET_EDGE | \
WL_SOCKET_CONNECTED)
typedef struct WaitEvent
@@ -143,12 +145,15 @@ typedef struct WaitEvent
int pos; /* position in the event data structure */
uint32 events; /* triggered events */
pgsocket fd; /* socket fd associated with event */
+ int index; /* position of the corresponding element in the descriptors array (for poll() and win32 implementations) */
void *user_data; /* pointer provided in AddWaitEventToSet */
#ifdef WIN32
bool reset; /* Is reset of the event required? */
#endif
} WaitEvent;
+extern bool WaitEventUseEpoll;
+
/* forward declaration to avoid exposing latch.c implementation details */
typedef struct WaitEventSet WaitEventSet;
@@ -177,6 +182,8 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index ac7ee72..e7207e2 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -203,6 +203,8 @@ struct PGPROC
PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */
dlist_head lockGroupMembers; /* list of members, if I'm a leader */
dlist_node lockGroupLink; /* my member link, if I'm a member */
+
+ bool is_tainted; /* backend has modified session GUCs, uses temporary tables, prepared statements, ... */
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index d68976f..9ff45b1 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -58,6 +58,7 @@ enum config_group
CONN_AUTH_SETTINGS,
CONN_AUTH_AUTH,
CONN_AUTH_SSL,
+ CONN_POOLING,
RESOURCES,
RESOURCES_MEM,
RESOURCES_DISK,
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index fcf2bc2..7f2a1df 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -59,7 +59,7 @@
#include <security.h>
#undef SECURITY_WIN32
-#ifndef ENABLE_GSS
+#if !defined(ENABLE_GSS) && !defined(GSS_BUFFER_STUB_DEFINED)
/*
* Define a fake structure compatible with GSSAPI on Unix.
*/
@@ -68,6 +68,7 @@ typedef struct
void *value;
int length;
} gss_buffer_desc;
+#define GSS_BUFFER_STUB_DEFINED
#endif
#endif /* ENABLE_SSPI */
diff --git a/src/makefiles/Makefile.cygwin b/src/makefiles/Makefile.cygwin
index f274d80..fdf53e9 100644
--- a/src/makefiles/Makefile.cygwin
+++ b/src/makefiles/Makefile.cygwin
@@ -19,6 +19,7 @@ override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
ifneq (,$(findstring backend,$(subdir)))
ifeq (,$(findstring conversion_procs,$(subdir)))
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
+ifeq (,$(findstring libpqconn,$(subdir)))
ifeq (,$(findstring replication/pgoutput,$(subdir)))
ifeq (,$(findstring snowball,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL
diff --git a/src/makefiles/Makefile.win32 b/src/makefiles/Makefile.win32
index 3dea11e..39bd2de 100644
--- a/src/makefiles/Makefile.win32
+++ b/src/makefiles/Makefile.win32
@@ -17,6 +17,7 @@ CFLAGS_SL =
ifneq (,$(findstring backend,$(subdir)))
ifeq (,$(findstring conversion_procs,$(subdir)))
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
+ifeq (,$(findstring libpqconn,$(subdir)))
ifeq (,$(findstring replication/pgoutput,$(subdir)))
ifeq (,$(findstring snowball,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL
diff --git a/src/test/regress/GNUmakefile b/src/test/regress/GNUmakefile
index a24cfd4..38dda4d 100644
--- a/src/test/regress/GNUmakefile
+++ b/src/test/regress/GNUmakefile
@@ -130,6 +130,7 @@ REGRESS_OPTS = --dlpath=. --max-concurrent-tests=20 $(EXTRA_REGRESS_OPTS)
check: all tablespace-setup
$(pg_regress_check) $(REGRESS_OPTS) --schedule=$(srcdir)/parallel_schedule $(MAXCONNOPT) $(EXTRA_TESTS)
+ $(pg_regress_check) $(REGRESS_OPTS) --schedule=$(srcdir)/parallel_schedule $(MAXCONNOPT) $(EXTRA_TESTS) --port=6543 --temp-config=$(srcdir)/conn_proxy.conf
check-tests: all tablespace-setup | temp-install
$(pg_regress_check) $(REGRESS_OPTS) $(MAXCONNOPT) $(TESTS) $(EXTRA_TESTS)
diff --git a/src/test/regress/conn_proxy.conf b/src/test/regress/conn_proxy.conf
new file mode 100644
index 0000000..ebaa257
--- /dev/null
+++ b/src/test/regress/conn_proxy.conf
@@ -0,0 +1,3 @@
+connection_proxies = 1
+port = 5432
+log_statement=all
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index d1d0aed..a677577 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -158,6 +158,7 @@ sub mkvcbuild
$postgres = $solution->AddProject('postgres', 'exe', '', 'src/backend');
$postgres->AddIncludeDir('src/backend');
+ $postgres->AddIncludeDir('src/port');
$postgres->AddDir('src/backend/port/win32');
$postgres->AddFile('src/backend/utils/fmgrtab.c');
$postgres->ReplaceFile('src/backend/port/pg_sema.c',
@@ -271,6 +272,12 @@ sub mkvcbuild
$libpqwalreceiver->AddIncludeDir('src/interfaces/libpq');
$libpqwalreceiver->AddReference($postgres, $libpq);
+ my $libpqconn =
+ $solution->AddProject('libpqconn', 'dll', '',
+ 'src/backend/postmaster/libpqconn');
+ $libpqconn->AddIncludeDir('src/interfaces/libpq');
+ $libpqconn->AddReference($postgres, $libpq);
+
my $pgoutput = $solution->AddProject('pgoutput', 'dll', '',
'src/backend/replication/pgoutput');
$pgoutput->AddReference($postgres);
diff --git a/src/tools/msvc/clean.bat b/src/tools/msvc/clean.bat
index d034ec5..ef6eb81 100755
--- a/src/tools/msvc/clean.bat
+++ b/src/tools/msvc/clean.bat
@@ -19,6 +19,7 @@ if exist pgsql.suo del /q /a:H pgsql.suo
del /s /q src\bin\win32ver.rc 2> NUL
del /s /q src\interfaces\win32ver.rc 2> NUL
if exist src\backend\win32ver.rc del /q src\backend\win32ver.rc
+if exist src\backend\postmaster\libpqconn\win32ver.rc del /q src\backend\postmaster\libpqconn\win32ver.rc
if exist src\backend\replication\libpqwalreceiver\win32ver.rc del /q src\backend\replication\libpqwalreceiver\win32ver.rc
if exist src\backend\replication\pgoutput\win32ver.rc del /q src\backend\replication\pgoutput\win32ver.rc
if exist src\backend\snowball\win32ver.rc del /q src\backend\snowball\win32ver.rc