On 08.07.2019 3:37, Thomas Munro wrote:
On Tue, Jul 2, 2019 at 3:11 AM Konstantin Knizhnik
<k.knizh...@postgrespro.ru> wrote:
On 01.07.2019 12:57, Thomas Munro wrote:
Interesting work.  No longer applies -- please rebase.

Rebased version of the patch is attached.
Also, this version of the built-in proxy can be found in the conn_proxy
branch of https://github.com/postgrespro/postgresql.builtin_pool.git
Thanks Konstantin.  I haven't looked at the code, but I can't help
noticing that this CF entry and the autoprepare one are both features
that come up again and again on feature request lists I've seen.
That's very cool.  They also both need architectural-level review.
With my Commitfest manager hat on: reviewing other stuff would help
with that; if you're looking for something of similar complexity and
also the same level of
everyone-knows-we-need-to-fix-this-!@#$-we-just-don't-know-exactly-how-yet
factor, I hope you get time to provide some more feedback on Takeshi
Ideriha's work on shared caches, which doesn't seem a million miles
from some of the things you're working on.

Thank you, I will look at Takeshi Ideriha's patch.

Could you please fix these compiler warnings so we can see this
running check-world on CI?

https://ci.appveyor.com/project/postgresql-cfbot/postgresql/build/1.0.46324
https://travis-ci.org/postgresql-cfbot/postgresql/builds/555180678

Sorry, I do not have access to a Windows host, so I cannot check the Win32 build myself. I have fixed all the reported warnings but cannot verify that the Win32 build is now OK.



diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 84341a30e5..9398e561e8 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -719,6 +719,123 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-sessions" xreflabel="max_sessions">
+      <term><varname>max_sessions</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_sessions</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+        <para>
+          The maximum number of client sessions that can be handled by
+          one connection proxy when session pooling is switched on.
+          This parameter affects only the size of the proxy's wait event set,
+          so specifying a large <varname>max_sessions</varname> value
+          has no significant memory or CPU overhead.
+          If the <varname>max_sessions</varname> limit is reached, new connections are not accepted.
+        </para>
+        <para>
+          The default value is 1000. This parameter can only be set at server start.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-session-pool-size" xreflabel="session_pool_size">
+      <term><varname>session_pool_size</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>session_pool_size</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+        <para>
+          Enables session pooling and defines the maximum number of
+          backends that can be used by client sessions for each database/user combination.
+          Launched non-tainted backends are never terminated, even if there are no active sessions.
+          A backend is considered tainted if the client changes GUCs, creates temporary tables,
+          or prepares statements.  A tainted backend can serve only one client.
+        </para>
+        <para>
+          The default value is 10, so up to 10 backends can serve each database/user combination.
+        </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-proxy-port" xreflabel="proxy_port">
+      <term><varname>proxy_port</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>proxy_port</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+        <para>
+          Sets the TCP port for the connection pooler.
+          Clients connecting to the main <varname>port</varname> are assigned dedicated backends,
+          while clients connecting to the proxy port are connected to backends through a proxy that
+          performs transaction-level scheduling.
+       </para>
+        <para>
+          The default value is 6543.
+        </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-connection-proxies" xreflabel="connection_proxies">
+      <term><varname>connection_proxies</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>connection_proxies</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+        <para>
+          Sets the number of connection proxies.
+          The postmaster spawns a separate worker process for each proxy and distributes connections
+          between proxies using one of the scheduling policies (round-robin, random, load-balancing).
+          Each proxy launches its own subset of backends, so the maximal number of non-tainted backends is
+          <varname>session_pool_size</varname>*<varname>connection_proxies</varname>*#databases*#roles.
+       </para>
+        <para>
+          The default value is 0, so session pooling is disabled.
+        </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-session-schedule" xreflabel="session_schedule">
+      <term><varname>session_schedule</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>session_schedule</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+        <para>
+          Specifies the scheduling policy for assigning sessions to proxies when
+          connection pooling is enabled. The default policy is <literal>round-robin</literal>.
+        </para>
+        <para>
+          With the <literal>round-robin</literal> policy, the postmaster cyclically distributes sessions between proxies.
+        </para>
+        <para>
+          With the <literal>random</literal> policy, the postmaster randomly chooses a proxy for each new session.
+        </para>
+        <para>
+          With the <literal>load-balancing</literal> policy, the postmaster chooses the proxy with the lowest load average.
+          The load average of a proxy is estimated by the number of client connections assigned to it, with extra weight for SSL connections.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-restart-pooler-on-reload" xreflabel="restart_pooler_on_reload">
+      <term><varname>restart_pooler_on_reload</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>restart_pooler_on_reload</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+        <para>
+          Restart connection proxy workers once <function>pg_reload_conf()</function> is called.
+          The default value is <literal>false</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-unix-socket-directories" xreflabel="unix_socket_directories">
       <term><varname>unix_socket_directories</varname> (<type>string</type>)
       <indexterm>
diff --git a/doc/src/sgml/connpool.sgml b/doc/src/sgml/connpool.sgml
new file mode 100644
index 0000000000..07f4202f75
--- /dev/null
+++ b/doc/src/sgml/connpool.sgml
@@ -0,0 +1,174 @@
+<!-- doc/src/sgml/connpool.sgml -->
+
+ <chapter id="connection-pooling">
+  <title>Connection Pooling</title>
+
+  <indexterm zone="connection-pooling">
+   <primary>built-in connection pool proxy</primary>
+  </indexterm>
+
+  <para>
+    <productname>PostgreSQL</productname> spawns a separate process (backend) for each client.
+    With a large number of clients, this model can consume a large amount of system
+    resources and lead to significant performance degradation, especially on computers with many
+    CPU cores. The reason is high contention between backends for Postgres resources.
+    Moreover, the size of many internal data structures, as well as the complexity of the
+    algorithms operating on them, grows proportionally to the number of active backends.
+  </para>
+
+  <para>
+    This is why most production Postgres installations use some kind of connection pooling:
+    pgbouncer, J2EE, odyssey, etc. But an external connection pooler requires additional effort to install,
+    configure and maintain. Also, pgbouncer (the most popular connection pooler for Postgres) is
+    single-threaded and so can become a bottleneck for a high-load system, forcing multiple instances of pgbouncer to be launched.
+  </para>
+
+  <para>
+    Starting with version 12, <productname>PostgreSQL</productname> provides a built-in connection pooler.
+    This chapter describes the architecture and usage of the built-in connection pooler.
+  </para>
+
+ <sect1 id="how-connection-pooler-works">
+  <title>How Built-in Connection Pooler Works</title>
+
+  <para>
+    The built-in connection pooler spawns one or more proxy processes which connect clients and backends.
+    The number of proxy processes is controlled by the <varname>connection_proxies</varname> configuration parameter.
+    To avoid substantial changes in the Postgres locking mechanism, only a transaction-level pooling policy is implemented.
+    This means that the pooler can reschedule a backend to another session only once the backend has completed its current transaction.
+  </para>
+
+  <para>
+    Since each Postgres backend can work with only a single database, each proxy process maintains a
+    hash table of connection pools, one per <literal>dbname,role</literal> pair.
+    The maximal number of backends which can be spawned by a connection pool is limited by the
+    <varname>session_pool_size</varname> configuration variable.
+    So the maximal number of non-dedicated backends in pooling mode is limited by
+    <varname>connection_proxies</varname>*<varname>session_pool_size</varname>*<literal>#databases</literal>*<literal>#roles</literal>.
+  </para>
+
+  <para>
+    To minimize the number of changes in the Postgres core, the built-in connection pooler does not try to save and restore
+    session context. If the session context is modified by the client application
+    (by changing values of configuration variables (GUCs), creating temporary tables, preparing statements, or taking advisory locks),
+    then the backend executing this session is considered <emphasis>tainted</emphasis>.
+    It becomes dedicated to this session and cannot be rescheduled to another session.
+    Once this session terminates, the backend is terminated as well.
+    Non-tainted backends are not terminated, even if there are no more connected sessions.
+  </para>
+
+  <para>
+    The built-in connection pooler accepts connections on a separate port (the <varname>proxy_port</varname> configuration option, default value 6543).
+    If a client connects to Postgres through the standard port (the <varname>port</varname> configuration option, default value 5432), then a normal (<emphasis>dedicated</emphasis>) backend is created. It works only
+    with this client and is terminated when the client disconnects. The standard port is also used by the proxy itself to
+    launch new worker backends. This means that to enable the connection pooler, Postgres must be configured
+    to accept local connections (in the <literal>pg_hba.conf</literal> file), for example as shown below.
+  </para>
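+
+  <para>
+    For example, the following <literal>pg_hba.conf</literal> line permits such local
+    connections (the <literal>trust</literal> method is only illustrative; use an
+    authentication method appropriate for your installation):
+<programlisting>
+host    all    all    127.0.0.1/32    trust
+</programlisting>
+  </para>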
+
+  <para>
+    If a client application connects through the proxy port, then its communication with the backend is always
+    performed through the proxy. Even if it changes the session context and the backend becomes <emphasis>tainted</emphasis>,
+    all traffic between this client and the backend still goes through the proxy.
+  </para>
+
+  <para>
+    The postmaster accepts connections on the proxy port and redirects each of them to one of the connection proxies.
+    Right now sessions are bound to a proxy and cannot migrate between proxies.
+    To provide uniform load balancing of proxies, the postmaster uses one of three scheduling policies:
+    <literal>round-robin</literal>, <literal>random</literal> and <literal>load-balancing</literal>.
+    In the last case the postmaster chooses the proxy with the smallest number of already attached clients, with
+    extra weight added for SSL connections (which consume more CPU).
+  </para>
+
+ </sect1>
+
+ <sect1 id="connection-pooler-configuration">
+  <title>Built-in Connection Pooler Configuration</title>
+
+  <para>
+    There are four main configuration variables controlling the connection pooler:
+    <varname>session_pool_size</varname>, <varname>connection_proxies</varname>, <varname>max_sessions</varname> and <varname>proxy_port</varname>.
+    The connection pooler is enabled if all of them are non-zero.
+  </para>
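+
+  <para>
+    For example, the following <literal>postgresql.conf</literal> fragment (the values
+    are illustrative; all but <varname>connection_proxies</varname> are the defaults)
+    enables session pooling:
+<programlisting>
+connection_proxies = 2
+session_pool_size = 10
+max_sessions = 1000
+proxy_port = 6543
+</programlisting>
+  </para>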
+
+  <para>
+    <varname>connection_proxies</varname> specifies the number of connection proxy processes to spawn.
+    The default value is zero, so connection pooling is disabled by default.
+  </para>
+
+  <para>
+    <varname>session_pool_size</varname> specifies the maximal number of backends per connection pool. The maximal number of launched non-dedicated backends in pooling mode is limited by
+    <varname>connection_proxies</varname>*<varname>session_pool_size</varname>*<literal>#databases</literal>*<literal>#roles</literal>.
+    If the number of backends is too small, the server will not be able to utilize all system resources,
+    but too large a value can degrade performance because of large snapshots and lock contention.
+  </para>
+
+  <para>
+    The <varname>max_sessions</varname> parameter specifies the maximal number of sessions which can be handled by one connection proxy.
+    It affects only the size of the proxy's wait event set, so it can be made large without any essential negative impact on system resource consumption.
+    The default value is 1000, so the maximal number of pooled client connections is limited by <varname>connection_proxies</varname>*<varname>max_sessions</varname>.
+  </para>
+
+  <para>
+    The connection proxy accepts connections on a special port, defined by <varname>proxy_port</varname>.
+    The default value is 6543, but it can be swapped with the standard Postgres port 5432 so that all connections to the databases are pooled.
+    Even then, it is still necessary to have a port for direct connections to the database (dedicated backends);
+    the connection pooler itself needs it to launch worker backends.
+  </para>
+
+  <para>
+    The postmaster distributes sessions between proxies using one of three available scheduling policies:
+    <literal>round-robin</literal>, <literal>random</literal> and <literal>load-balancing</literal>.
+    The policy can be set using the <varname>session_schedule</varname> configuration variable. The default policy is
+    <literal>round-robin</literal>, which distributes sessions cyclically between proxies.
+    It should work well for a more or less uniform workload.
+    The smartest policy is <literal>load-balancing</literal>, which tries to choose the least loaded proxy
+    based on the available statistics. It is possible to monitor proxy state using the <function>pg_pooler_state()</function> function, which returns information about the number of clients, backends and pools for each proxy, as well
+    as some statistics about the number of processed transactions and the amount of data
+    sent from clients to backends (<varname>rx_bytes</varname>) and from backends to clients (<varname>tx_bytes</varname>).
+  </para>
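+
+  <para>
+    For example, the state of all proxies can be inspected with:
+<programlisting>
+SELECT * FROM pg_pooler_state();
+</programlisting>
+  </para>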
+
+  <para>
+    Since pooled backends are not terminated on client exit, it would not otherwise
+    be possible to drop a database to which they are connected. This can be achieved without a server restart by using the <varname>restart_pooler_on_reload</varname> variable. Setting this variable to <literal>true</literal> causes all pooled backends to be shut down after execution of the <function>pg_reload_conf()</function> function, after which the database can be dropped.
+  </para>
+
+ </sect1>
+
+ <sect1 id="connection-pooler-constraints">
+  <title>Built-in Connection Pooler Pros and Cons</title>
+
+  <para>
+    Unlike pgbouncer and other external connection poolers, the built-in connection pooler doesn't require installation and configuration of any extra components.
+    It also doesn't introduce any limitations for clients: existing clients can work through the proxy without noticing any difference.
+    If a client application requires session context, then it will be served by a dedicated backend. Such a connection will not participate in
+    connection pooling, but it will work correctly. This is the main difference from pgbouncer,
+    which may cause incorrect behavior of the client application when any pooling policy other than session-level is used.
+    And if an application does not change session context, then it can be implicitly pooled, reducing the number of active backends.
+  </para>
+
+  <para>
+    The main limitation of the current built-in connection pooler implementation is that it is not able to save and resume session context.
+    Although this is not so difficult to do, it requires more changes in the Postgres core. So developers of client applications still have a choice:
+    either avoid session-specific operations, or do not use pooling. For example, using prepared statements can improve the speed of simple queries
+    up to two times, but prepared statements cannot be handled by a pooled backend, so if all clients use prepared statements, there will be no connection pooling
+    even if connection pooling is enabled.
+  </para>
+
+  <para>
+    Redirecting connections through a connection proxy definitely has a negative effect on total system performance, and especially on latency.
+    The overhead of connection proxying depends on many factors, such as the characteristics of the external and internal networks, the complexity of queries and the size of returned result sets.
+    A pgbench benchmark in select-only mode shows almost two times worse performance for local connections through the connection pooler compared with direct local connections when
+    the number of connections is small enough (10). For a much larger number of connections (when pooling is actually required), pooling mode outperforms direct connection mode.
+  </para>
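+
+  <para>
+    For example, the pooled and direct modes can be compared with a select-only
+    pgbench run against the proxy port and the standard port (assuming the default
+    port settings; the remaining options are illustrative):
+<programlisting>
+pgbench -S -c 10 -T 60 -p 6543 postgres
+pgbench -S -c 10 -T 60 -p 5432 postgres
+</programlisting>
+  </para>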
+
+  <para>
+    Another obvious limitation of transaction-level pooling is that a long-lived transaction can cause starvation of
+    other clients. This greatly depends on application design. If an application opens a database transaction and then waits for user input or some other external event, the backend can stay in the <emphasis>idle-in-transaction</emphasis>
+    state for a long time, and such a backend cannot be rescheduled to another session.
+    The obvious recommendation is to avoid long-lived transactions and to set <varname>idle_in_transaction_session_timeout</varname> to implicitly abort such transactions.
+  </para>
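+
+  <para>
+    For example, the following <literal>postgresql.conf</literal> setting (the value is
+    illustrative, not a recommendation) aborts transactions that stay idle for more
+    than five minutes:
+<programlisting>
+idle_in_transaction_session_timeout = '5min'
+</programlisting>
+  </para>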
+
+ </sect1>
+
+ </chapter>
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 8960f11278..5b19fef481 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -29,6 +29,7 @@
 <!ENTITY syntax     SYSTEM "syntax.sgml">
 <!ENTITY textsearch SYSTEM "textsearch.sgml">
 <!ENTITY typeconv   SYSTEM "typeconv.sgml">
+<!ENTITY connpool   SYSTEM "connpool.sgml">
 
 <!-- administrator's guide -->
 <!ENTITY backup        SYSTEM "backup.sgml">
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 3e115f1c76..029f0dc4e3 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -109,6 +109,7 @@
   &mvcc;
   &perform;
   &parallel;
+  &connpool;
 
  </part>
 
diff --git a/src/Makefile b/src/Makefile
index bcdbd9588a..196ca8c0f0 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
 	interfaces \
 	backend/replication/libpqwalreceiver \
 	backend/replication/pgoutput \
+	backend/postmaster/libpqconn \
 	fe_utils \
 	bin \
 	pl \
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index c278ee7318..acbaed313a 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -30,6 +30,7 @@
 #include "parser/parse_expr.h"
 #include "parser/parse_type.h"
 #include "rewrite/rewriteHandler.h"
+#include "storage/proc.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -457,6 +458,7 @@ StorePreparedStatement(const char *stmt_name,
 											  stmt_name,
 											  HASH_ENTER,
 											  &found);
+	MyProc->is_tainted = true;
 
 	/* Shouldn't get a duplicate entry */
 	if (found)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index fd67d2a841..10a14d0e03 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -590,6 +590,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 				 errmsg("ON COMMIT can only be used on temporary tables")));
 
+	if (stmt->relation->relpersistence != RELPERSISTENCE_TEMP
+		&& stmt->oncommit != ONCOMMIT_DROP)
+		MyProc->is_tainted = true;
+
 	if (stmt->partspec != NULL)
 	{
 		if (relkind != RELKIND_RELATION)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 384887e70d..ebff20a61a 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -195,15 +195,13 @@ pq_init(void)
 {
 	/* initialize state variables */
 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
-	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+	if (!PqSendBuffer)
+		PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
 	PqCommReadingMsg = false;
 	DoingCopyOut = false;
 
-	/* set up process-exit hook to close the socket */
-	on_proc_exit(socket_close, 0);
-
 	/*
 	 * In backends (as soon as forked) we operate the underlying socket in
 	 * nonblocking mode and use latches to implement blocking semantics if
@@ -220,6 +218,11 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
+	if (FeBeWaitSet)
+		FreeWaitEventSet(FeBeWaitSet);
+	else
+		on_proc_exit(socket_close, 0);
+
 	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
 	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
 					  NULL, NULL);
@@ -227,6 +230,7 @@ pq_init(void)
 	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
 }
 
+
 /* --------------------------------
  *		socket_comm_reset - reset libpq during error recovery
  *
@@ -329,7 +333,7 @@ socket_close(int code, Datum arg)
 int
 StreamServerPort(int family, char *hostName, unsigned short portNumber,
 				 char *unixSocketDir,
-				 pgsocket ListenSocket[], int MaxListen)
+				 pgsocket ListenSocket[], int ListenPort[], int MaxListen)
 {
 	pgsocket	fd;
 	int			err;
@@ -593,6 +597,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
 							familyDesc, addrDesc, (int) portNumber)));
 
 		ListenSocket[listen_index] = fd;
+		ListenPort[listen_index] = portNumber;
 		added++;
 	}
 
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index f4120bec55..e0cdd9e8bb 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = atomics.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
 
 ifeq ($(PORTNAME), win32)
 SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000000..e395868eef
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,165 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ *	  Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+#ifdef WIN32
+typedef struct
+{
+	SOCKET origsocket;
+	WSAPROTOCOL_INFO wsainfo;
+} InheritableSocket;
+#endif
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int
+pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid)
+{
+#ifdef WIN32
+	InheritableSocket dst;
+	size_t rc;
+	dst.origsocket = sock;
+	if (WSADuplicateSocket(sock, pid, &dst.wsainfo) != 0)
+	{
+		ereport(FATAL,
+				(errmsg("could not duplicate socket %d for use in backend: error code %d",
+						(int)sock, WSAGetLastError())));
+		return -1;
+	}
+	rc = send(chan, (char*)&dst, sizeof(dst), 0);
+	if (rc != sizeof(dst))
+	{
+		ereport(FATAL,
+				(errmsg("could not send inheritable socket: rc=%d, error code %d",
+						(int)rc, WSAGetLastError())));
+		return -1;
+	}
+	return 0;
+#else
+	struct msghdr msg = { 0 };
+	struct iovec io;
+	struct cmsghdr * cmsg;
+	char buf[CMSG_SPACE(sizeof(sock))];
+
+	memset(buf, '\0', sizeof(buf));
+
+	/* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+	io.iov_base = "";
+	io.iov_len = 1;
+
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+	msg.msg_control = buf;
+	msg.msg_controllen = sizeof(buf);
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (!cmsg)
+		return PGINVALID_SOCKET;
+
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+	memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+	msg.msg_controllen = cmsg->cmsg_len;
+
+	while (sendmsg(chan, &msg, 0) < 0)
+	{
+		if (errno != EINTR)
+			return PGINVALID_SOCKET;
+	}
+	return 0;
+#endif
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket
+pg_recv_sock(pgsocket chan)
+{
+#ifdef WIN32
+	InheritableSocket src;
+	SOCKET s;
+	size_t rc = recv(chan, (char*)&src, sizeof(src), 0);
+	if (rc != sizeof(src))
+	{
+		ereport(FATAL,
+				(errmsg("could not receive inheritable socket: rc=%d, error code %d",
+						(int)rc, WSAGetLastError())));
+	}
+	s = WSASocket(FROM_PROTOCOL_INFO,
+				  FROM_PROTOCOL_INFO,
+				  FROM_PROTOCOL_INFO,
+				  &src.wsainfo,
+				  0,
+				  0);
+	if (s == INVALID_SOCKET)
+	{
+		ereport(FATAL,
+				(errmsg("could not create inherited socket: error code %d",
+						WSAGetLastError())));
+	}
+
+	/*
+	 * To make sure we don't get two references to the same socket, close
+	 * the original one.  (This would happen when inheritance actually
+	 * works.)
+	 */
+	closesocket(src.origsocket);
+	return s;
+#else
+	struct msghdr msg = {0};
+	char c_buffer[256];
+	char m_buffer[256];
+	struct iovec io;
+	struct cmsghdr * cmsg;
+	pgsocket sock;
+
+	io.iov_base = m_buffer;
+	io.iov_len = sizeof(m_buffer);
+	msg.msg_iov = &io;
+	msg.msg_iovlen = 1;
+
+	msg.msg_control = c_buffer;
+	msg.msg_controllen = sizeof(c_buffer);
+
+	while (recvmsg(chan, &msg, 0) < 0)
+	{
+		if (errno != EINTR)
+			return PGINVALID_SOCKET;
+	}
+
+	cmsg = CMSG_FIRSTHDR(&msg);
+	if (!cmsg)
+	{
+		elog(WARNING, "could not transfer socket descriptor");
+		return PGINVALID_SOCKET;
+	}
+
+	memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+	pg_set_noblock(sock);
+
+	return sock;
+#endif
+}
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index d5b5e771e9..53eece6422 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -690,3 +690,65 @@ pgwin32_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, c
 		memcpy(writefds, &outwritefds, sizeof(fd_set));
 	return nummatches;
 }
+
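+/*
+ * socketpair() is not available on Windows, so emulate it with a loopback
+ * TCP connection: create a listening socket, connect to it, and accept
+ * the connection.
+ */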
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2])
+{
+    union {
+       struct sockaddr_in inaddr;
+       struct sockaddr addr;
+    } a;
+    SOCKET listener;
+    int e;
+    socklen_t addrlen = sizeof(a.inaddr);
+    DWORD flags = 0;
+    int reuse = 1;
+
+    socks[0] = socks[1] = -1;
+
+    listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (listener == -1)
+        return SOCKET_ERROR;
+
+    memset(&a, 0, sizeof(a));
+    a.inaddr.sin_family = AF_INET;
+    a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+    a.inaddr.sin_port = 0;
+
+    for (;;) {
+        if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
+               (char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
+            break;
+        if  (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+            break;
+
+        memset(&a, 0, sizeof(a));
+        if  (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
+            break;
+        a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        a.inaddr.sin_family = AF_INET;
+
+        if (listen(listener, 1) == SOCKET_ERROR)
+            break;
+
+        socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags);
+        if (socks[0] == -1)
+            break;
+        if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+            break;
+
+        socks[1] = accept(listener, NULL, NULL);
+        if (socks[1] == -1)
+            break;
+
+        closesocket(listener);
+        return 0;
+    }
+
+    e = WSAGetLastError();
+    closesocket(listener);
+    closesocket(socks[0]);
+    closesocket(socks[1]);
+    WSASetLastError(e);
+    socks[0] = socks[1] = -1;
+    return SOCKET_ERROR;
+}
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c23211b2..5d8b65c50a 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -12,7 +12,9 @@ subdir = src/backend/postmaster
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
+override CPPFLAGS :=  $(CPPFLAGS) -I$(top_builddir)/src/port -I$(top_srcdir)/src/port
+
 OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
-	pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+	pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o proxy.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/libpqconn/Makefile b/src/backend/postmaster/libpqconn/Makefile
new file mode 100644
index 0000000000..f05b72758e
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/Makefile
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/postmaster/libpqconn
+#
+# IDENTIFICATION
+#    src/backend/postmaster/libpqconn/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/postmaster/libpqconn
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = libpqconn.o $(WIN32RES)
+SHLIB_LINK_INTERNAL = $(libpq)
+SHLIB_LINK = $(filter -lintl, $(LIBS))
+SHLIB_PREREQS = submake-libpq
+PGFILEDESC = "libpqconn - open libpq connection"
+NAME = libpqconn
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+	rm -f $(OBJS)
diff --git a/src/backend/postmaster/libpqconn/libpqconn.c b/src/backend/postmaster/libpqconn/libpqconn.c
new file mode 100644
index 0000000000..bdba0f6e2c
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/libpqconn.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqconn.c
+ *
+ * This file provides a way to establish a connection to a postgres instance from a backend.
+ *
+ * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/postmaster/libpqconn/libpqconn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+
+#include "fmgr.h"
+#include "libpq-fe.h"
+#include "postmaster/postmaster.h"
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+static void*
+libpq_connectdb(char const* keywords[], char const* values[])
+{
+	PGconn* conn = PQconnectdbParams(keywords, values, false);
+	if (!conn || PQstatus(conn) != CONNECTION_OK)
+	{
+		ereport(WARNING,
+				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+				 errmsg("could not establish local connection to server"),
+				 errdetail_internal("%s", conn ? pchomp(PQerrorMessage(conn)) : "out of memory")));
+		if (conn)
+			PQfinish(conn);		/* avoid leaking the failed connection */
+		return NULL;
+	}
+	return conn;
+}
+
+void _PG_init(void)
+{
+	LibpqConnectdbParams = libpq_connectdb;
+}
+
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 688ad439ed..73a695b5ee 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -114,6 +114,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
 #include "postmaster/syslogger.h"
 #include "replication/logicallauncher.h"
 #include "replication/walsender.h"
@@ -196,6 +197,9 @@ BackgroundWorker *MyBgworkerEntry = NULL;
 /* The socket number we are listening for connections on */
 int			PostPortNumber;
 
+/* The socket number we are listening for pooled connections on */
+int			ProxyPortNumber;
+
 /* The directory names for Unix socket(s) */
 char	   *Unix_socket_directories;
 
@@ -216,6 +220,7 @@ int			ReservedBackends;
 /* The socket(s) we're listening to. */
 #define MAXLISTEN	64
 static pgsocket ListenSocket[MAXLISTEN];
+static int      ListenPort[MAXLISTEN];
 
 /*
  * Set by the -o option
@@ -246,6 +251,18 @@ bool		enable_bonjour = false;
 char	   *bonjour_name;
 bool		restart_after_crash = true;
 
+typedef struct ConnectionProxy
+{
+	int pid;
+	pgsocket socks[2];
+} ConnectionProxy;
+
+ConnectionProxy* ConnectionProxies;
+static bool ConnectionProxiesStarted;
+static int CurrentConnectionProxy; /* index used for round-robin distribution of connections between proxies */
+
+void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[]);
+
 /* PIDs of special child processes; 0 when not running */
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
@@ -403,7 +420,6 @@ static void BackendInitialize(Port *port);
 static void BackendRun(Port *port) pg_attribute_noreturn();
 static void ExitPostmaster(int status) pg_attribute_noreturn();
 static int	ServerLoop(void);
-static int	BackendStartup(Port *port);
 static int	ProcessStartupPacket(Port *port, bool secure_done);
 static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
 static void processCancelRequest(Port *port, void *pkt);
@@ -425,6 +441,7 @@ static pid_t StartChildProcess(AuxProcType type);
 static void StartAutovacuumWorker(void);
 static void MaybeStartWalReceiver(void);
 static void InitPostmasterDeathWatchHandle(void);
+static void StartProxyWorker(int id);
 
 /*
  * Archiver is allowed to start up at the current postmaster state?
@@ -477,6 +494,8 @@ typedef struct
 {
 	Port		port;
 	InheritableSocket portsocket;
+	InheritableSocket proxySocket;
+	int         proxyId;
 	char		DataDir[MAXPGPATH];
 	pgsocket	ListenSocket[MAXLISTEN];
 	int32		MyCancelKey;
@@ -560,6 +579,48 @@ int			postmaster_alive_fds[2] = {-1, -1};
 HANDLE		PostmasterHandle;
 #endif
 
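+/*
+ * Start connection proxy workers (if session pooling is enabled).  The socket
+ * pairs created here are used to pass accepted client sockets from the
+ * postmaster to the proxies.
+ */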
+static void
+StartConnectionProxies(void)
+{
+	if (SessionPoolSize > 0 && ConnectionProxiesNumber > 0 && !ConnectionProxiesStarted)
+	{
+		int i;
+		if (ConnectionProxies == NULL)
+		{
+			ConnectionProxies = malloc(sizeof(ConnectionProxy)*ConnectionProxiesNumber);
+			for (i = 0; i < ConnectionProxiesNumber; i++)
+			{
+				if (socketpair(AF_UNIX, SOCK_STREAM, 0, ConnectionProxies[i].socks) < 0)
+					ereport(FATAL,
+							(errcode_for_file_access(),
+							 errmsg_internal("could not create socket pair for launching sessions: %m")));
+			}
+		}
+		for (i = 0; i < ConnectionProxiesNumber; i++)
+		{
+			StartProxyWorker(i);
+		}
+		ConnectionProxiesStarted = true;
+	}
+}
+
+/*
+ * Send signal to connection proxies
+ */
+static void
+StopConnectionProxies(int signal)
+{
+	if (ConnectionProxiesStarted)
+	{
+		int i;
+		for (i = 0; i < ConnectionProxiesNumber; i++)
+		{
+			signal_child(ConnectionProxies[i].pid, signal);
+		}
+		ConnectionProxiesStarted = false;
+	}
+}
+
 /*
  * Postmaster main entry point
  */
@@ -572,6 +633,9 @@ PostmasterMain(int argc, char *argv[])
 	bool		listen_addr_saved = false;
 	int			i;
 	char	   *output_config_variable = NULL;
+	bool        contains_localhost = false;
+	int         ports[2];
+	int         n_ports = 0;
 
 	InitProcessGlobals();
 
@@ -1008,6 +1072,11 @@ PostmasterMain(int argc, char *argv[])
 
 	on_proc_exit(CloseServerPorts, 0);
 
+	/* Listen on proxy socket only if session pooling is enabled */
+	if (ProxyPortNumber > 0 && ConnectionProxiesNumber > 0 && SessionPoolSize > 0)
+		ports[n_ports++] = ProxyPortNumber;
+	ports[n_ports++] = PostPortNumber;
+
 	if (ListenAddresses)
 	{
 		char	   *rawstring;
@@ -1031,32 +1100,36 @@ PostmasterMain(int argc, char *argv[])
 		foreach(l, elemlist)
 		{
 			char	   *curhost = (char *) lfirst(l);
-
-			if (strcmp(curhost, "*") == 0)
-				status = StreamServerPort(AF_UNSPEC, NULL,
-										  (unsigned short) PostPortNumber,
-										  NULL,
-										  ListenSocket, MAXLISTEN);
-			else
-				status = StreamServerPort(AF_UNSPEC, curhost,
-										  (unsigned short) PostPortNumber,
-										  NULL,
-										  ListenSocket, MAXLISTEN);
-
-			if (status == STATUS_OK)
+			for (i = 0; i < n_ports; i++)
 			{
-				success++;
-				/* record the first successful host addr in lockfile */
-				if (!listen_addr_saved)
+				int port = ports[i];
+				if (strcmp(curhost, "*") == 0)
+					status = StreamServerPort(AF_UNSPEC, NULL,
+											  (unsigned short) port,
+											  NULL,
+											  ListenSocket, ListenPort, MAXLISTEN);
+				else
+					status = StreamServerPort(AF_UNSPEC, curhost,
+											  (unsigned short) port,
+											  NULL,
+											  ListenSocket, ListenPort, MAXLISTEN);
+				contains_localhost |= strcmp(curhost, "localhost") == 0;
+
+				if (status == STATUS_OK)
 				{
-					AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
-					listen_addr_saved = true;
+					success++;
+					/* record the first successful host addr in lockfile */
+					if (!listen_addr_saved)
+					{
+						AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
+						listen_addr_saved = true;
+					}
 				}
+				else
+					ereport(WARNING,
+							(errmsg("could not create listen socket for \"%s\"",
+									curhost)));
 			}
-			else
-				ereport(WARNING,
-						(errmsg("could not create listen socket for \"%s\"",
-								curhost)));
 		}
 
 		if (!success && elemlist != NIL)
@@ -1125,29 +1198,32 @@ PostmasterMain(int argc, char *argv[])
 					 errmsg("invalid list syntax in parameter \"%s\"",
 							"unix_socket_directories")));
 		}
-
+		contains_localhost = true;
 		foreach(l, elemlist)
 		{
 			char	   *socketdir = (char *) lfirst(l);
+			for (i = 0; i < n_ports; i++)
+			{
+				int port = ports[i];
 
-			status = StreamServerPort(AF_UNIX, NULL,
-									  (unsigned short) PostPortNumber,
-									  socketdir,
-									  ListenSocket, MAXLISTEN);
+				status = StreamServerPort(AF_UNIX, NULL,
+										  (unsigned short) port,
+										  socketdir,
+										  ListenSocket, ListenPort, MAXLISTEN);
 
-			if (status == STATUS_OK)
-			{
-				success++;
-				/* record the first successful Unix socket in lockfile */
-				if (success == 1)
-					AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+				if (status == STATUS_OK)
+				{
+					success++;
+					/* record the first successful Unix socket in lockfile */
+					if (success == 1)
+						AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+				}
+				else
+					ereport(WARNING,
+							(errmsg("could not create Unix-domain socket in directory \"%s\"",
+									socketdir)));
 			}
-			else
-				ereport(WARNING,
-						(errmsg("could not create Unix-domain socket in directory \"%s\"",
-								socketdir)));
 		}
-
 		if (!success && elemlist != NIL)
 			ereport(FATAL,
 					(errmsg("could not create any Unix-domain sockets")));
@@ -1157,6 +1233,20 @@ PostmasterMain(int argc, char *argv[])
 	}
 #endif
 
+	if (!contains_localhost && ProxyPortNumber > 0)
+	{
+		/* we need to accept local connections from proxy */
+		status = StreamServerPort(AF_UNSPEC, "localhost",
+								  (unsigned short) PostPortNumber,
+								  NULL,
+								  ListenSocket, ListenPort, MAXLISTEN);
+		if (status != STATUS_OK)
+		{
+			ereport(WARNING,
+					(errmsg("could not create listen socket for localhost")));
+		}
+	}
+
 	/*
 	 * check that we have some socket to listen on
 	 */
@@ -1374,6 +1464,8 @@ PostmasterMain(int argc, char *argv[])
 	/* Some workers may be scheduled to start now */
 	maybe_start_bgworkers();
 
+	StartConnectionProxies();
+
 	status = ServerLoop();
 
 	/*
@@ -1611,6 +1703,57 @@ DetermineSleepTime(struct timeval *timeout)
 	}
 }
 
+/*
+ * This function tries to estimate the workload of a proxy.
+ * We have a lot of information about proxy state in the ProxyState array:
+ * total number of clients, SSL clients, backends, traffic, number of transactions,...
+ * So in principle it is possible to implement a much more sophisticated evaluation function,
+ * but right now we take into account only the number of clients and SSL connections (which require much more CPU).
+ */
+static uint64
+GetConnectionProxyWorkload(int id)
+{
+	return ProxyState[id].n_clients + ProxyState[id].n_ssl_clients*3;
+}
+
+/*
+ * Choose a connection proxy for this session.
+ * Right now sessions cannot be moved between proxies (though in principle it would not be difficult to implement),
+ * so to support load balancing we have to do some smart work here.
+ */
+static ConnectionProxy*
+SelectConnectionProxy(void)
+{
+	int i;
+	uint64 min_workload;
+	int least_loaded_proxy;
+
+	switch (SessionSchedule)
+	{
+	  case SESSION_SCHED_ROUND_ROBIN:
+		return &ConnectionProxies[CurrentConnectionProxy++ % ConnectionProxiesNumber];
+	  case SESSION_SCHED_RANDOM:
+		return &ConnectionProxies[random() % ConnectionProxiesNumber];
+	  case SESSION_SCHED_LOAD_BALANCING:
+		min_workload = GetConnectionProxyWorkload(0);
+		least_loaded_proxy = 0;
+		for (i = 1; i < ConnectionProxiesNumber; i++)
+		{
+			uint64 workload = GetConnectionProxyWorkload(i);
+			if (workload < min_workload)
+			{
+				min_workload = workload;
+				least_loaded_proxy = i;
+			}
+		}
+		return &ConnectionProxies[least_loaded_proxy];
+	  default:
+		Assert(false);
+	}
+	return NULL;
+}
+
+
 /*
  * Main idle loop of postmaster
  *
@@ -1701,8 +1844,18 @@ ServerLoop(void)
 					port = ConnCreate(ListenSocket[i]);
 					if (port)
 					{
-						BackendStartup(port);
-
+						if (ConnectionProxies && ListenPort[i] == ProxyPortNumber)
+						{
+							ConnectionProxy* proxy = SelectConnectionProxy();
+							if (pg_send_sock(proxy->socks[0], port->sock, proxy->pid) < 0)
+							{
+								elog(LOG, "could not send socket to connection pool: %m");
+							}
+						}
+						else
+						{
+							BackendStartup(port, NULL);
+						}
 						/*
 						 * We no longer need the open socket or port structure
 						 * in this process
@@ -1899,8 +2052,6 @@ ProcessStartupPacket(Port *port, bool secure_done)
 {
 	int32		len;
 	void	   *buf;
-	ProtocolVersion proto;
-	MemoryContext oldcontext;
 
 	pq_startmsgread();
 
@@ -1967,6 +2118,18 @@ ProcessStartupPacket(Port *port, bool secure_done)
 	}
 	pq_endmsgread();
 
+	return ParseStartupPacket(port, TopMemoryContext, buf, len, secure_done);
+}
+
+int
+ParseStartupPacket(Port *port, MemoryContext memctx, void* buf, int len, bool secure_done)
+{
+	ProtocolVersion proto;
+	MemoryContext oldcontext;
+
+	am_walsender = false;
+	am_db_walsender = false;
+
 	/*
 	 * The first field is either a protocol version number or a special
 	 * request code.
@@ -2067,7 +2230,7 @@ retry1:
 	 * not worry about leaking this storage on failure, since we aren't in the
 	 * postmaster process anymore.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	oldcontext = MemoryContextSwitchTo(memctx);
 
 	if (PG_PROTOCOL_MAJOR(proto) >= 3)
 	{
@@ -2739,6 +2902,8 @@ pmdie(SIGNAL_ARGS)
 				if (WalWriterPID != 0)
 					signal_child(WalWriterPID, SIGTERM);
 
+				StopConnectionProxies(SIGTERM);
+
 				/*
 				 * If we're in recovery, we can't kill the startup process
 				 * right away, because at present doing so does not release
@@ -2816,6 +2981,9 @@ pmdie(SIGNAL_ARGS)
 				/* and the walwriter too */
 				if (WalWriterPID != 0)
 					signal_child(WalWriterPID, SIGTERM);
+
+				StopConnectionProxies(SIGTERM);
+
 				pmState = PM_WAIT_BACKENDS;
 			}
 
@@ -4041,6 +4209,7 @@ TerminateChildren(int signal)
 		signal_child(PgArchPID, signal);
 	if (PgStatPID != 0)
 		signal_child(PgStatPID, signal);
+	StopConnectionProxies(signal);
 }
 
 /*
@@ -4050,8 +4219,8 @@ TerminateChildren(int signal)
  *
  * Note: if you change this code, also consider StartAutovacuumWorker.
  */
-static int
-BackendStartup(Port *port)
+int
+BackendStartup(Port *port, int* backend_pid)
 {
 	Backend    *bn;				/* for backend cleanup */
 	pid_t		pid;
@@ -4155,6 +4324,8 @@ BackendStartup(Port *port)
 	if (!bn->dead_end)
 		ShmemBackendArrayAdd(bn);
 #endif
+	if (backend_pid)
+		*backend_pid = pid;
 
 	return STATUS_OK;
 }
@@ -4851,6 +5022,7 @@ SubPostmasterMain(int argc, char *argv[])
 	if (strcmp(argv[1], "--forkbackend") == 0 ||
 		strcmp(argv[1], "--forkavlauncher") == 0 ||
 		strcmp(argv[1], "--forkavworker") == 0 ||
+		strcmp(argv[1], "--forkproxy") == 0 ||
 		strcmp(argv[1], "--forkboot") == 0 ||
 		strncmp(argv[1], "--forkbgworker=", 15) == 0)
 		PGSharedMemoryReAttach();
@@ -4991,6 +5163,19 @@ SubPostmasterMain(int argc, char *argv[])
 
 		AutoVacWorkerMain(argc - 2, argv + 2);	/* does not return */
 	}
+	if (strcmp(argv[1], "--forkproxy") == 0)
+	{
+		/* Restore basic shared memory pointers */
+		InitShmemAccess(UsedShmemSegAddr);
+
+		/* Need a PGPROC to run CreateSharedMemoryAndSemaphores */
+		InitProcess();
+
+		/* Attach process to shared data structures */
+		CreateSharedMemoryAndSemaphores(0);
+
+		ConnectionProxyMain(argc - 2, argv + 2);	/* does not return */
+	}
 	if (strncmp(argv[1], "--forkbgworker=", 15) == 0)
 	{
 		int			shmem_slot;
@@ -5059,7 +5244,6 @@ ExitPostmaster(int status)
 				 errmsg_internal("postmaster became multithreaded"),
 				 errdetail("Please report this to <pgsql-b...@lists.postgresql.org>.")));
 #endif
-
 	/* should cleanup shared memory and kill all backends */
 
 	/*
@@ -5525,6 +5709,74 @@ StartAutovacuumWorker(void)
 	}
 }
 
+/*
+ * StartProxyWorker
+ *		Start a proxy worker process.
+ *
+ * This function is here because it enters the resulting PID into the
+ * postmaster's private backends list.
+ *
+ * NB -- this code very roughly matches BackendStartup.
+ */
+static void
+StartProxyWorker(int id)
+{
+	Backend    *bn;
+	int         pid;
+
+	/*
+	 * Compute the cancel key that will be assigned to this session. We
+	 * probably don't need cancel keys for proxy workers, but we'd
+	 * better have something random in the field to prevent unfriendly
+	 * people from sending cancels to them.
+	 */
+	if (!RandomCancelKey(&MyCancelKey))
+	{
+		ereport(LOG,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 errmsg("could not generate random cancel key")));
+		return;
+	}
+	bn = (Backend *) malloc(sizeof(Backend));
+	if (bn)
+	{
+		bn->cancel_key = MyCancelKey;
+
+		/* Proxy workers are not dead_end and need a child slot */
+		bn->dead_end = false;
+		bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
+		bn->bgworker_notify = false;
+
+		MyProxyId = id;
+		MyProxySocket = ConnectionProxies[id].socks[1];
+		pid = ConnectionProxyStart();
+		if (pid > 0)
+		{
+			bn->pid = pid;
+			bn->bkend_type = BACKEND_TYPE_BGWORKER;
+			dlist_push_head(&BackendList, &bn->elem);
+#ifdef EXEC_BACKEND
+			ShmemBackendArrayAdd(bn);
+#endif
+			/* all OK */
+			ConnectionProxies[id].pid = pid;
+			ProxyState[id].pid = pid;
+			return;
+		}
+
+		/*
+		 * fork failed, fall through to report -- actual error message was
+		 * logged by ConnectionProxyStart
+		 */
+		(void) ReleasePostmasterChildSlot(bn->child_slot);
+		free(bn);
+	}
+	else
+		ereport(LOG,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+}
+
 /*
  * MaybeStartWalReceiver
  *		Start the WAL receiver process, if not running and our state allows.
@@ -6116,6 +6368,10 @@ save_backend_variables(BackendParameters *param, Port *port,
 
 	strlcpy(param->ExtraOptions, ExtraOptions, MAXPGPATH);
 
+	if (!write_inheritable_socket(&param->proxySocket, MyProxySocket, childPid))
+		return false;
+	param->proxyId = MyProxyId;
+
 	return true;
 }
 
@@ -6347,6 +6603,9 @@ restore_backend_variables(BackendParameters *param, Port *port)
 	strlcpy(pkglib_path, param->pkglib_path, MAXPGPATH);
 
 	strlcpy(ExtraOptions, param->ExtraOptions, MAXPGPATH);
+
+	read_inheritable_socket(&MyProxySocket, &param->proxySocket);
+	MyProxyId = param->proxyId;
 }
 
 
diff --git a/src/backend/postmaster/proxy.c b/src/backend/postmaster/proxy.c
new file mode 100644
index 0000000000..ab058fa5f9
--- /dev/null
+++ b/src/backend/postmaster/proxy.c
@@ -0,0 +1,1024 @@
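+/*-------------------------------------------------------------------------
+ *
+ * proxy.c
+ *	  Connection proxy worker implementing the built-in connection pooler:
+ *	  multiplexes client sessions over a pool of backends using
+ *	  transaction-level scheduling.
+ *
+ * IDENTIFICATION
+ *	  src/backend/postmaster/proxy.c
+ *
+ *-------------------------------------------------------------------------
+ */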
+#include "postgres.h"
+
+#include <unistd.h>
+#include <errno.h>
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
+#include "postmaster/fork_process.h"
+#include "access/htup_details.h"
+#include "replication/walsender.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "libpq/libpq.h"
+#include "libpq/libpq-be.h"
+#include "libpq/pqsignal.h"
+#include "tcop/tcopprot.h"
+#include "utils/timeout.h"
+#include "utils/ps_status.h"
+#include "../interfaces/libpq/libpq-fe.h"
+#include "../interfaces/libpq/libpq-int.h"
+
+#define INIT_BUF_SIZE      (64*1024)
+#define MAX_READY_EVENTS   128
+#define DB_HASH_SIZE       101
+#define PROXY_WAIT_TIMEOUT 1000 /* 1 second */
+
+struct SessionPool;
+struct Proxy;
+
+typedef struct
+{
+	char database[NAMEDATALEN];
+	char username[NAMEDATALEN];
+}
+SessionPoolKey;
+
+/*
+ * Channels represent both clients and backends
+ */
+typedef struct Channel
+{
+	char*    buf;
+	int      rx_pos;
+	int      tx_pos;
+	int      tx_size;
+	int      buf_size;
+	int      event_pos;          /* Position of wait event returned by AddWaitEventToSet */
+
+	Port*    client_port;        /* Not null for client, null for server */
+
+	pgsocket backend_socket;
+	PGPROC*  backend_proc;
+	int      backend_pid;
+	bool     backend_is_tainted; /* client changes session context */
+	bool     backend_is_ready;   /* ready for query */
+	bool     is_interrupted;     /* client interrupts query execution */
+	bool     is_disconnected;    /* connection is lost */
+
+	/* We need to save startup packet response to be able to send it to new connection */
+	int      handshake_response_size;
+	char*    handshake_response;
+
+	struct Channel* peer;
+	struct Channel* next;
+	struct Proxy*   proxy;
+	struct SessionPool* pool;
+}
+Channel;
+
+/*
+ * Control structure for connection proxies (several proxy workers can be launched, each with its own proxy instance).
+ * A proxy contains a hash table of session pools, one for each role/dbname combination.
+ */
+typedef struct Proxy
+{
+	MemoryContext memctx;        /* Memory context for this proxy (used only in single thread) */
+	MemoryContext tmpctx;        /* Temporary memory context used for parsing startup packet */
+	WaitEventSet* wait_events;   /* Set of socket descriptors of backends and clients socket descriptors */
+	HTAB*    pools;              /* Session pool map with dbname/role used as a key */
+	int      n_accepted_connections; /* Number of accepted, but not yet established connections
+									  * (startup packet is not received and db/role are not known) */
+	int      max_backends;       /* Maximal number of backends per database */
+	bool     shutdown;           /* Shutdown flag */
+	Channel* hangout;            /* List of disconnected backends */
+	ConnectionProxyState* state; /* State of proxy */
+} Proxy;
+
+/*
+ * Connection pool to particular role/dbname
+ */
+typedef struct SessionPool
+{
+	SessionPoolKey key;
+	Channel* idle_backends;       /* List of idle backends */
+	Channel* pending_clients;     /* List of clients waiting for free backend */
+	Proxy*   proxy;               /* Owner of this pool */
+	int      n_launched_backends; /* Total number of launched backends */
+	int      n_idle_backends;     /* Number of backends in idle state */
+	int      n_connected_clients; /* Total number of connected clients */
+	int      n_idle_clients;      /* Number of clients in idle state */
+	int      n_pending_clients;   /* Number of clients waiting for free backend */
+}
+SessionPool;
+
+static void channel_remove(Channel* chan);
+static Channel* backend_start(SessionPool* pool, Port* client_port);
+static bool channel_read(Channel* chan);
+static bool channel_write(Channel* chan, bool synchronous);
+
+/*
+ * Enable for proxy debug logging:
+ * #define ELOG(severity, fmt, ...) elog(severity, "PROXY: " fmt, ## __VA_ARGS__)
+ */
+#define ELOG(severity, fmt, ...)
+
+static Proxy* proxy;
+int MyProxyId;
+pgsocket MyProxySocket;
+ConnectionProxyState* ProxyState;
+
+/*
+ * Backend is ready for the next command outside a transaction block (idle state).
+ * If the backend is not tainted, it is now possible to schedule some other client to it.
+ */
+static bool
+backend_reschedule(Channel* chan)
+{
+	chan->backend_is_ready = false;
+	if (chan->backend_proc == NULL) /* Lazy resolving of PGPROC entry */
+	{
+		Assert(chan->backend_pid != 0);
+		chan->backend_proc = BackendPidGetProc(chan->backend_pid);
+		Assert(chan->backend_proc); /* If backend completes execution of some query, then it has definitely registered itself in procarray */
+	}
+	if (!chan->backend_proc->is_tainted) /* If backend is not storing some session context */
+	{
+		Channel* pending = chan->pool->pending_clients;
+		Assert(!chan->backend_is_tainted);
+		if (chan->peer)
+			chan->peer->peer = NULL;
+		chan->pool->n_idle_clients += 1;
+		if (pending)
+		{
+            /* Has pending clients: serve one of them */
+			ELOG(LOG, "Backend %d is reassigned to client %p", chan->backend_pid, pending);
+			chan->pool->pending_clients = pending->next;
+			Assert(chan != pending);
+			chan->peer = pending;
+			pending->peer = chan;
+			chan->pool->n_pending_clients -= 1;
+			if (pending->tx_size == 0) /* new client has sent startup packet and we now need to send handshake response */
+			{
+				Assert(chan->handshake_response != NULL); /* backend already sent handshake response */
+				Assert(chan->handshake_response_size < chan->buf_size);
+				memcpy(chan->buf, chan->handshake_response, chan->handshake_response_size);
+				chan->rx_pos = chan->tx_size = chan->handshake_response_size;
+				ELOG(LOG, "Simulate response for startup packet to client %p", pending);
+				chan->backend_is_ready = true;
+				return channel_write(pending, false);
+			}
+			else
+			{
+				ELOG(LOG, "Try to send pending request from client %p to backend %p (pid %d)", pending, chan, chan->backend_pid);
+				Assert(pending->tx_pos == 0 && pending->rx_pos >= pending->tx_size);
+				return channel_write(chan, false); /* Send pending request to backend */
+			}
+		}
+		else /* return backend to the list of idle backends */
+		{
+			ELOG(LOG, "Backend %d is idle", chan->backend_pid);
+			Assert(!chan->client_port);
+			chan->next = chan->pool->idle_backends;
+			chan->pool->idle_backends = chan;
+			chan->pool->n_idle_backends += 1;
+			chan->peer = NULL;
+		}
+	}
+	else if (!chan->backend_is_tainted) /* if it was not marked as tainted before... */
+	{
+		chan->backend_is_tainted = true;
+		chan->proxy->state->n_dedicated_backends += 1;
+	}
+	return true;
+}
+
+/*
+ * Parse the client's startup packet and assign the client to the proper connection pool based on dbname/role.
+ */
+static bool
+client_connect(Channel* chan, int startup_packet_size)
+{
+	bool found;
+	SessionPoolKey key;
+	char* startup_packet = chan->buf;
+
+	Assert(chan->client_port);
+
+	/* parse startup packet in tmpctx memory context and reset it when it is not needed any more */
+	MemoryContextReset(chan->proxy->tmpctx);
+	MemoryContextSwitchTo(chan->proxy->tmpctx);
+
+	/* Associate libpq with client's port */
+	MyProcPort = chan->client_port;
+	pq_init();
+
+	if (ParseStartupPacket(chan->client_port, chan->proxy->tmpctx, startup_packet+4, startup_packet_size-4, false) != STATUS_OK) /* skip packet size */
+	{
+		MyProcPort = NULL;
+		elog(WARNING, "could not parse startup packet for client %p", chan);
+		return false;
+	}
+	MyProcPort = NULL;
+	if (am_walsender)
+	{
+		elog(WARNING, "WAL sender should not be connected through proxy");
+		return false;
+	}
+
+	chan->proxy->state->n_ssl_clients += chan->client_port->ssl_in_use;
+	pg_set_noblock(chan->client_port->sock); /* SSL handshake may switch socket to blocking mode */
+	memset(&key, 0, sizeof(key));
+	strlcpy(key.database, chan->client_port->database_name, NAMEDATALEN);
+	strlcpy(key.username, chan->client_port->user_name, NAMEDATALEN);
+
+	ELOG(LOG, "Client %p connects to %s/%s", chan, key.database, key.username);
+
+	chan->pool = (SessionPool*)hash_search(chan->proxy->pools, &key, HASH_ENTER, &found);
+	if (!found)
+	{
+		/* First connection to this role/dbname */
+		chan->proxy->state->n_pools += 1;
+		memset((char*)chan->pool + sizeof(SessionPoolKey), 0, sizeof(SessionPool) - sizeof(SessionPoolKey));
+	}
+	chan->pool->proxy = chan->proxy;
+	chan->pool->n_connected_clients += 1;
+	chan->pool->n_idle_clients += 1;
+	chan->proxy->n_accepted_connections -= 1;
+	return true;
+}
+
+/*
+ * Attach client to backend. Return true if new backend is attached, false otherwise.
+ */
+static bool
+client_attach(Channel* chan)
+{
+	Channel* idle_backend = chan->pool->idle_backends;
+	chan->pool->n_idle_clients -= 1;
+	if (idle_backend)
+	{
+		/* has some idle backend */
+		Assert(!idle_backend->backend_is_tainted && !idle_backend->client_port);
+		Assert(chan != idle_backend);
+		chan->peer = idle_backend;
+		idle_backend->peer = chan;
+		chan->pool->idle_backends = idle_backend->next;
+		chan->pool->n_idle_backends -= 1;
+		ELOG(LOG, "Attach client %p to backend %p (pid %d)", chan, idle_backend, idle_backend->backend_pid);
+	}
+	else /* all backends are busy */
+	{
+		if (chan->pool->n_launched_backends < chan->proxy->max_backends)
+		{
+			/* Try to start new backend */
+			idle_backend = backend_start(chan->pool, chan->client_port);
+			if (idle_backend != NULL)
+			{
+				ELOG(LOG, "Start new backend %p (pid %d) for client %p",
+					 idle_backend, idle_backend->backend_pid, chan);
+				Assert(chan != idle_backend);
+				chan->peer = idle_backend;
+				idle_backend->peer = chan;
+				return true;
+			}
+		}
+		/* Postpone handshake until some backend is available */
+		ELOG(LOG, "Client %p is waiting for available backends", chan);
+		chan->next = chan->pool->pending_clients;
+		chan->pool->pending_clients = chan;
+		chan->pool->n_pending_clients += 1;
+	}
+	return false;
+}
+
+/*
+ * Handle communication failure for this channel.
+ * It is not possible to remove the channel immediately because it may still be referenced by other epoll events.
+ * So link all such channels into a single-linked list of channels pending deletion.
+ */
+static void
+channel_hangout(Channel* chan, char const* op)
+{
+	Channel** ipp;
+	Channel* peer = chan->peer;
+	if (chan->is_disconnected)
+	   return;
+
+	if (chan->client_port) {
+		ELOG(LOG, "Hangout client %p due to %s error: %m", chan, op);
+		for (ipp = &chan->pool->pending_clients; *ipp != NULL; ipp = &(*ipp)->next) {
+			if (*ipp == chan) {
+				*ipp = chan->next;
+				chan->pool->n_pending_clients -= 1;
+				break;
+			}
+		}
+	} else {
+		ELOG(LOG, "Hangout backend %p (pid %d) due to %s error: %m", chan, chan->backend_pid, op);
+		for (ipp = &chan->pool->idle_backends; *ipp != NULL; ipp = &(*ipp)->next) {
+			if (*ipp == chan) {
+				*ipp = chan->next;
+				chan->pool->n_idle_backends -= 1;
+				break;
+			}
+		}
+	}
+	if (peer)
+	{
+		peer->peer = NULL;
+		chan->peer = NULL;
+	}
+	chan->backend_is_ready = false;
+
+	if (chan->client_port && peer) /* If it is client connected to backend. */
+	{
+		if (!chan->is_interrupted) /* Client didn't send 'X' command, so do it on its behalf. */
+		{
+			ELOG(LOG, "Send terminate command to backend %p (pid %d)", peer, peer->backend_pid);
+			peer->is_interrupted = true; /* the interrupted flag makes channel_write send an 'X' message */
+			channel_write(peer, false);
+			return;
+		}
+		else if (!peer->is_interrupted)
+		{
+			/* Client already sent 'X' command, so we can safely reschedule backend to some other client session */
+			backend_reschedule(peer);
+		}
+	}
+	chan->next = chan->proxy->hangout;
+	chan->proxy->hangout = chan;
+	chan->is_disconnected = true;
+}
+
+/*
+ * Try to write data to the socket.
+ */
+static ssize_t
+socket_write(Channel* chan, char const* buf, size_t size)
+{
+	ssize_t rc;
+#ifdef USE_SSL
+	int waitfor = 0;
+	if (chan->client_port && chan->client_port->ssl_in_use)
+		rc = be_tls_write(chan->client_port, (char*)buf, size, &waitfor);
+	else
+#endif
+		rc = chan->client_port
+			? secure_raw_write(chan->client_port, buf, size)
+			: send(chan->backend_socket, buf, size, 0);
+	if (rc == 0 || (rc < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)))
+	{
+		channel_hangout(chan, "write");
+	}
+	return rc;
+}
+
+
+/*
+ * Try to send some data to the channel.
+ * Data is located in the peer's buffer. Because edge-triggered mode is used, we have to use non-blocking IO
+ * and try to write all available data. Once the write is completed we should try to read more data from the source socket.
+ * The "synchronous" flag is used to avoid infinite mutual recursion of reads and writes.
+ * Returns true if there is nothing to do or the operation completed successfully, false in case of error
+ * or when the socket buffer is full.
+ */
+static bool
+channel_write(Channel* chan, bool synchronous)
+{
+	Channel* peer = chan->peer;
+	if (!chan->client_port && chan->is_interrupted)
+	{
+		/* Send terminate command to the backend. */
+		char const terminate[] = {'X', 0, 0, 0, 4};
+		if (socket_write(chan, terminate, sizeof(terminate)) <= 0)
+			return false;
+		channel_hangout(chan, "terminate");
+		return true;
+	}
+	if (peer == NULL)
+		return false;
+
+	while (peer->tx_pos < peer->tx_size) /* has something to write */
+	{
+		ssize_t rc = socket_write(chan, peer->buf + peer->tx_pos, peer->tx_size - peer->tx_pos);
+		ELOG(LOG, "%p: write %d tx_pos=%d, tx_size=%d: %m", chan, (int)rc, peer->tx_pos, peer->tx_size);
+		if (rc <= 0)
+			return false;
+		if (chan->client_port)
+			chan->proxy->state->tx_bytes += rc;
+		else
+			chan->proxy->state->rx_bytes += rc;
+		peer->tx_pos += rc;
+	}
+	if (peer->tx_size != 0)
+	{
+		/* Copy rest of received data to the beginning of the buffer */
+		chan->backend_is_ready = false;
+		Assert(peer->rx_pos >= peer->tx_size);
+		memmove(peer->buf, peer->buf + peer->tx_size, peer->rx_pos - peer->tx_size);
+		peer->rx_pos -= peer->tx_size;
+		peer->tx_pos = peer->tx_size = 0;
+		if (peer->backend_is_ready) {
+			Assert(peer->rx_pos == 0);
+			backend_reschedule(peer);
+			return true;
+		}
+	}
+	return synchronous || channel_read(peer); /* write is not invoked from read */
+}
+
+/*
+ * Try to read more data from the channel and send it to the peer.
+ */
+static bool
+channel_read(Channel* chan)
+{
+	int  msg_start;
+	while (chan->tx_size == 0) /* there is no pending write op */
+	{
+		ssize_t rc;
+#ifdef USE_SSL
+		int waitfor = 0;
+		if (chan->client_port && chan->client_port->ssl_in_use)
+			rc = be_tls_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, &waitfor);
+		else
+#endif
+			rc = chan->client_port
+				? secure_raw_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos)
+				: recv(chan->backend_socket, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, 0);
+
+		ELOG(LOG, "%p: read %d: %m", chan, (int)rc);
+		if (rc <= 0)
+		{
+			if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
+				channel_hangout(chan, "read");
+			return false; /* wait for more data */
+		}
+		chan->rx_pos += rc;
+		msg_start = 0;
+
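+		/*
+		 * Frontend/backend protocol framing: the startup packet begins with
+		 * a 4-byte length word which includes itself, while any other message
+		 * has a 1-byte type code followed by a 4-byte length which counts
+		 * itself but not the type byte -- hence the "+ 1" below.
+		 */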
+		/* Loop through all received messages */
+		while (chan->rx_pos - msg_start >= 5) /* has message code + length */
+		{
+			int msg_len;
+			bool handshake = false;
+			if (chan->pool == NULL) /* process startup packet */
+			{
+				Assert(msg_start == 0);
+				memcpy(&msg_len, chan->buf + msg_start, sizeof(msg_len));
+				msg_len = ntohl(msg_len);
+				handshake = true;
+			}
+			else
+			{
+				ELOG(LOG, "%p receive message %c", chan, chan->buf[msg_start]);
+				memcpy(&msg_len, chan->buf + msg_start + 1, sizeof(msg_len));
+				msg_len = ntohl(msg_len) + 1;
+			}
+			if (msg_start + msg_len > chan->buf_size)
+			{
+				/* Reallocate buffer to fit complete message body */
+				chan->buf_size = msg_start + msg_len;
+				chan->buf = realloc(chan->buf, chan->buf_size);
+			}
+			if (chan->rx_pos - msg_start >= msg_len) /* Message is completely fetched */
+			{
+				int response_size = msg_start + msg_len;
+				if (chan->pool == NULL) /* receive startup packet */
+				{
+					Assert(chan->client_port);
+					if (!client_connect(chan, msg_len))
+					{
+						/* Some trouble with processing startup packet */
+						chan->is_disconnected = true;
+						channel_remove(chan);
+						return false;
+					}
+				}
+				else if (!chan->client_port /* Message from backend */
+					&& chan->buf[msg_start] == 'Z'  /* Ready for query */
+					&& chan->buf[msg_start+5] == 'I') /* Transaction block status is idle */
+				{
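+					/*
+					 * ReadyForQuery carries a transaction status byte:
+					 * 'I' (idle), 'T' (in transaction block) or 'E' (failed
+					 * transaction). Only an idle backend can be rescheduled
+					 * to another client.
+					 */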
+					Assert(chan->rx_pos - msg_start == msg_len); /* Should be last message */
+					chan->backend_is_ready = true; /* Backend is ready for query */
+					chan->proxy->state->n_transactions += 1;
+				}
+				else if (chan->client_port /* Message from client */
+						 && chan->buf[msg_start] == 'X')	/* Terminate message */
+				{
+					chan->is_interrupted = true;
+					if (chan->peer == NULL || !chan->peer->backend_is_tainted)
+					{
+						/* Skip terminate message to idle and non-tainted backends */
+						channel_hangout(chan, "terminate");
+						return false;
+					}
+				}
+				if (chan->peer == NULL)  /* client is not yet connected to backend */
+				{
+					if (!chan->client_port)
+					{
+						/* We are not expecting messages from an idle backend. Assume that it is some error or shutdown. */
+						channel_hangout(chan, "idle");
+						return false;
+					}
+					client_attach(chan);
+					if (handshake) /* Send handshake response to the client */
+					{
+						/* If we attach a new client to an existing backend, we need to send the handshake response to the client */
+						Channel* backend = chan->peer;
+						Assert(chan->rx_pos == msg_len && msg_start == 0);
+						chan->rx_pos = 0; /* Skip startup packet */
+						if (backend != NULL) /* Backend was assigned */
+						{
+							Assert(backend->handshake_response != NULL); /* backend has already sent handshake responses */
+							Assert(backend->handshake_response_size < backend->buf_size);
+							memcpy(backend->buf, backend->handshake_response, backend->handshake_response_size);
+							backend->rx_pos = backend->tx_size = backend->handshake_response_size;
+							backend->backend_is_ready = true;
+							return channel_write(chan, false);
+						}
+						else
+						{
+							/* Handshake response will be sent to the client later, when a backend is assigned */
+							return false;
+						}
+					}
+					else if (chan->peer == NULL) /* Backend was not assigned */
+					{
+						chan->tx_size = response_size; /* the query will be sent later, once a backend is assigned */
+						return false;
+					}
+				}
+				msg_start += msg_len;
+			}
+			else break; /* Incomplete message. */
+		}
+		if (msg_start != 0)
+		{
+			/* Has some complete messages to send to peer */
+			Assert(chan->tx_pos == 0);
+			Assert(chan->rx_pos >= msg_start);
+			chan->tx_size = msg_start;
+			if (!channel_write(chan->peer, true))
+				return false;
+		}
+		/* If backend is out of transaction, then reschedule it */
+		if (chan->backend_is_ready)
+			return backend_reschedule(chan);
+	}
+	return true;
+}
+
+/*
+ * Create new channel.
+ */
+static Channel*
+channel_create(Proxy* proxy)
+{
+	Channel* chan = (Channel*)calloc(1, sizeof(Channel));
+	chan->proxy = proxy;
+	chan->buf = malloc(INIT_BUF_SIZE);
+	chan->buf_size = INIT_BUF_SIZE;
+	chan->tx_pos = chan->rx_pos = chan->tx_size = 0;
+	return chan;
+}
+
+/*
+ * Register new channel in wait event set.
+ */
+static bool
+channel_register(Proxy* proxy, Channel* chan)
+{
+	pgsocket sock = chan->client_port ? chan->client_port->sock : chan->backend_socket;
+	/* Using edge epoll mode requires non-blocking sockets */
+	pg_set_noblock(sock);
+	chan->event_pos =
+		AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE,
+						  sock, NULL, chan);
+	if (chan->event_pos < 0)
+	{
+		elog(WARNING, "PROXY: Failed to add new client - too many sessions: %d clients, %d backends. "
+					 "Try increasing the 'max_sessions' configuration parameter.",
+					 proxy->state->n_clients, proxy->state->n_backends);
+		return false;
+	}
+	return true;
+}
+
+/*
+ * Start a new backend for the particular pool associated with a dbname/role combination.
+ * The backend is launched by opening a libpq connection to the postmaster, which forks it via BackendStartup.
+ */
+static Channel*
+backend_start(SessionPool* pool, Port* client_port)
+{
+	Channel* chan;
+	char postmaster_port[8];
+	char const* keywords[] = {"port","dbname","user","sslmode","application_name",NULL};
+	char const* values[] = {postmaster_port,pool->key.database,pool->key.username,"disable","pool_worker",NULL};
+	PGconn* conn;
+	char* msg;
+	int int32_buf;
+	int msg_len;
+	static bool libpqconn_loaded;
+
+	if (!libpqconn_loaded)
+	{
+		/* We need the libpq library to be able to establish connections to pool workers.
+		 * This library cannot be linked statically, so load it on demand. */
+		load_file("libpqconn", false);
+		libpqconn_loaded = true;
+	}
+	pg_itoa(PostPortNumber, postmaster_port);
+	conn = LibpqConnectdbParams(keywords, values);
+	if (!conn)
+		return NULL;
+
+	chan = channel_create(pool->proxy);
+	chan->pool = pool;
+	chan->backend_socket = conn->sock;
+	/* Using edge epoll mode requires non-blocking sockets */
+	pg_set_noblock(conn->sock);
+
+	/* Save handshake response */
+	chan->handshake_response_size = conn->inEnd;
+	chan->handshake_response = malloc(chan->handshake_response_size);
+	memcpy(chan->handshake_response, conn->inBuffer, chan->handshake_response_size);
+
+	/* Extract backend pid */
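+	/*
+	 * The saved handshake response is a sequence of protocol messages
+	 * (authentication, ParameterStatus, BackendKeyData, ReadyForQuery).
+	 * Skip over whole messages using their length words until we reach
+	 * BackendKeyData ('K'), whose body holds the backend PID followed by
+	 * the cancel key.
+	 */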
+	msg = chan->handshake_response;
+	while (*msg != 'K') /* Scan handshake response until we reach PID message */
+	{
+		memcpy(&int32_buf, ++msg, sizeof(int32_buf));
+		msg_len = ntohl(int32_buf);
+		msg += msg_len;
+		Assert(msg < chan->handshake_response + chan->handshake_response_size);
+	}
+	memcpy(&int32_buf, msg+5, sizeof(int32_buf));
+	chan->backend_pid = ntohl(int32_buf);
+
+	if (channel_register(pool->proxy, chan))
+	{
+		pool->proxy->state->n_backends += 1;
+		pool->n_launched_backends += 1;
+	}
+	else
+	{
+		/* Too many sessions; the error was already logged */
+		close(chan->backend_socket);
+		free(chan->buf);
+		free(chan);
+		chan = NULL;
+	}
+	return chan;
+}
+
+/*
+ * Add a new client accepted by the postmaster. This client will be assigned to a concrete session pool
+ * when its startup packet is received.
+ */
+static void
+proxy_add_client(Proxy* proxy, Port* port)
+{
+	Channel* chan = channel_create(proxy);
+	chan->client_port = port;
+	chan->backend_socket = PGINVALID_SOCKET;
+	if (channel_register(proxy, chan))
+	{
+		ELOG(LOG, "Add new client %p", chan);
+		proxy->n_accepted_connections += 1;
+		proxy->state->n_clients += 1;
+	}
+	else
+	{
+		/* Too many sessions; the error was already logged */
+		close(port->sock);
+		free(port);
+		free(chan->buf);
+		free(chan);
+	}
+}
+
+/*
+ * Perform delayed deletion of channel
+ */
+static void
+channel_remove(Channel* chan)
+{
+	Assert(chan->is_disconnected); /* should be marked as disconnected by channel_hangout */
+	DeleteWaitEventFromSet(chan->proxy->wait_events, chan->event_pos);
+	if (chan->client_port)
+	{
+		if (chan->pool)
+			chan->pool->n_connected_clients -= 1;
+		else
+			chan->proxy->n_accepted_connections -= 1;
+		chan->proxy->state->n_clients -= 1;
+		chan->proxy->state->n_ssl_clients -= chan->client_port->ssl_in_use;
+		close(chan->client_port->sock);
+		free(chan->client_port);
+	}
+	else
+	{
+		chan->proxy->state->n_backends -= 1;
+		chan->proxy->state->n_dedicated_backends -= chan->backend_is_tainted;
+		chan->pool->n_launched_backends -= 1;
+		close(chan->backend_socket);
+		free(chan->handshake_response);
+	}
+	free(chan->buf);
+	free(chan);
+}
+
+
+
+/*
+ * Create new proxy.
+ */
+static Proxy*
+proxy_create(pgsocket postmaster_socket, ConnectionProxyState* state, int max_backends)
+{
+	HASHCTL ctl;
+	Proxy*  proxy = calloc(1, sizeof(Proxy));
+	proxy->memctx = AllocSetContextCreate(TopMemoryContext,
+										  "Proxy",
+										  ALLOCSET_DEFAULT_SIZES);
+	proxy->tmpctx = AllocSetContextCreate(proxy->memctx,
+										  "Startup packet parsing context",
+										  ALLOCSET_DEFAULT_SIZES);
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(SessionPoolKey);
+	ctl.entrysize = sizeof(SessionPool);
+	ctl.hcxt = proxy->memctx;
+	proxy->pools = hash_create("Pool by database and user", DB_HASH_SIZE,
+							   &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	/* We need events both for clients and backends, so multiply MaxSessions by two */
+	proxy->wait_events = CreateWaitEventSet(TopMemoryContext, MaxSessions*2);
+	AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE,
+					  postmaster_socket, NULL, NULL);
+	proxy->max_backends = max_backends;
+	proxy->state = state;
+	return proxy;
+}
+
+/*
+ * Main proxy loop
+ */
+static void
+proxy_loop(Proxy* proxy)
+{
+	int i, n_ready;
+	WaitEvent ready[MAX_READY_EVENTS];
+	Channel *chan, *next;
+
+	/* Main loop */
+	while (!proxy->shutdown)
+	{
+		/* Use timeout to allow normal proxy shutdown */
+		n_ready = WaitEventSetWait(proxy->wait_events, PROXY_WAIT_TIMEOUT, ready, MAX_READY_EVENTS, PG_WAIT_CLIENT);
+		for (i = 0; i < n_ready; i++) {
+			chan = (Channel*)ready[i].user_data;
+			if (chan == NULL) /* new connection from postmaster */
+			{
+				Port* port = (Port*)calloc(1, sizeof(Port));
+				port->sock = pg_recv_sock(ready[i].fd);
+				if (port->sock == PGINVALID_SOCKET)
+				{
+					elog(WARNING, "Failed to receive session socket: %m");
+					free(port);
+				}
+				else
+					proxy_add_client(proxy, port);
+			}
+			else
+			{
+				if (ready[i].events & WL_SOCKET_WRITEABLE) {
+					ELOG(LOG, "Channel %p is writable", chan);
+					channel_write(chan, false);
+				}
+				if (ready[i].events & WL_SOCKET_READABLE) {
+					ELOG(LOG, "Channel %p is readable", chan);
+					channel_read(chan);
+				}
+			}
+		}
+		/*
+		 * Delayed deallocation of disconnected channels.
+		 * We can not delete channels immediately because of presence of peer events.
+		 */
+		for (chan = proxy->hangout; chan != NULL; chan = next)
+		{
+			next = chan->next;
+			channel_remove(chan);
+		}
+		proxy->hangout = NULL;
+	}
+}
+
+/*
+ * Handle normal shutdown of Postgres instance
+ */
+static void
+proxy_handle_sigterm(SIGNAL_ARGS)
+{
+	if (proxy)
+		proxy->shutdown = true;
+}
+
+#ifdef EXEC_BACKEND
+static pid_t
+proxy_forkexec(void)
+{
+	char	   *av[10];
+	int			ac = 0;
+
+	av[ac++] = "postgres";
+	av[ac++] = "--forkproxy";
+	av[ac++] = NULL;			/* filled in by postmaster_forkexec */
+	av[ac] = NULL;
+
+	Assert(ac < lengthof(av));
+
+	return postmaster_forkexec(ac, av);
+}
+#endif
+
+NON_EXEC_STATIC void
+ConnectionProxyMain(int argc, char *argv[])
+{
+	sigjmp_buf	local_sigjmp_buf;
+
+	/* Identify myself via ps */
+	init_ps_display("connection proxy", "", "", "");
+
+	SetProcessingMode(InitProcessing);
+
+	pqsignal(SIGTERM, proxy_handle_sigterm);
+	pqsignal(SIGQUIT, quickdie);
+	InitializeTimeouts();		/* establishes SIGALRM handler */
+
+	/* Early initialization */
+	BaseInit();
+
+	/*
+	 * Create a per-backend PGPROC struct in shared memory, except in the
+	 * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+	 * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+	 * had to do some stuff with LWLocks).
+	 */
+#ifndef EXEC_BACKEND
+	InitProcess();
+#endif
+
+	/*
+	 * If an exception is encountered, processing resumes here.
+	 *
+	 * See notes in postgres.c about the design of this coding.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Prevents interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		/*
+		 * We can now go away.  Note that because we called InitProcess, a
+		 * callback was registered to do ProcKill, which will clean up
+		 * necessary state.
+		 */
+		proc_exit(0);
+	}
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	PG_SETMASK(&UnBlockSig);
+
+	proxy = proxy_create(MyProxySocket, &ProxyState[MyProxyId], SessionPoolSize);
+	proxy_loop(proxy);
+
+	proc_exit(0);
+}
+
+/*
+ * Function for launching a proxy from the postmaster.
+ * This "boilerplate" code is taken from other auxiliary workers.
+ * In the future it may be replaced with a background worker.
+ * The main problem with a background worker is how to pass a socket to it and obtain its PID.
+ */
+int
+ConnectionProxyStart()
+{
+	pid_t		worker_pid;
+
+#ifdef EXEC_BACKEND
+	switch ((worker_pid = proxy_forkexec()))
+#else
+	switch ((worker_pid = fork_process()))
+#endif
+	{
+		case -1:
+			ereport(LOG,
+					(errmsg("could not fork proxy worker process: %m")));
+			return 0;
+
+#ifndef EXEC_BACKEND
+		case 0:
+			/* in postmaster child ... */
+			InitPostmasterChild();
+
+			ConnectionProxyMain(0, NULL);
+			break;
+#endif
+		default:
+		  elog(LOG, "Start proxy process %d", (int) worker_pid);
+		  return (int) worker_pid;
+	}
+
+	/* shouldn't get here */
+	return 0;
+}
+
+/*
+ * We need some place in shared memory to provide information about the proxies' state.
+ */
+int ConnectionProxyShmemSize(void)
+{
+	return ConnectionProxiesNumber*sizeof(ConnectionProxyState);
+}
+
+void ConnectionProxyShmemInit(void)
+{
+	bool found;
+	ProxyState = (ConnectionProxyState*)ShmemInitStruct("connection proxy contexts",
+														ConnectionProxyShmemSize(), &found);
+	if (!found)
+		memset(ProxyState, 0, ConnectionProxyShmemSize());
+}
+
+PG_FUNCTION_INFO_V1(pg_pooler_state);
+
+typedef struct
+{
+	int proxy_id;
+	TupleDesc ret_desc;
+} PoolerStateContext;
+
+/**
+ * Return information about the proxies' state.
+ * This set-returning function returns the following columns:
+ *
+ * pid            - proxy process identifier
+ * n_clients      - number of clients connected to proxy
+ * n_ssl_clients  - number of clients using SSL protocol
+ * n_pools        - number of pools (role/dbname combinations) maintained by proxy
+ * n_backends     - total number of backends spawned by this proxy (including tainted)
+ * n_dedicated_backends - number of tainted backends
+ * tx_bytes       - amount of data sent from backends to clients
+ * rx_bytes       - amount of data sent from client to backends
+ * n_transactions - number of transactions processed by all backends of this proxy
+ */
+Datum pg_pooler_state(PG_FUNCTION_ARGS)
+{
+	FuncCallContext* srf_ctx;
+	MemoryContext old_context;
+	PoolerStateContext* ps_ctx;
+	HeapTuple tuple;
+	Datum values[9];
+	bool  nulls[9];
+	int id;
+	int i;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		srf_ctx = SRF_FIRSTCALL_INIT();
+		old_context = MemoryContextSwitchTo(srf_ctx->multi_call_memory_ctx);
+		ps_ctx = (PoolerStateContext*)palloc(sizeof(PoolerStateContext));
+		get_call_result_type(fcinfo, NULL, &ps_ctx->ret_desc);
+		ps_ctx->proxy_id = 0;
+		srf_ctx->user_fctx = ps_ctx;
+		MemoryContextSwitchTo(old_context);
+	}
+	srf_ctx = SRF_PERCALL_SETUP();
+	ps_ctx = srf_ctx->user_fctx;
+	id = ps_ctx->proxy_id;
+	if (id == ConnectionProxiesNumber)
+		SRF_RETURN_DONE(srf_ctx);
+
+	values[0] = Int32GetDatum(ProxyState[id].pid);
+	values[1] = Int32GetDatum(ProxyState[id].n_clients);
+	values[2] = Int32GetDatum(ProxyState[id].n_ssl_clients);
+	values[3] = Int32GetDatum(ProxyState[id].n_pools);
+	values[4] = Int32GetDatum(ProxyState[id].n_backends);
+	values[5] = Int32GetDatum(ProxyState[id].n_dedicated_backends);
+	values[6] = Int64GetDatum(ProxyState[id].tx_bytes);
+	values[7] = Int64GetDatum(ProxyState[id].rx_bytes);
+	values[8] = Int64GetDatum(ProxyState[id].n_transactions);
+
+	for (i = 0; i <= 8; i++)
+		nulls[i] = false;
+
+	ps_ctx->proxy_id += 1;
+	tuple = heap_form_tuple(ps_ctx->ret_desc, values, nulls);
+	SRF_RETURN_NEXT(srf_ctx, HeapTupleGetDatum(tuple));
+}
+
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index d7d733530f..6d32d8fe8d 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -28,6 +28,7 @@
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
 #include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(int port)
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
+		size = add_size(size, ConnectionProxyShmemSize());
 
 		/* freeze the addin request size and include it */
 		addin_request_allowed = false;
@@ -255,6 +257,7 @@ CreateSharedMemoryAndSemaphores(int port)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	ConnectionProxyShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..b5f66519d0 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -77,6 +77,7 @@ struct WaitEventSet
 {
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
+	int			free_events;	/* head of single-linked list of free events, linked by "pos" and terminated by -1 */
 
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
@@ -137,9 +138,9 @@ static void drainSelfPipe(void);
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
 #elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
 #elif defined(WAIT_USE_WIN32)
-static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
 #endif
 
 static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -585,6 +586,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	set->latch = NULL;
 	set->nevents_space = nevents;
 	set->exit_on_postmaster_death = false;
+	set->free_events = -1;
 
 #if defined(WAIT_USE_EPOLL)
 #ifdef EPOLL_CLOEXEC
@@ -691,9 +693,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 				  void *user_data)
 {
 	WaitEvent  *event;
+	int free_event;
 
 	/* not enough space */
-	Assert(set->nevents < set->nevents_space);
+	if (set->nevents == set->nevents_space)
+		return -1;
 
 	if (events == WL_EXIT_ON_PM_DEATH)
 	{
@@ -720,8 +724,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
 		elog(ERROR, "cannot wait on socket event without a socket");
 
-	event = &set->events[set->nevents];
-	event->pos = set->nevents++;
+	free_event = set->free_events;
+	if (free_event >= 0)
+	{
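+		/*
+		 * Reuse a previously freed slot: while an event is on the free list
+		 * its "pos" field stores the index of the next free slot.
+		 */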
+		event = &set->events[free_event];
+		set->free_events = event->pos;
+		event->pos = free_event;
+	}
+	else
+	{
+		event = &set->events[set->nevents];
+		event->pos = set->nevents;
+	}
+	set->nevents += 1;
 	event->fd = fd;
 	event->events = events;
 	event->user_data = user_data;
@@ -748,14 +763,29 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
-	WaitEventAdjustWin32(set, event);
+	WaitEventAdjustWin32(set, event, false);
 #endif
 
 	return event->pos;
 }
 
+/*
+ * Remove the event at the specified position from the wait event set
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos)
+{
+	WaitEvent  *event = &set->events[event_pos];
+#if defined(WAIT_USE_EPOLL)
+	WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+	WaitEventAdjustPoll(set, event, true);
+#elif defined(WAIT_USE_WIN32)
+	WaitEventAdjustWin32(set, event, true);
+#endif
+}
+
 /*
  * Change the event mask and, in the WL_LATCH_SET case, the latch associated
  * with the WaitEvent.
@@ -767,7 +797,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 {
 	WaitEvent  *event;
 
-	Assert(pos < set->nevents);
+	Assert(pos < set->nevents_space);
 
 	event = &set->events[pos];
 
@@ -804,9 +834,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
-	WaitEventAdjustWin32(set, event);
+	WaitEventAdjustWin32(set, event, false);
 #endif
 }
 
@@ -844,6 +874,8 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_EDGE)
+			epoll_ev.events |= EPOLLET;
 	}
 
 	/*
@@ -852,21 +884,39 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	 * requiring that, and actually it makes the code simpler...
 	 */
 	rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
-
 	if (rc < 0)
 		ereport(ERROR,
 				(errcode_for_socket_access(),
-		/* translator: %s is a syscall name, such as "poll()" */
+				 /* translator: %s is a syscall name, such as "poll()" */
 				 errmsg("%s failed: %m",
 						"epoll_ctl()")));
+
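+	/*
+	 * On removal, push the slot onto the head of the free list: the event's
+	 * "pos" field is reused to store the index of the next free slot and -1
+	 * terminates the list. AddWaitEventToSet() pops slots from this list
+	 * before extending the array.
+	 */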
+	if (action == EPOLL_CTL_DEL)
+	{
+		int pos = event->pos;
+		event->fd = PGINVALID_SOCKET;
+		set->nevents -= 1;
+		event->pos = set->free_events;
+		set->free_events = pos;
+	}
 }
 #endif
 
 #if defined(WAIT_USE_POLL)
 static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
 {
-	struct pollfd *pollfd = &set->pollfds[event->pos];
+	int pos = event->pos;
+	struct pollfd *pollfd = &set->pollfds[pos];
+
+	if (remove)
+	{
+		set->nevents -= 1;
+		*pollfd = set->pollfds[set->nevents];
+		set->events[pos] = set->events[set->nevents];
+		event->pos = pos;
+		return;
+	}
 
 	pollfd->revents = 0;
 	pollfd->fd = event->fd;
@@ -897,9 +947,25 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 
 #if defined(WAIT_USE_WIN32)
 static void
-WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove)
 {
-	HANDLE	   *handle = &set->handles[event->pos + 1];
+	int pos = event->pos;
+	HANDLE	   *handle = &set->handles[pos + 1];
+
+	if (remove)
+	{
+		Assert(event->fd != PGINVALID_SOCKET);
+
+		if (*handle != WSA_INVALID_EVENT)
+			WSACloseEvent(*handle);
+
+		set->nevents -= 1;
+		set->events[pos] = set->events[set->nevents];
+		*handle = set->handles[set->nevents + 1];
+		set->handles[set->nevents + 1] = WSA_INVALID_EVENT;
+		event->pos = pos;
+		return;
+	}
 
 	if (event->events == WL_LATCH_SET)
 	{
@@ -912,7 +978,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		int			flags = FD_CLOSE;	/* always check for errors/EOF */
+		int flags = FD_CLOSE;	/* always check for errors/EOF */
 
 		if (event->events & WL_SOCKET_READABLE)
 			flags |= FD_READ;
@@ -929,8 +995,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 					 WSAGetLastError());
 		}
 		if (WSAEventSelect(event->fd, *handle, flags) != 0)
-			elog(ERROR, "failed to set up event for socket: error code %u",
-				 WSAGetLastError());
+			elog(ERROR, "failed to set up event for socket %p: error code %u",
+				 event->fd, WSAGetLastError());
 
 		Assert(event->fd != PGINVALID_SOCKET);
 	}
@@ -1336,7 +1402,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	{
 		if (cur_event->reset)
 		{
-			WaitEventAdjustWin32(set, cur_event);
+			WaitEventAdjustWin32(set, cur_event, false);
 			cur_event->reset = false;
 		}
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 44a59e1d4f..62ec2afd2e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4217,6 +4217,8 @@ PostgresMain(int argc, char *argv[],
 		 */
 		if (ConfigReloadPending)
 		{
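+			/*
+			 * When restart_pooler_on_reload is set, pool worker backends
+			 * exit on SIGHUP instead of re-reading the configuration file;
+			 * the proxy will launch new workers with the fresh settings
+			 * on demand.
+			 */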
+			if (RestartPoolerOnReload && strcmp(application_name, "pool_worker") == 0)
+				proc_exit(0);
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
 		}
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index ffd1970f58..16ca58d9d0 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -18,6 +18,7 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "storage/predicate_internals.h"
+#include "storage/proc.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 
@@ -658,6 +659,7 @@ pg_isolation_test_session_is_blocked(PG_FUNCTION_ARGS)
 static void
 PreventAdvisoryLocksInParallelMode(void)
 {
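+	/* Advisory locks are per-session state, so the backend becomes tainted
+	 * and can no longer be shared between clients. */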
+	MyProc->is_tainted = true;
 	if (IsInParallelMode())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 3bf96de256..79001ccf91 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -130,9 +130,14 @@ int			max_parallel_maintenance_workers = 2;
  */
 int			NBuffers = 1000;
 int			MaxConnections = 90;
+int			SessionPoolSize = 0;
+int			ConnectionProxiesNumber = 1;
+int			SessionSchedule = SESSION_SCHED_ROUND_ROBIN;
+
 int			max_worker_processes = 8;
 int			max_parallel_workers = 8;
 int			MaxBackends = 0;
+int			MaxSessions = 1000;
 
 int			VacuumCostPageHit = 1;	/* GUC parameters for vacuum */
 int			VacuumCostPageMiss = 10;
@@ -148,3 +153,4 @@ int			VacuumCostBalance = 0;	/* working state for vacuum */
 bool		VacuumCostActive = false;
 
 double		vacuum_cleanup_index_scale_factor;
+bool        RestartPoolerOnReload = false;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 92c4fee8f8..65f66db8e9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -457,6 +457,14 @@ const struct config_enum_entry ssl_protocol_versions_info[] = {
 	{NULL, 0, false}
 };
 
+static const struct config_enum_entry session_schedule_options[] = {
+	{"round-robin", SESSION_SCHED_ROUND_ROBIN, false},
+	{"random", SESSION_SCHED_RANDOM, false},
+	{"load-balancing", SESSION_SCHED_LOAD_BALANCING, false},
+	{NULL, 0, false}
+};
+
+
 static struct config_enum_entry shared_memory_options[] = {
 #ifndef WIN32
 	{"sysv", SHMEM_TYPE_SYSV, false},
@@ -1285,6 +1293,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"restart_pooler_on_reload", PGC_SIGHUP, CONN_POOLING,
+		 gettext_noop("Restart session pool workers on pg_reload_conf()."),
+		 NULL,
+		},
+		&RestartPoolerOnReload,
+		false,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"log_duration", PGC_SUSET, LOGGING_WHAT,
 			gettext_noop("Logs the duration of each completed SQL statement."),
@@ -2137,6 +2155,42 @@ static struct config_int ConfigureNamesInt[] =
 		check_maxconnections, NULL, NULL
 	},
 
+	{
+		/* see max_connections and max_wal_senders */
+		{"session_pool_size", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the number of backends serving client sessions."),
+			gettext_noop("If non-zero then session pooling will be used: "
+						 "client connections will be redirected to one of the backends and the maximum number of backends is determined by this parameter. "
+						 "Launched backends are never terminated even if there are no active sessions.")
+		},
+		&SessionPoolSize,
+		10, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"connection_proxies", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the number of connection proxies."),
+			gettext_noop("The postmaster spawns a separate worker process for each proxy and distributes connections between proxies using one of the scheduling policies (round-robin, random, load-balancing). "
+						 "Each proxy launches its own subset of backends, so the maximum number of non-tainted backends is "
+						 "session_pool_size * connection_proxies * databases * roles.")
+		},
+		&ConnectionProxiesNumber,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"max_sessions", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the maximum number of client sessions."),
+			gettext_noop("Maximum number of client sessions which can be handled by one connection proxy. "
+						 "It can be greater than max_connections and actually be arbitrarily large.")
+		},
+		&MaxSessions,
+		1000, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		/* see max_connections */
 		{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
@@ -2184,6 +2238,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"proxy_port", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the TCP port for the connection pooler."),
+			NULL
+		},
+		&ProxyPortNumber,
+		6543, 1, 65535,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"unix_socket_permissions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the access permissions of the Unix-domain socket."),
@@ -4550,6 +4614,16 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"session_schedule", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Session scheduling policy for connection pool."),
+			NULL
+		},
+		&SessionSchedule,
+		SESSION_SCHED_ROUND_ROBIN, session_schedule_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
@@ -8145,6 +8219,7 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
 				 errmsg("cannot set parameters during a parallel operation")));
+	MyProc->is_tainted = true;
 
 	switch (stmt->kind)
 	{
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index b07be12236..dac74a272d 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -506,7 +506,7 @@ MemoryContextStatsDetail(MemoryContext context, int max_children)
  * *totals (if given).
  */
 static void
-MemoryContextStatsInternal(MemoryContext context, int level,
+MemoryContextStatsInternal(MemoryContext context, int level,
 						   bool print, int max_children,
 						   MemoryContextCounters *totals)
 {
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 87335248a0..5f528c1d72 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10677,4 +10677,11 @@
   proname => 'pg_partition_root', prorettype => 'regclass',
   proargtypes => 'regclass', prosrc => 'pg_partition_root' },
 
+# built-in connection pool
+{ oid => '3435', descr => 'information about connection pooler proxies workload',
+  proname => 'pg_pooler_state', prorows => '1000', proretset => 't',
+  provolatile => 'v', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int4,int4,int4,int4,int4,int4,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,n_clients,n_ssl_clients,n_pools,n_backends,n_dedicated_backends,tx_bytes,rx_bytes,n_transactions}', prosrc => 'pg_pooler_state' },
+
 ]
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 08a257616d..1e12ee1884 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -54,10 +54,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
  * prototypes for functions in pqcomm.c
  */
 extern WaitEventSet *FeBeWaitSet;
-
-extern int	StreamServerPort(int family, char *hostName,
-							 unsigned short portNumber, char *unixSocketDir,
-							 pgsocket ListenSocket[], int MaxListen);
+extern int StreamServerPort(int family, char *hostName,
+							unsigned short portNumber, char *unixSocketDir,
+							pgsocket ListenSocket[], int ListenPort[], int MaxListen);
 extern int	StreamConnection(pgsocket server_fd, Port *port);
 extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 61a24c2e3c..86c0ef84e5 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -159,6 +159,19 @@ extern PGDLLIMPORT int data_directory_mode;
 extern PGDLLIMPORT int NBuffers;
 extern PGDLLIMPORT int MaxBackends;
 extern PGDLLIMPORT int MaxConnections;
+
+enum SessionSchedulePolicy
+{
+	SESSION_SCHED_ROUND_ROBIN,
+	SESSION_SCHED_RANDOM,
+	SESSION_SCHED_LOAD_BALANCING
+};
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
+extern PGDLLIMPORT int ConnectionProxiesNumber;
+extern PGDLLIMPORT int SessionSchedule;
+extern PGDLLIMPORT bool RestartPoolerOnReload;
+
 extern PGDLLIMPORT int max_worker_processes;
 extern PGDLLIMPORT int max_parallel_workers;
 
diff --git a/src/include/port.h b/src/include/port.h
index b5c03d912b..3ea24a3b70 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
 extern bool pg_set_noblock(pgsocket sock);
 extern bool pg_set_block(pgsocket sock);
 
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
 /* Portable path handling for Unix/Win32 (in path.c) */
 
 extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h
index f4841fb397..e101df179f 100644
--- a/src/include/port/win32_port.h
+++ b/src/include/port/win32_port.h
@@ -445,6 +445,7 @@ extern int	pgkill(int pid, int sig);
 #define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
 #define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
 #define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks)
 
 SOCKET		pgwin32_socket(int af, int type, int protocol);
 int			pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen);
@@ -455,7 +456,8 @@ int			pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptf
 int			pgwin32_recv(SOCKET s, char *buf, int len, int flags);
 int			pgwin32_send(SOCKET s, const void *buf, int len, int flags);
 int			pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout);
-
+int			pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]);
+
 extern int	pgwin32_noblock;
 
 #endif							/* FRONTEND */
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 8ccd2afce5..05906e94a0 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -17,6 +17,7 @@
 extern bool EnableSSL;
 extern int	ReservedBackends;
 extern PGDLLIMPORT int PostPortNumber;
+extern PGDLLIMPORT int ProxyPortNumber;
 extern int	Unix_socket_permissions;
 extern char *Unix_socket_group;
 extern char *Unix_socket_directories;
@@ -46,6 +47,11 @@ extern int	postmaster_alive_fds[2];
 
 extern PGDLLIMPORT const char *progname;
 
+extern PGDLLIMPORT void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[]);
+
+struct Proxy;
+struct Port;
+
 extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn();
 extern void ClosePostmasterPorts(bool am_syslogger);
 extern void InitProcessGlobals(void);
@@ -63,6 +69,9 @@ extern Size ShmemBackendArraySize(void);
 extern void ShmemBackendArrayAllocation(void);
 #endif
 
+extern int  ParseStartupPacket(struct Port* port, MemoryContext memctx, void* pkg_body, int pkg_size, bool SSLdone);
+extern int	BackendStartup(struct Port* port, int* backend_pid);
+
 /*
  * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
  * for buffer references in buf_internals.h.  This limitation could be lifted
diff --git a/src/include/postmaster/proxy.h b/src/include/postmaster/proxy.h
new file mode 100644
index 0000000000..7f7a92a56a
--- /dev/null
+++ b/src/include/postmaster/proxy.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * proxy.h
+ *	  Exports from postmaster/proxy.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/proxy.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _PROXY_H
+#define _PROXY_H
+
+/*
+ * Information in shared memory about connection proxy state (used for session scheduling and monitoring)
+ */
+typedef struct ConnectionProxyState
+{
+	int pid;                  /* proxy worker pid */
+	int n_clients;            /* total number of clients */
+	int n_ssl_clients;        /* number of clients using SSL connection */
+	int n_pools;              /* number of dbname/role combinations */
+	int n_backends;           /* total number of launched backends */
+	int n_dedicated_backends; /* number of tainted backends */
+	uint64 tx_bytes;          /* amount of data sent to clients */
+	uint64 rx_bytes;          /* amount of data sent to server */
+	uint64 n_transactions;    /* total number of processed transactions */
+} ConnectionProxyState;
+
+extern ConnectionProxyState* ProxyState;
+extern PGDLLIMPORT int MyProxyId;
+extern PGDLLIMPORT pgsocket MyProxySocket;
+
+extern int  ConnectionProxyStart(void);
+extern int  ConnectionProxyShmemSize(void);
+extern void ConnectionProxyShmemInit(void);
+#ifdef EXEC_BACKEND
+extern void ConnectionProxyMain(int argc, char *argv[]);
+#endif
+
+#endif
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..680eb5ee10 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -133,9 +133,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
+#define WL_SOCKET_EDGE       (1 << 7)
 
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
+							 WL_SOCKET_EDGE | \
 							 WL_SOCKET_CONNECTED)
 
 typedef struct WaitEvent
@@ -177,6 +179,8 @@ extern int	WaitLatch(Latch *latch, int wakeEvents, long timeout,
 extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 
+extern void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos);
+
 /*
  * Unix implementation uses SIGUSR1 for inter-process signaling.
  * Win32 doesn't need this.
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index ac7ee72952..e7207e2d9a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -203,6 +203,8 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	bool        is_tainted;            /* backend has modified session GUCs, uses temporary tables, prepared statements, ... */
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index d68976fafa..9ff45b190a 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -58,6 +58,7 @@ enum config_group
 	CONN_AUTH_SETTINGS,
 	CONN_AUTH_AUTH,
 	CONN_AUTH_SSL,
+	CONN_POOLING,
 	RESOURCES,
 	RESOURCES_MEM,
 	RESOURCES_DISK,
diff --git a/src/makefiles/Makefile.cygwin b/src/makefiles/Makefile.cygwin
index f274d802b1..fdf53e9a8d 100644
--- a/src/makefiles/Makefile.cygwin
+++ b/src/makefiles/Makefile.cygwin
@@ -19,6 +19,7 @@ override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
 ifneq (,$(findstring backend,$(subdir)))
 ifeq (,$(findstring conversion_procs,$(subdir)))
 ifeq (,$(findstring libpqwalreceiver,$(subdir)))
+ifeq (,$(findstring libpqconn,$(subdir)))
 ifeq (,$(findstring replication/pgoutput,$(subdir)))
 ifeq (,$(findstring snowball,$(subdir)))
 override CPPFLAGS+= -DBUILDING_DLL
diff --git a/src/makefiles/Makefile.win32 b/src/makefiles/Makefile.win32
index 3dea11e5c2..39bd2de85e 100644
--- a/src/makefiles/Makefile.win32
+++ b/src/makefiles/Makefile.win32
@@ -17,6 +17,7 @@ CFLAGS_SL =
 ifneq (,$(findstring backend,$(subdir)))
 ifeq (,$(findstring conversion_procs,$(subdir)))
 ifeq (,$(findstring libpqwalreceiver,$(subdir)))
+ifeq (,$(findstring libpqconn,$(subdir)))
 ifeq (,$(findstring replication/pgoutput,$(subdir)))
 ifeq (,$(findstring snowball,$(subdir)))
 override CPPFLAGS+= -DBUILDING_DLL
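
A quick way to try the pooler after applying the patch (just a sketch: it
assumes connection_proxies is set to a non-zero value, since the GUC
default is 0, and the default proxy_port 6543) is to connect through the
proxy port:

    psql "host=localhost port=6543 dbname=postgres"

and then monitor the proxies' workload with:

    select * from pg_pooler_state();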

Reply via email to