On 19.01.2018 20:28, Tomas Vondra wrote:
> > With pgbouncer you will never be able to use prepared statements, which
> > makes simple queries almost twice as slow (unless my patch with
> > autoprepared statements is committed).
>
> I don't see why that wouldn't be possible? Perhaps not for prepared
> statements with simple protocol, but I'm pretty sure it's doable for
> extended protocol (which seems like a reasonable limitation).
>
> That being said, I think it's a mistake to turn this thread into a
> pgbouncer vs. the world battle. I could name things that are possible
> only with standalone connection pool - e.g. pausing connections and
> restarting the database without interrupting the clients.
>
> But that does not mean built-in connection pool is not useful.
>
> regards
Sorry, but I do not understand how the extended protocol can help to handle
prepared statements without a shared prepared statement cache or built-in
connection pooling.
The problem is that in Postgres most caches, including the catalog cache,
the relation cache and the prepared statement cache, are currently private
to a backend.
There is certainly one big advantage of this approach: there is no need to
synchronize access to the cache. But that seems to be the only advantage.
And there are a lot of drawbacks:
inefficient use of memory, a complex invalidation mechanism,
incompatibility with connection pooling...
So there are three possible ways (maybe more, but I know only three):
1. Implement built-in connection pooling which is aware of the proper
use of local caches. This is what I have implemented with the proposed
approach.
2. Implicit autoprepare. Clients will not be able to use the standard
Postgres prepare mechanism, but the executor will try to generate a generic
plan for ordinary queries. My implementation of this approach is in the
commitfest.
3. Global caches. This seems to be the best solution, but also the most
difficult to implement.
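To make the first option concrete: in the attached patch each pooled session gets an id that is unique within its backend, and named prepared statements are stored under "<id>.<name>", so sessions sharing one backend cannot see each other's statements, and DropSessionPreparedStatements() can remove exactly one session's entries when it disconnects. The standalone C sketch below reproduces only that naming rule; qualify_stmt_name() and stmt_belongs_to_session() are hypothetical helpers for illustration, not functions from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper illustrating the patch's naming rule: a named
 * statement "name" from session "id" is stored as "id.name", while the
 * unnamed statement ("") is left unqualified.
 */
static void qualify_stmt_name(char *out, size_t outlen,
                              const char *session_id, const char *stmt_name)
{
    int n;

    if (stmt_name[0] == '\0')
        n = snprintf(out, outlen, "%s", stmt_name);
    else
        n = snprintf(out, outlen, "%s.%s", session_id, stmt_name);

    /* a truncated name could collide with another session's statement */
    assert(n >= 0 && (size_t) n < outlen);
}

/*
 * Prefix test applied when a session ends: a statement belongs to the
 * session only if its name starts with "<id>.".  Checking the '.' keeps
 * session "1" from matching a statement named "12.get_user".
 */
static bool stmt_belongs_to_session(const char *qualified,
                                    const char *session_id)
{
    size_t len = strlen(session_id);

    return strncmp(qualified, session_id, len) == 0 && qualified[len] == '.';
}
```

For example, qualify_stmt_name(buf, sizeof buf, "12", "get_user") stores "12.get_user", and stmt_belongs_to_session("12.get_user", "1") is false thanks to the separator check.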
Actually I think that the discussion about the value of built-in
connection pooling is very important.
Yes, external connection pooling is more flexible. It allows pooling to be
performed either on the client side or on the server side (or even a
combination of the two approaches).
Also, external connection pooling for PostgreSQL is not limited to
pgbouncer/pgpool.
There are many frameworks that maintain their own connection pool, for
example J2EE, JBoss, Hibernate, ...
I have a feeling that about 70% of enterprise systems working with
databases are written in Java and do connection pooling in their own way.
So maybe embedded connection pooling is not needed for such applications...
But what I have heard from many people is that Postgres' poor connection
pooling is one of its main drawbacks, complicating its usage
in enterprise environments.
In any case, please find attached an updated patch with some code cleanup
and more comments.
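Handing a client connection from the postmaster to an already running backend relies on passing the socket descriptor over a Unix-domain socket pair with SCM_RIGHTS (send_sock.c in the patch). Below is a minimal standalone sketch of that technique, assuming a POSIX system; send_fd() and recv_fd() are hypothetical names, and unlike pg_recv_sock() in the patch the receiver checks that it actually got a single, untruncated SCM_RIGHTS control message before using its payload.

```c
#define _DEFAULT_SOURCE
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Send descriptor "fd" through Unix-domain socket "chan"; 0 on success, -1 on error. */
static int send_fd(int chan, int fd)
{
    struct msghdr msg;
    struct iovec io;
    char dummy = 0;                 /* at least one data byte must be sent */
    union {                         /* the union guarantees cmsghdr alignment */
        char buf[CMSG_SPACE(sizeof(int))];
        struct cmsghdr align;
    } ctl;
    struct cmsghdr *cmsg;

    memset(&msg, 0, sizeof(msg));
    memset(&ctl, 0, sizeof(ctl));
    io.iov_base = &dummy;
    io.iov_len = 1;
    msg.msg_iov = &io;
    msg.msg_iovlen = 1;
    msg.msg_control = ctl.buf;
    msg.msg_controllen = sizeof(ctl.buf);

    cmsg = CMSG_FIRSTHDR(&msg);
    assert(cmsg != NULL);           /* buffer is large enough for one header */
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
    memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

    return sendmsg(chan, &msg, 0) < 0 ? -1 : 0;
}

/* Receive a descriptor sent by send_fd(); returns the new descriptor or -1. */
static int recv_fd(int chan)
{
    struct msghdr msg;
    struct iovec io;
    char dummy;
    union {
        char buf[CMSG_SPACE(sizeof(int))];
        struct cmsghdr align;
    } ctl;
    struct cmsghdr *cmsg;
    int fd;

    memset(&msg, 0, sizeof(msg));
    io.iov_base = &dummy;
    io.iov_len = 1;
    msg.msg_iov = &io;
    msg.msg_iovlen = 1;
    msg.msg_control = ctl.buf;
    msg.msg_controllen = sizeof(ctl.buf);

    if (recvmsg(chan, &msg, 0) <= 0 || (msg.msg_flags & MSG_CTRUNC))
        return -1;

    /* validate the control message instead of trusting it blindly */
    cmsg = CMSG_FIRSTHDR(&msg);
    if (cmsg == NULL || cmsg->cmsg_level != SOL_SOCKET ||
        cmsg->cmsg_type != SCM_RIGHTS ||
        cmsg->cmsg_len != CMSG_LEN(sizeof(int)))
        return -1;

    memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
    return fd;
}
```

In the patch the postmaster creates one such pair per backend with socketpair(AF_UNIX, SOCK_DGRAM, 0, ...) before fork(); the postmaster keeps the sending end and the child keeps the receiving end.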
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..8e8a737 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,32 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+/*
+ * Drop all statements prepared in the specified session.
+ */
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+ HASH_SEQ_STATUS seq;
+ PreparedStatement *entry;
+ size_t idLen = strlen(sessionId);
+
+ /* nothing cached */
+ if (!prepared_queries)
+ return;
+
+ /* walk over cache */
+ hash_seq_init(&seq, prepared_queries);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ {
+ if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.')
+ {
+ /* Release the plancache entry */
+ DropCachedPlan(entry->plansource);
+
+ /* Now we can remove the hash table entry */
+ hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ }
+ }
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..7f40edb 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,17 @@ pq_peekbyte(void)
}
/* --------------------------------
+ * pq_available_bytes - get number of buffered bytes available for reading.
+ *
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+ return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock)
+{
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ if (sendmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+ return 0;
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+ struct msghdr msg = {0};
+ char c_buffer[256];
+ char m_buffer[256];
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ pgsocket sock;
+
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ if (recvmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
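+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (cmsg == NULL || cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) return -1; /* no descriptor received */
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));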
+
+ return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..2554075 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer into the backend list used to implement round-robin distribution of sessions among backends.
+ * This variable is either NULL or points to a normal backend.
+ */
+static Backend* BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int nNormalBackends;
#ifdef EXEC_BACKEND
static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
@@ -568,6 +577,22 @@ HANDLE PostmasterHandle;
#endif
/*
+ * Move current backend pointer to the next normal backend.
+ * This function is called either when a new session is started, to implement the round-robin policy, or when the backend pointed to by BackendListClockPtr is terminated.
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+ Backend* b = BackendListClockPtr;
+ do {
+ dlist_node* node = &b->elem;
+ node = node->next ? node->next : BackendList.head.next;
+ b = dlist_container(Backend, elem, node);
+ } while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr);
+
+ BackendListClockPtr = b;
+}
+
+/*
* Postmaster main entry point
*/
void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
{
int32 len;
void *buf;
@@ -2043,7 +2068,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx);
}
/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ SessionPoolSock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * Unlink backend from backend's list and free memory
+ */
+static void UnlinkBackend(Backend* bp)
+{
+ if (bp->bkend_type == BACKEND_TYPE_NORMAL)
+ {
+ if (bp == BackendListClockPtr)
+ AdvanceBackendListClockPtr();
+ if (bp->session_send_sock != PGINVALID_SOCKET)
+ close(bp->session_send_sock);
+ elog(DEBUG2, "Cleanup backend %d", bp->pid);
+ nNormalBackends -= 1;
+ }
+ dlist_delete(&bp->elem);
+ free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
break;
}
}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -4017,6 +4058,20 @@ BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ int session_pipe[2];
+
+ if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+ {
+ /* With session pooling, instead of spawning a new backend, open a new session in one of the existing backends. */
+ Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+ elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+ /* Send connection socket to the backend pointed by BackendListClockPtr */
+ if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0)
+ elog(FATAL, "Failed to send session socket: %m");
+ AdvanceBackendListClockPtr(); /* round-robin backends */
+ return STATUS_OK;
+ }
+
/*
* Create backend data structure. Better before the fork() so we can
@@ -4030,7 +4085,6 @@ BackendStartup(Port *port)
errmsg("out of memory")));
return STATUS_ERROR;
}
-
/*
* Compute the cancel key that will be assigned to this backend. The
* backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4117,24 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ /* Create socket pair for sending session sockets to the backend */
+ if (SessionPoolSize != 0)
+ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ if (SessionPoolSize != 0)
+ {
+ SessionPoolSock = session_pipe[0]; /* Use this socket for receiving client session socket descriptor */
+ close(session_pipe[1]); /* Close unused end of the pipe */
+ }
free(bn);
/* Detangle from postmaster */
@@ -4110,9 +4176,19 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ if (SessionPoolSize != 0)
+ {
+ bn->session_send_sock = session_pipe[1]; /* Use this socket for sending client session socket descriptor */
+ close(session_pipe[0]); /* Close unused end of the pipe */
+ }
+ else
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
dlist_push_head(&BackendList, &bn->elem);
-
+ if (BackendListClockPtr == NULL)
+ BackendListClockPtr = bn;
+ nNormalBackends += 1;
+ elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4299,7 +4375,7 @@ BackendInitialize(Port *port)
* Receive the startup packet (which might turn out to be a cancel request
* packet).
*/
- status = ProcessStartupPacket(port, false);
+ status = ProcessStartupPacket(port, false, TopMemoryContext);
/*
* Stop here if it was bad or a cancel packet. ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* singly-linked list of free events, linked by "pos" and terminated by -1 */
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
#endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
}
/*
+ * Remove event with specified socket descriptor
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+ int i, n = set->nevents;
+ for (i = 0; i < n; i++)
+ {
+ WaitEvent *event = &set->events[i];
+ if (event->fd == fd)
+ {
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#endif
+ break;
+ }
+ }
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..7dc8049 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,8 +75,18 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
#include "mb/pg_wchar.h"
+/*
+ * Information associated with client session
+ */
+typedef struct SessionContext
+{
+ MemoryContext memory; /* memory context used for global session data (replacement of TopMemoryContext) */
+ Port* port; /* connection port */
+ char* id; /* session identifier used to construct unique prepared statement names */
+} SessionContext;
/* ----------------
* global variables
@@ -98,6 +108,8 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
+/* Local socket for redirecting sessions to the backends */
+pgsocket SessionPoolSock = PGINVALID_SOCKET;
/* ----------------
@@ -169,6 +181,13 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+static WaitEventSet* SessionPool; /* Set of all sessions sockets */
+static int64 SessionCount; /* Number of sessions */
+static SessionContext* CurrentSession; /* Pointer to the active session */
+static Port* BackendPort; /* Reference to the original port of this backend created when this backend was launched.
+ * The session using this port may already be terminated, but since the port is allocated in TopMemoryContext,
+ * its content is still valid and is used as template for ports of new sessions */
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -194,6 +213,25 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+/*
+ * Generate session ID unique within this backend
+ */
+static char* CreateSessionId(void)
+{
+ char buf[64];
+ pg_lltoa(++SessionCount, buf);
+ return pstrdup(buf);
+}
+
+/*
+ * Free all memory associated with session and delete session object itself
+ */
+static void DeleteSession(SessionContext* session)
+{
+ elog(DEBUG1, "Delete session %p, id=%s, memory context=%p", session, session->id, session->memory);
+ MemoryContextDelete(session->memory);
+ free(session);
+}
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -1232,6 +1270,12 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
/*
* Report query to various monitoring facilities.
*/
@@ -1503,6 +1547,12 @@ exec_bind_message(StringInfo input_message)
portal_name = pq_getmsgstring(input_message);
stmt_name = pq_getmsgstring(input_message);
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
ereport(DEBUG2,
(errmsg("bind %s to %s",
*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2375,12 @@ exec_describe_statement_message(const char *stmt_name)
CachedPlanSource *psrc;
int i;
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
/*
* Start up a transaction command. (Note that this will normally change
* current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3659,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
-
/* ----------------------------------------------------------------
* PostgresMain
* postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3709,21 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Assign session for this backend in case of session pooling */
+ if (SessionPoolSize != 0)
+ {
+ MemoryContext oldcontext;
+ CurrentSession = (SessionContext*)malloc(sizeof(SessionContext));
+ CurrentSession->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(CurrentSession->memory);
+ CurrentSession->id = CreateSessionId();
+ CurrentSession->port = MyProcPort;
+ BackendPort = MyProcPort;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3783,7 +3853,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -4069,6 +4139,142 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ /*
+ * Here we perform multiplexing of client sessions if session pooling is enabled.
+ * Since we perform transaction-level pooling, rescheduling is done only when we are not inside a transaction.
+ */
+ if (SessionPoolSock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+ if (SessionPool == NULL)
+ {
+ /* Construct wait event set if not constructed yet */
+ SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+ /* Add event to detect postmaster death */
+ AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CurrentSession);
+ /* Add event for backends latch */
+ AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CurrentSession);
+ /* Add event for accepting new sessions */
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, SessionPoolSock, NULL, CurrentSession);
+ /* Add event for current session */
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, CurrentSession);
+ }
+ ChooseSession:
+ DoingCommandRead = true;
+ /* Select which client session is ready to send new query */
+ if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto ChooseSession;
+ }
+
+ if (ready_client.fd == SessionPoolSock)
+ {
+ /* Here we handle case of attaching new session */
+ int status;
+ SessionContext* session;
+ StringInfoData buf;
+ Port* port;
+ pgsocket sock;
+ MemoryContext oldcontext;
+
+ sock = pg_recv_sock(SessionPoolSock);
+ if (sock < 0)
+ elog(FATAL, "Failed to receive session socket: %m");
+
+ session = (SessionContext*)malloc(sizeof(SessionContext));
+ session->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(session->memory);
+ port = palloc(sizeof(Port));
+ memcpy(port, BackendPort, sizeof(Port));
+
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ port->sock = sock;
+ session->port = port;
+ session->id = CreateSessionId();
+
+ MyProcPort = port;
+ status = ProcessStartupPacket(port, false, session->memory);
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * TODO: Currently we assume that all sessions are accessing the same database under the same user.
+ * Just report an error if it is not true
+ */
+ if (strcmp(port->database_name, MyProcPort->database_name) ||
+ strcmp(port->user_name, MyProcPort->user_name))
+ {
+ elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+ port->database_name, port->user_name,
+ MyProcPid, MyProcPort->database_name, MyProcPort->user_name);
+ }
+ else if (status == STATUS_OK)
+ {
+ elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+ sock, MyProcPid, port->database_name, port->user_name);
+ CurrentSession = session;
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, session);
+
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ CommitTransactionCommand();
+
+ /*
+ * Send GUC options to the client
+ */
+ BeginReportingGUCOptions();
+
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint32(&buf, (int32) MyProcPid);
+ pq_sendint32(&buf, (int32) MyCancelKey);
+ pq_endmessage(&buf);
+
+ /* Need not flush since ReadyForQuery will do it. */
+ send_ready_for_query = true;
+ continue;
+ }
+ else
+ {
+ /* Error while processing the startup packet:
+ * reject this session and go back to listening on the sockets.
+ */
+ DeleteSession(session);
+ elog(LOG, "Session startup failed");
+ close(sock);
+ goto ChooseSession;
+ }
+ }
+ else
+ {
+ elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+ CurrentSession = (SessionContext*)ready_client.user_data;
+ MyProcPort = CurrentSession->port;
+ }
+ }
}
/*
@@ -4350,6 +4556,39 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+
+ if (SessionPool)
+ {
+ /* With session pooling, close the session but do not terminate the backend,
+ * even if there are no more sessions in this backend.
+ * The reason for keeping the backend alive is to avoid redundant process launches if
+ * some client repeatedly opens and closes connections to the database.
+ * The maximal number of launched backends with connection pooling is intended to be
+ * optimal for this system and workload, so there is no reason to try to reduce this number
+ * when there are no active sessions.
+ */
+ DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+ elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+ pq_getmsgend(&input_message);
+ if (pq_is_reading_msg())
+ pq_endmsgread();
+
+ close(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+ MyProcPort = NULL;
+
+ if (CurrentSession)
+ {
+ DropSessionPreparedStatements(CurrentSession->id);
+ DeleteSession(CurrentSession);
+ CurrentSession = NULL;
+ }
+ whereToSendOutput = DestRemote;
+ /* Need to perform rescheduling to some other session or accept new session */
+ goto ChooseSession;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int maintenance_work_mem = 16384;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0;
int MaxConnections = 90;
+int MaxSessions = 1000;
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..9202728 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,29 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of client sessions per backend."),
+ gettext_noop("Maximal number of client sessions which can be handled by one backend if session pooling is switched on. "
+ "So the maximal number of client connections is session_pool_size*max_sessions.")
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the number of backends serving client sessions."),
+ gettext_noop("If non-zero then session pooling will be used: "
+ "client connections will be redirected to one of the backends and the maximal number of backends is determined by this parameter. "
+ "Launched backends are never terminated, even when there are no active sessions.")
+ },
+ &SessionPoolSize,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void);
/*
* prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
extern PGDLLIMPORT int max_worker_processes;
extern int max_parallel_workers;
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname);
extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port;
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..191eeaa 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -34,6 +34,7 @@ extern CommandDest whereToSendOutput;
extern PGDLLIMPORT const char *debug_query_string;
extern int max_stack_depth;
extern int PostAuthDelay;
+extern pgsocket SessionPoolSock;
/* GUC-configurable parameters */