On 26.04.2017 10:49, Konstantin Knizhnik wrote:
> On 26.04.2017 04:00, Tsunakawa, Takayuki wrote:
>> Are you considering some upper limit on the number of prepared
>> statements? In this case we need some kind of LRU for maintaining the
>> cache of autoprepared statements.
>
> I think it is a good idea to have such a limited cache - it can avoid
> the memory overflow problem.
> I will try to implement it.

I attach a new patch which allows limiting the number of autoprepared
statements (autoprepare_limit GUC variable).
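With the patch applied, the feature can be enabled per session along
these lines (the values below are only an illustration, not a
recommendation):

    -- autoprepare is disabled while autoprepare_threshold is 0 (the default);
    -- a non-zero value is the number of executions of a generalized query
    -- after which it is implicitly prepared
    SET autoprepare_threshold = 5;

    -- upper limit on the number of cached autoprepared statements;
    -- 0 means no limit, otherwise the least recently used entry is evicted
    SET autoprepare_limit = 100;
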
I also did more measurements, this time with several concurrent
connections and read-only statements.
Results of pgbench with 10 connections, scale 10 and read-only
statements are below:
    Protocol              TPS
    extended               87k
    prepared              209k
    simple+autoprepare    206k
As you can see, autoprepare provides more than a twofold speed improvement.
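The numbers correspond to the standard read-only pgbench test run with
the three protocol modes, roughly as below (the database name, run
duration and thread count are placeholders here, only the options that
matter for the comparison are shown):

    pgbench -i -s 10 postgres
    pgbench -c 10 -S -M extended -T 30 postgres   # "extended" row
    pgbench -c 10 -S -M prepared -T 30 postgres   # "prepared" row
    pgbench -c 10 -S -M simple   -T 30 postgres   # "simple+autoprepare" row
                                                  # (autoprepare_threshold > 0)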
I also tried to measure the overhead of parsing (which would be needed
to substitute all literals, not only string literals): I just added an
extra call of pg_parse_query. Speed dropped to 181k TPS.
So the overhead is noticeable, but it still leaves such an optimization useful.
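To make concrete what "substituting all literals" means (the queries
below are just made-up examples):

    -- With the attached patch (string literals only), a query like
    --     SELECT * FROM t WHERE name = 'smith' AND age > 30
    -- is cached in the generalized form
    --     SELECT * FROM t WHERE name = $1 AND age > 30

    -- Substituting all literals would also turn 30 into a parameter:
    --     SELECT * FROM t WHERE name = $1 AND age > $2
    -- but a plain text scan cannot reliably tell (-1) from (1-1), which is
    -- why the extra pg_parse_query call is needed.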
This is why I want to ask: is it better to implement the slower but
safer and more universal solution?
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f6be98b..0c9abfc 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -188,6 +188,7 @@ static bool IsTransactionStmtList(List *parseTrees);
static void drop_unnamed_stmt(void);
static void SigHupHandler(SIGNAL_ARGS);
static void log_disconnections(int code, Datum arg);
+static bool exec_cached_query(const char *query_string);
/* ----------------------------------------------------------------
@@ -916,6 +917,14 @@ exec_simple_query(const char *query_string)
drop_unnamed_stmt();
/*
+ * Try to find cached plan
+ */
+ if (autoprepare_threshold != 0 && exec_cached_query(query_string))
+ {
+ return;
+ }
+
+ /*
* Switch to appropriate context for constructing parsetrees.
*/
oldcontext = MemoryContextSwitchTo(MessageContext);
@@ -4500,3 +4509,606 @@ log_disconnections(int code, Datum arg)
port->user_name, port->database_name, port->remote_host,
port->remote_port[0] ? " port=" : "", port->remote_port)));
}
+
+typedef struct {
+ char const* query;
+ dlist_node lru;
+ int64 exec_count;
+ CachedPlanSource* plan;
+ int n_params;
+ int16 format;
+ bool disable_autoprepare;
+} plan_cache_entry;
+
+/*
+ * Replace string literals with parameters. We do not consider integer or real literals to avoid problems with
+ * negative numbers, user-defined operators, etc. For example, it is not easy to distinguish the cases (-1), (1-1) and (1-1)-1.
+ */
+static void generalize_statement(const char *query_string, char** gen_query, char** query_params, int* n_params)
+{
+ size_t query_len = strlen(query_string);
+ char const* src = query_string;
+ char* dst;
+ char* params;
+ unsigned char ch;
+
+ *n_params = 0;
+
+ *gen_query = (char*)palloc(query_len*2); /* assume that we have less than 1000 parameters, the worst case is replacing '' with $999 */
+ *query_params = (char*)palloc(query_len + 1);
+ dst = *gen_query;
+ params = *query_params;
+
+ while ((ch = *src++) != '\0') {
+ if (isspace(ch)) {
+ /* Replace sequence of whitespaces with just one space */
+ while (*src && isspace(*(unsigned char*)src)) {
+ src += 1;
+ }
+ *dst++ = ' ';
+ } else if (ch == '\'') {
+ while (true) {
+ ch = *src++;
+ if (ch == '\'') {
+ if (*src != '\'') {
+ break;
+ } else {
+ /* escaped quote */
+ *params++ = '\'';
+ src += 1;
+ }
+ } else {
+ *params++ = ch;
+ }
+ }
+ *params++ = '\0';
+ dst += sprintf(dst, "$%d", ++*n_params);
+ } else {
+ *dst++ = ch;
+ }
+ }
+ Assert(dst <= *gen_query + query_len*2);
+ Assert(params <= *query_params + query_len);
+ *dst = '\0';
+}
+
+static uint32 plan_cache_hash_fn(const void *key, Size keysize)
+{
+ return string_hash(((plan_cache_entry*)key)->query, 0);
+}
+
+static int plan_cache_match_fn(const void *key1, const void *key2, Size keysize)
+{
+ return strcmp(((plan_cache_entry*)key1)->query, ((plan_cache_entry*)key2)->query);
+}
+
+static void* plan_cache_keycopy_fn(void *dest, const void *src, Size keysize)
+{
+ ((plan_cache_entry*)dest)->query = pstrdup(((plan_cache_entry*)src)->query);
+ return dest;
+}
+
+#define PLAN_CACHE_SIZE 113
+
+size_t nPlanCacheHits;
+size_t nPlanCacheMisses;
+
+/*
+ * Try to generalize query, find cached plan for it and execute
+ */
+static bool exec_cached_query(const char *query_string)
+{
+ CommandDest dest = whereToSendOutput;
+ DestReceiver *receiver;
+ char *gen_query;
+ char *query_params;
+ int n_params;
+ plan_cache_entry *entry;
+ bool found;
+ MemoryContext old_context;
+ CachedPlanSource *psrc;
+ ParamListInfo params;
+ int paramno;
+ CachedPlan *cplan;
+ Portal portal;
+ bool was_logged = false;
+ bool is_xact_command;
+ bool execute_is_fetch;
+ char completion_tag[COMPLETION_TAG_BUFSIZE];
+ bool save_log_statement_stats = log_statement_stats;
+ ParamListInfo portal_params;
+ const char *source_text;
+ char msec_str[32];
+ bool snapshot_set = false;
+
+ static HTAB* plan_cache;
+ static dlist_head lru;
+ static size_t n_cached_queries;
+ static MemoryContext plan_cache_context;
+
+ /*
+ * Extract literals from query
+ */
+ generalize_statement(query_string, &gen_query, &query_params, &n_params);
+
+ if (plan_cache_context == NULL) {
+ plan_cache_context = AllocSetContextCreate(TopMemoryContext,
+ "plan cache context",
+ ALLOCSET_DEFAULT_SIZES);
+ }
+ old_context = MemoryContextSwitchTo(plan_cache_context);
+
+ /*
+ * Initialize hash table if not initialized yet
+ */
+ if (plan_cache == NULL)
+ {
+ static HASHCTL info;
+ info.keysize = sizeof(char*);
+ info.entrysize = sizeof(plan_cache_entry);
+ info.hash = plan_cache_hash_fn;
+ info.match = plan_cache_match_fn;
+ info.keycopy = plan_cache_keycopy_fn;
+ plan_cache = hash_create("plan_cache", autoprepare_limit != 0 ? autoprepare_limit : PLAN_CACHE_SIZE,
+ &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
+ dlist_init(&lru);
+ }
+
+ /*
+ * Lookup generalized query
+ */
+ entry = (plan_cache_entry*)hash_search(plan_cache, &gen_query, HASH_ENTER, &found);
+ if (!found) {
+ if (++n_cached_queries > autoprepare_limit && autoprepare_limit != 0) {
+ plan_cache_entry* victim = dlist_container(plan_cache_entry, lru, lru.head.prev);
+ DropCachedPlan(victim->plan);
+ hash_search(plan_cache, victim, HASH_REMOVE, NULL);
+ n_cached_queries -= 1;
+ }
+ entry->exec_count = 0;
+ entry->plan = NULL;
+ entry->disable_autoprepare = false;
+ } else {
+ dlist_delete(&entry->lru);
+ if (entry->plan != NULL && !entry->plan->is_valid) {
+ DropCachedPlan(entry->plan);
+ entry->plan = NULL;
+ }
+ }
+ dlist_insert_after(&lru.head, &entry->lru);
+ MemoryContextSwitchTo(old_context);
+
+ /*
+ * Prepare query only when it is executed more than autoprepare_threshold times
+ */
+ if (entry->disable_autoprepare || entry->exec_count++ < autoprepare_threshold) {
+ nPlanCacheMisses += 1;
+ return false;
+ }
+ if (entry->plan == NULL) {
+ List *parsetree_list;
+ Node *raw_parse_tree;
+ const char *command_tag;
+ Query *query;
+ List *querytree_list;
+ Oid *param_types = NULL;
+ int num_params = 0;
+
+ old_context = MemoryContextSwitchTo(MessageContext);
+
+ PG_TRY();
+ {
+ parsetree_list = pg_parse_query(gen_query);
+ query_string = gen_query;
+ }
+ PG_CATCH();
+ {
+ elog(LOG, "Failed to autoprepare query \"%s\"", gen_query);
+ FlushErrorState();
+ MemoryContextSwitchTo(old_context);
+ entry->disable_autoprepare = true;
+ nPlanCacheMisses += 1;
+ return false;
+ }
+ PG_END_TRY();
+
+ /*
+ * Only a single user statement is allowed in a prepared statement.
+ */
+ if (list_length(parsetree_list) != 1) {
+ MemoryContextSwitchTo(old_context);
+ entry->disable_autoprepare = true;
+ nPlanCacheMisses += 1;
+ return false;
+ }
+
+ raw_parse_tree = (Node *) linitial(parsetree_list);
+
+ /*
+ * Get the command name for possible use in status display.
+ */
+ command_tag = CreateCommandTag(raw_parse_tree);
+
+ /*
+ * If we are in an aborted transaction, reject all commands except
+ * COMMIT/ROLLBACK. It is important that this test occur before we
+ * try to do parse analysis, rewrite, or planning, since all those
+ * phases try to do database accesses, which may fail in abort state.
+ * (It might be safe to allow some additional utility commands in this
+ * state, but not many...)
+ */
+ if (IsAbortedTransactionBlockState() &&
+ !IsTransactionExitStmt(raw_parse_tree))
+ ereport(ERROR,
+ (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+ errmsg("current transaction is aborted, "
+ "commands ignored until end of transaction block"),
+ errdetail_abort()));
+
+ /*
+ * Create the CachedPlanSource before we do parse analysis, since it
+ * needs to see the unmodified raw parse tree.
+ */
+ psrc = CreateCachedPlan(raw_parse_tree, query_string, command_tag);
+
+ /*
+ * Set up a snapshot if parse analysis will need one.
+ */
+ if (analyze_requires_snapshot(raw_parse_tree))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+
+ /*
+ * Analyze and rewrite the query. Note that the originally specified
+ * parameter set is not required to be complete, so we have to use
+ * parse_analyze_varparams().
+ */
+ if (log_parser_stats) {
+ ResetUsage();
+ }
+
+ query = parse_analyze_varparams(raw_parse_tree,
+ query_string,
+ &param_types,
+ &num_params);
+ Assert(num_params == n_params);
+
+ /*
+ * Check all parameter types got determined.
+ */
+ for (paramno = 0; paramno < n_params; paramno++)
+ {
+ Oid ptype = param_types[paramno];
+
+ if (ptype == InvalidOid || ptype == UNKNOWNOID) {
+ /* Type of parameter can not be determined */
+ MemoryContextSwitchTo(old_context);
+ entry->disable_autoprepare = true;
+ nPlanCacheMisses += 1;
+ return false;
+ }
+ }
+
+ if (log_parser_stats) {
+ ShowUsage("PARSE ANALYSIS STATISTICS");
+ }
+
+ querytree_list = pg_rewrite_query(query);
+
+ /* Done with the snapshot used for parsing */
+ if (snapshot_set) {
+ PopActiveSnapshot();
+ }
+
+ CompleteCachedPlan(psrc,
+ querytree_list,
+ NULL,
+ param_types,
+ n_params,
+ NULL,
+ NULL,
+ CURSOR_OPT_PARALLEL_OK, /* allow parallel mode */
+ true); /* fixed result */
+
+ /* If we got a cancel signal during analysis, quit */
+ CHECK_FOR_INTERRUPTS();
+
+ entry->format = 0; /* TEXT is default */
+ if (IsA(raw_parse_tree, FetchStmt))
+ {
+ FetchStmt *stmt = (FetchStmt *)raw_parse_tree;
+
+ if (!stmt->ismove)
+ {
+ Portal fportal = GetPortalByName(stmt->portalname);
+
+ if (PortalIsValid(fportal) &&
+ (fportal->cursorOptions & CURSOR_OPT_BINARY))
+ entry->format = 1; /* BINARY */
+ }
+ }
+
+ SaveCachedPlan(psrc);
+ entry->plan = psrc;
+ entry->n_params = n_params;
+ MemoryContextSwitchTo(old_context);
+
+ /*
+ * We do NOT close the open transaction command here; that only happens
+ * when the client sends Sync. Instead, do CommandCounterIncrement just
+ * in case something happened during parse/plan.
+ */
+ CommandCounterIncrement();
+ } else {
+ psrc = entry->plan;
+ n_params = entry->n_params;
+ }
+
+ /*
+ * If we are in aborted transaction state, the only portals we can
+ * actually run are those containing COMMIT or ROLLBACK commands. We
+ * disallow binding anything else to avoid problems with infrastructure
+ * that expects to run inside a valid transaction. We also disallow
+ * binding any parameters, since we can't risk calling user-defined I/O
+ * functions.
+ */
+ if (IsAbortedTransactionBlockState() &&
+ (!IsTransactionExitStmt(psrc->raw_parse_tree) ||
+ n_params != 0))
+ ereport(ERROR,
+ (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+ errmsg("current transaction is aborted, "
+ "commands ignored until end of transaction block"),
+ errdetail_abort()));
+
+ /*
+ * Create unnamed portal to run the query or queries in. If there
+ * already is one, silently drop it.
+ */
+ portal = CreatePortal("", true, true);
+ /* Don't display the portal in pg_cursors */
+ portal->visible = false;
+
+ /*
+ * Prepare to copy stuff into the portal's memory context. We do all this
+ * copying first, because it could possibly fail (out-of-memory) and we
+ * don't want a failure to occur between GetCachedPlan and
+ * PortalDefineQuery; that would result in leaking our plancache refcount.
+ */
+ old_context = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+
+ /* Copy the plan's query string into the portal */
+ query_string = pstrdup(psrc->query_string);
+
+ /*
+ * Set a snapshot if we have parameters to fetch (since the input
+ * functions might need it) or the query isn't a utility command (and
+ * hence could require redoing parse analysis and planning). We keep the
+ * snapshot active till we're done, so that plancache.c doesn't have to
+ * take new ones.
+ */
+ if (n_params > 0 ||
+ (psrc->raw_parse_tree &&
+ analyze_requires_snapshot(psrc->raw_parse_tree)))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ } else {
+ snapshot_set = false;
+ }
+
+ /*
+ * Fetch parameters, if any, and store in the portal's memory context.
+ */
+ if (n_params > 0)
+ {
+ params = (ParamListInfo) palloc(offsetof(ParamListInfoData, params) +
+ n_params * sizeof(ParamExternData));
+ params->paramFetch = NULL;
+ params->paramFetchArg = NULL;
+ params->parserSetup = NULL;
+ params->parserSetupArg = NULL;
+ params->numParams = n_params;
+ params->paramMask = NULL;
+
+ for (paramno = 0; paramno < n_params; paramno++)
+ {
+ Oid ptype = psrc->param_types[paramno];
+ Oid typinput;
+ Oid typioparam;
+
+ getTypeInputInfo(ptype, &typinput, &typioparam);
+
+ params->params[paramno].value = OidInputFunctionCall(typinput, query_params, typioparam, -1);
+ params->params[paramno].isnull = false;
+
+ /*
+ * We mark the params as CONST. This ensures that any custom plan
+ * makes full use of the parameter values.
+ */
+ params->params[paramno].pflags = PARAM_FLAG_CONST;
+ params->params[paramno].ptype = ptype;
+
+ query_params += strlen(query_params) + 1;
+ }
+ } else {
+ params = NULL;
+ }
+
+ /* Done storing stuff in portal's context */
+ MemoryContextSwitchTo(old_context);
+
+ /*
+ * Obtain a plan from the CachedPlanSource. Any cruft from (re)planning
+ * will be generated in MessageContext. The plan refcount will be
+ * assigned to the Portal, so it will be released at portal destruction.
+ */
+ cplan = GetCachedPlan(psrc, params, false);
+
+ /*
+ * Now we can define the portal.
+ *
+ * DO NOT put any code that could possibly throw an error between the
+ * above GetCachedPlan call and here.
+ */
+ PortalDefineQuery(portal,
+ NULL,
+ query_string,
+ psrc->commandTag,
+ cplan->stmt_list,
+ cplan);
+
+ /* Done with the snapshot used for parameter I/O and parsing/planning */
+ if (snapshot_set) {
+ PopActiveSnapshot();
+ }
+
+ /*
+ * And we're ready to start portal execution.
+ */
+ PortalStart(portal, params, 0, InvalidSnapshot);
+
+ /*
+ * Apply the result format requests to the portal.
+ */
+ PortalSetResultFormat(portal, 1, &entry->format);
+
+ /* Does the portal contain a transaction command? */
+ is_xact_command = IsTransactionStmtList(portal->stmts);
+
+ /*
+ * We must copy the sourceText into MessageContext in
+ * case the portal is destroyed during finish_xact_command. Can avoid the
+ * copy if it's not an xact command, though.
+ */
+ if (is_xact_command)
+ {
+ source_text = pstrdup(portal->sourceText);
+ /*
+ * An xact command shouldn't have any parameters, which is a good
+ * thing because they wouldn't be around after finish_xact_command.
+ */
+ portal_params = NULL;
+ }
+ else
+ {
+ source_text = portal->sourceText;
+ portal_params = portal->portalParams;
+ }
+
+ /*
+ * Report query to various monitoring facilities.
+ */
+ debug_query_string = source_text;
+
+ pgstat_report_activity(STATE_RUNNING, source_text);
+
+ set_ps_display(portal->commandTag, false);
+
+ if (save_log_statement_stats) {
+ ResetUsage();
+ }
+
+ BeginCommand(portal->commandTag, dest);
+
+ PortalSetResultFormat(portal, 1, &entry->format);
+
+
+ /*
+ * Create dest receiver in MessageContext (we don't want it in transaction
+ * context, because that may get deleted if portal contains VACUUM).
+ */
+ receiver = CreateDestReceiver(dest);
+ SetRemoteDestReceiverParams(receiver, portal);
+
+ /*
+ * If we re-issue an Execute protocol request against an existing portal,
+ * then we are only fetching more rows rather than completely re-executing
+ * the query from the start. atStart is never reset for a v3 portal, so we
+ * are safe to use this check.
+ */
+ execute_is_fetch = !portal->atStart;
+
+ /* Log immediately if dictated by log_statement */
+ if (check_log_statement(portal->stmts))
+ {
+ ereport(LOG,
+ (errmsg("%s %s%s%s: %s",
+ execute_is_fetch ?
+ _("execute fetch from") :
+ _("execute"),
+ "<unnamed>",
+ "",
+ "",
+ source_text),
+ errhidestmt(true),
+ errdetail_params(portal_params)));
+ was_logged = true;
+ }
+
+ /* Check for cancel signal before we start execution */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Run the portal to completion, and then drop it (and the receiver).
+ */
+ (void) PortalRun(portal,
+ FETCH_ALL,
+ true,
+ receiver,
+ receiver,
+ completion_tag);
+
+ (*receiver->rDestroy) (receiver);
+
+ PortalDrop(portal, false);
+
+ /*
+ * Tell client that we're done with this query. Note we emit exactly
+ * one EndCommand report for each raw parsetree, thus one for each SQL
+ * command the client sent, regardless of rewriting. (But a command
+ * aborted by error will not send an EndCommand report at all.)
+ */
+ EndCommand(completion_tag, dest);
+
+ /*
+ * Close down transaction statement, if one is open.
+ */
+ finish_xact_command();
+
+ /*
+ * Emit duration logging if appropriate.
+ */
+ switch (check_log_duration(msec_str, was_logged))
+ {
+ case 1:
+ ereport(LOG,
+ (errmsg("duration: %s ms", msec_str),
+ errhidestmt(true)));
+ break;
+ case 2:
+ ereport(LOG,
+ (errmsg("duration: %s ms %s %s%s%s: %s",
+ msec_str,
+ execute_is_fetch ?
+ _("execute fetch from") :
+ _("execute"),
+ "<unnamed>",
+ "",
+ "",
+ source_text),
+ errhidestmt(true),
+ errdetail_params(portal_params)));
+ break;
+ }
+
+ if (save_log_statement_stats) {
+ ShowUsage("EXECUTE MESSAGE STATISTICS");
+ }
+ debug_query_string = NULL;
+ nPlanCacheHits += 1;
+
+ return true;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4f1891f..4bb5b65 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -450,6 +450,10 @@ int tcp_keepalives_idle;
int tcp_keepalives_interval;
int tcp_keepalives_count;
+
+int autoprepare_threshold;
+int autoprepare_limit;
+
/*
* SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it
* being set to zero (meaning never renegotiate) for backward compatibility.
@@ -1949,6 +1953,28 @@ static struct config_int ConfigureNamesInt[] =
check_max_stack_depth, assign_max_stack_depth, NULL
},
+ /*
+ * Threshold for implicit preparing of frequently executed queries
+ */
+ {
+ {"autoprepare_threshold", PGC_USERSET, QUERY_TUNING_OTHER,
+ gettext_noop("Threshold for autopreparing a query."),
+ NULL,
+ },
+ &autoprepare_threshold,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+ {
+ {"autoprepare_limit", PGC_USERSET, QUERY_TUNING_OTHER,
+ gettext_noop("Maximum number of autoprepared queries."),
+ NULL,
+ },
+ &autoprepare_limit,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
{
{"temp_file_limit", PGC_SUSET, RESOURCES_DISK,
gettext_noop("Limits the total size of all temporary files used by each process."),
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0bf9f21..f035ce7 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -252,6 +252,9 @@ extern int client_min_messages;
extern int log_min_duration_statement;
extern int log_temp_files;
+extern int autoprepare_threshold;
+extern int autoprepare_limit;
+
extern int temp_file_limit;
extern int num_temp_buffers;