On Tuesday, 7th of December 2010, 21:57:00 Wietse Venema wrote:
> Wietse Venema:
> > Thanks for the patch.
> > 
> > Stefan Jakobs:
> > > I am not aware of any dead-lock issues. The sequence pseudo-thread
> > > will query the database only once with the first key. For every
> > > next key the sequence pseudo-thread is working with the results
> > > in the memory. With a very large database the size of the response
> > > may be a problem. But an INSERT/DELETE/UPDATE operation will not
> > > conflict with the sequence pseudo-thread.  Finally, I cannot
> > > prove that the code is dead-lock safe.
> > 
> > It appears that this sequence() implementation uses memory in
> > proportion to the database size. That is not acceptable. Would it
> > be possible to maintain state with a limited amount of memory for
> > a database cursor?
> > 
> > By design, Postfix memory usage must not keep growing with increasing
> > data size, queue size, message size, database size etc.  This is
> > necessary to ensure sane handling of overload. It is not acceptable
> > that Postfix becomes deadlocked under overload.
> 
> Looking over MySQL documentation, there appears to be a HANDLER
> primitive that appears to support FIRST/NEXT sequential access
> without using an amount of memory that grows with database size.
> 
> http://dev.mysql.com/doc/refman/5.5/en/handler.html
> http://dev.mysql.com/doc/refman/4.1/en/handler.html
> 
> This approach seems to have similar consistency limitations as
> other Postfix maps that support FIRST/NEXT sequential access while
> database updates are happening, and that is not a problem.  When
> we use FIRST/NEXT for database cleanup, it is sufficient if we
> clean up most of the stale entries.

Great hint. I wasn't aware of the HANDLER primitive. I would have used 
mysql_use_result(), but that would have needed its own connection to the server 
and would have made the code more complicated.
I changed my code so that it now uses the HANDLER primitive. The new patch 
against Postfix 2.7.1 is attached. The advantage is that the 
sequence() implementation fetches only one row (tuple) per query, so 
memory use doesn't grow with the database size. A drawback is that this 
solution is not as configurable/flexible as the previous one. It is also still 
the case that the first two columns of a fetched row must be the address and 
its corresponding cache timings (data). But I guess that is acceptable.
I'm not aware of any deadlock issues, but again, I cannot prove that there 
are none.
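
To make the statement sequence concrete, here is a minimal sketch of the four 
HANDLER statements that a FIRST/NEXT traversal issues. The helper name 
handler_stmt() and the enum are hypothetical and are not part of the patch; 
the statement text mirrors what dict_mysql_sequence() builds with 
vstring_sprintf(). Note that the table name is interpolated as-is, so it is 
assumed to come from trusted configuration.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper, not part of the patch: the steps of a HANDLER scan. */
enum handler_step { H_OPEN, H_READ_FIRST, H_READ_NEXT, H_CLOSE };

/*
 * Write the HANDLER statement for one step into buf.
 * Returns 0 on success, -1 if the step is invalid or buf is too small.
 * The table name is NOT quoted; it is assumed to be trusted configuration.
 */
static int handler_stmt(char *buf, size_t len, const char *table,
                        enum handler_step step)
{
    static const char *const suffix[] = {
        "OPEN", "READ FIRST", "READ NEXT", "CLOSE"
    };
    int n;

    assert(buf != NULL && table != NULL);
    if ((int) step < 0 || step > H_CLOSE)
        return -1;
    n = snprintf(buf, len, "HANDLER %s %s", table, suffix[step]);
    return (n < 0 || (size_t) n >= len) ? -1 : 0;
}
```

Each READ statement returns a single row by default, which is why memory use 
stays constant regardless of table size.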

With this new patch, a configuration for using the verify database with MySQL 
looks like this:
/etc/postfix/verify.cf:
user = postfix
password = <secret>
dbname = postfix
cache_tblname  = verify
query = SELECT data FROM verify WHERE address='%s'
delete = DELETE FROM verify WHERE address='%s'
insert = INSERT verify SET address='%s', data='%v'
update = UPDATE verify SET data='%v' WHERE address='%s'
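
As an illustration of how the %s and %v placeholders in these templates are 
filled in, here is a simplified sketch. The function expand_template() is 
hypothetical; the patch itself uses db_common_expand2(), which additionally 
applies SQL quoting and supports %u, %d and the other expansions. This sketch 
does no quoting at all, so it must not be used on untrusted input.

```c
#include <string.h>

/*
 * Illustrative only: expand %s (key), %v (value) and %% in tmpl.
 * Unlike the patch's db_common_expand2(), this performs NO SQL quoting.
 * Returns 0 on success, -1 on overflow or an unsupported expansion.
 */
static int expand_template(char *out, size_t len, const char *tmpl,
                           const char *key, const char *value)
{
    size_t used = 0;

    if (len == 0)
        return -1;
    for (const char *p = tmpl; *p != '\0'; p++) {
        const char *piece = p;      /* default: copy one literal character */
        size_t n = 1;

        if (*p == '%') {
            p++;
            if (*p == 's')
                piece = key;
            else if (*p == 'v')
                piece = value;
            else if (*p == '%')
                piece = "%";
            else
                return -1;          /* also catches a trailing lone '%' */
            n = strlen(piece);
        }
        if (used + n >= len)        /* keep room for the terminator */
            return -1;
        memcpy(out + used, piece, n);
        used += n;
    }
    out[used] = '\0';
    return 0;
}
```

With the update template above, a key of user@example.com and a value of x, 
the sketch yields UPDATE verify SET data='x' WHERE address='user@example.com'.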

>       Wietse

Thank you for your help and suggestions.
Kind regards
Stefan
diff -ur postfix-2.7.1.orig/man/man5/mysql_table.5 postfix-2.7.1/man/man5/mysql_table.5
--- postfix-2.7.1.orig/man/man5/mysql_table.5	2008-07-21 13:50:13.000000000 +0200
+++ postfix-2.7.1/man/man5/mysql_table.5	2010-12-11 20:39:20.000000000 +0100
@@ -126,6 +126,15 @@
 .nf
     dbname = customer_database
 .fi
+.IP "\fBcache_tblname\fR"
+The name of the cache table. Postfix will perform cache cleanups
+on this table. Example:
+.nf
+    cache_tblname = verify 
+.fi
+.IP
+This parameter is available with Postfix MySQL write support patch.
+
 .IP "\fBquery\fR"
 The SQL query template used to search the database, where \fB%s\fR
 is a substitute for the address Postfix is trying to resolve,
@@ -191,6 +200,139 @@
 parameter is not specified.
 
 NOTE: DO NOT put quotes around the query parameter.
+.IP "\fBinsert\fR"
+The SQL insert template used to insert an input key, value pair into the
+database, where \fB%s\fR is a substitute for the input key and \fB%v\fR
+is a substitute for the value, e.g.
+.nf
+    insert = INSERT verify SET address='%s', data='%v'
+.fi
+
+This parameter supports the following '%' expansions:
+.RS
+.IP "\fB\fB%%\fR\fR"
+This is replaced by a literal '%' character.
+.IP "\fB\fB%s\fR\fR"
+This is replaced by the input key.
+SQL quoting is used to make sure that the input key does not
+add unexpected metacharacters.
+.IP "\fB\fB%v\fR\fR"
+This is replaced by the corresponding value.
+SQL quoting is used to make sure that the value does not
+add unexpected metacharacters.
+.IP "\fB\fB%u\fR\fR"
+When the input key is an address of the form u...@domain, \fB%u\fR
+is replaced by the SQL quoted local part of the address.
+Otherwise, \fB%u\fR is replaced by the entire search string.
+If the localpart is empty, the insert is suppressed and returns
+no results.
+.IP "\fB\fB%d\fR\fR"
+When the input key is an address of the form u...@domain, \fB%d\fR
+is replaced by the SQL quoted domain part of the address.
+Otherwise, the insert is suppressed and returns no results.
+.IP "\fB\fB%[SVUD]\fR\fR"
+The upper-case equivalents of the above expansions behave in the
+\fBinsert\fR parameter identically to their lower-case counter-parts.
+.IP "\fB\fB%[1-9]\fR\fR"
+The patterns %1, %2, ... %9 are replaced by the corresponding
+most significant component of the input key's domain. If the
+input key is \fIu...@mail.example.com\fR, then %1 is \fBcom\fR,
+%2 is \fBexample\fR and %3 is \fBmail\fR. If the input key is
+unqualified or does not have enough domain components to satisfy
+all the specified patterns, the insert is suppressed and returns
+no results.
+.RE
+.IP
+This parameter is available with Postfix MySQL write support patch.
+
+NOTE: DO NOT put quotes around the insert parameter.
+.IP "\fBupdate\fR"
+The SQL update template used to update an input key, value pair in the
+database, where \fB%s\fR is a substitute for the input key and \fB%v\fR
+is a substitute for the value, e.g.
+.nf
+    update = UPDATE verify SET data='%v' WHERE address='%s'
+.fi
+
+This parameter supports the following '%' expansions:
+.RS
+.IP "\fB\fB%%\fR\fR"
+This is replaced by a literal '%' character.
+.IP "\fB\fB%s\fR\fR"
+This is replaced by the input key.
+SQL quoting is used to make sure that the input key does not
+add unexpected metacharacters.
+.IP "\fB\fB%v\fR\fR"
+This is replaced by the corresponding value.
+SQL quoting is used to make sure that the value does not
+add unexpected metacharacters.
+.IP "\fB\fB%u\fR\fR"
+When the input key is an address of the form u...@domain, \fB%u\fR
+is replaced by the SQL quoted local part of the address.
+Otherwise, \fB%u\fR is replaced by the entire search string.
+If the localpart is empty, the update is suppressed and returns
+no results.
+.IP "\fB\fB%d\fR\fR"
+When the input key is an address of the form u...@domain, \fB%d\fR
+is replaced by the SQL quoted domain part of the address.
+Otherwise, the update is suppressed and returns no results.
+.IP "\fB\fB%[SVUD]\fR\fR"
+The upper-case equivalents of the above expansions behave in the
+\fBupdate\fR parameter identically to their lower-case counter-parts.
+.IP "\fB\fB%[1-9]\fR\fR"
+The patterns %1, %2, ... %9 are replaced by the corresponding
+most significant component of the input key's domain. If the
+input key is \fIu...@mail.example.com\fR, then %1 is \fBcom\fR,
+%2 is \fBexample\fR and %3 is \fBmail\fR. If the input key is
+unqualified or does not have enough domain components to satisfy
+all the specified patterns, the update is suppressed and returns
+no results.
+.RE
+.IP
+This parameter is available with Postfix MySQL write support patch.
+
+NOTE: DO NOT put quotes around the update parameter.
+.IP "\fBdelete\fR"
+The SQL delete template used to delete an input key from the database,
+where \fB%s\fR is a substitute for the input key, e.g.
+.nf
+    delete = DELETE FROM verify WHERE address='%s'
+.fi
+
+This parameter supports the following '%' expansions:
+.RS
+.IP "\fB\fB%%\fR\fR"
+This is replaced by a literal '%' character.
+.IP "\fB\fB%s\fR\fR"
+This is replaced by the input key.
+SQL quoting is used to make sure that the input key does not
+add unexpected metacharacters.
+.IP "\fB\fB%u\fR\fR"
+When the input key is an address of the form u...@domain, \fB%u\fR
+is replaced by the SQL quoted local part of the address.
+Otherwise, \fB%u\fR is replaced by the entire search string.
+If the localpart is empty, the delete is suppressed and returns
+no results.
+.IP "\fB\fB%d\fR\fR"
+When the input key is an address of the form u...@domain, \fB%d\fR
+is replaced by the SQL quoted domain part of the address.
+Otherwise, the delete is suppressed and returns no results.
+.IP "\fB\fB%[SVUD]\fR\fR"
+The upper-case equivalents of the above expansions behave in the
+\fBdelete\fR parameter identically to their lower-case counter-parts.
+.IP "\fB\fB%[1-9]\fR\fR"
+The patterns %1, %2, ... %9 are replaced by the corresponding
+most significant component of the input key's domain. If the
+input key is \fIu...@mail.example.com\fR, then %1 is \fBcom\fR,
+%2 is \fBexample\fR and %3 is \fBmail\fR. If the input key is
+unqualified or does not have enough domain components to satisfy
+all the specified patterns, the delete is suppressed and returns
+no results.
+.RE
+.IP
+This parameter is available with Postfix MySQL write support patch.
+
+NOTE: DO NOT put quotes around the delete parameter.
 .IP "\fBresult_format (default: \fB%s\fR)\fR"
 Format template applied to result attributes. Most commonly used
 to append (or prepend) text to the result. This parameter supports
@@ -343,3 +485,6 @@
 Institute of Mathematics of the Romanian Academy
 P.O. BOX 1-764
 RO-014700 Bucharest, ROMANIA
+
+MySQL write support by:
+Stefan Jakobs
diff -ur postfix-2.7.1.orig/src/global/db_common.c postfix-2.7.1/src/global/db_common.c
--- postfix-2.7.1.orig/src/global/db_common.c	2009-10-05 22:33:16.000000000 +0200
+++ postfix-2.7.1/src/global/db_common.c	2010-10-21 17:48:32.000000000 +0200
@@ -23,6 +23,15 @@
 /*	VSTRING	*buf;
 /*	void	(*quote_func)(DICT *, const char *, VSTRING *);
 /*
+/*	int	db_common_expand2(ctx, format, value, data, key, buf, quote_func);
+/*	void	*ctx;
+/*	const char *format;
+/*	const char *value;
+/*	const char *data;
+/*	const char *key;
+/*	VSTRING	*buf;
+/*	void	(*quote_func)(DICT *, const char *, VSTRING *);
+/*
 /*	int	db_common_check_domain(domain_list, addr);
 /*	STRING_LIST *domain_list;
 /*	const char *addr;
@@ -66,6 +75,8 @@
 /*	A literal percent character.
 /* .IP %s
 /*	The entire lookup key \fIaddr\fR.
+/* .IP %v
+/*	The corresponding data to the lookup key \fIaddr\fR.
 /* .IP %u
 /*	If \fBaddr\fR is a fully qualified address, the local part of the
 /*	address.  Otherwise \fIaddr\fR.
@@ -77,6 +88,8 @@
 /*	The following '%' expansions are performed on the lookup \fBkey\fR:
 /* .IP %S
 /*	The entire lookup key \fIkey\fR.
+/* .IP %V
+/*	The corresponding data to the lookup key \fIaddr\fR.
 /* .IP %U
 /*	If \fBkey\fR is a fully qualified address, the local part of the
 /*	address.  Otherwise \fIkey\fR.
@@ -286,6 +299,15 @@
 			         const char *key, VSTRING *result,
 			         db_quote_callback_t quote_func)
 {
+    return db_common_expand2(ctxArg, format, value, NULL, key, result, quote_func);
+}
+
+/* db_common_expand2 - expand query and result templates */
+
+int     db_common_expand2(void *ctxArg, const char *format, const char *value,
+			         const char *data, const char *key, VSTRING *result,
+			         db_quote_callback_t quote_func)
+{
     const char *myname = "db_common_expand";
     DB_COMMON_CTX *ctx = (DB_COMMON_CTX *) ctxArg;
     const char *vdomain = 0;
@@ -382,7 +404,8 @@
     } while (0)
 
     /*
-     * Replace all instances of %s with the address to look up. Replace %u
+     * Replace all instances of %s with the address to look up. Replace %v
+     * with the data value portion. Replace %u
      * with the user portion, and %d with the domain portion. "%%" expands to
      * "%".  lowercase -> addr, uppercase -> key
      */
@@ -398,6 +421,16 @@
 		QUOTE_VAL(ctx->dict, quote_func, value, result);
 		break;
 
+	    case 'v':
+                /* Don't silently skip a missing or empty data value */
+                if (data == 0 || *data == 0) {
+	            msg_warn("table \"%s:%s\": empty query string"
+		             " -- ignored", ctx->dict->type, ctx->dict->name);
+	            return (0);
+                }
+		QUOTE_VAL(ctx->dict, quote_func, data, result);
+		break;
+
 	    case 'u':
 		if (vdomain) {
 		    if (vuser == 0)
@@ -424,6 +457,18 @@
 		    QUOTE_VAL(ctx->dict, quote_func, value, result);
 		break;
 
+            case 'V':
+                if (! key) {
+                    /* Don't silently skip a missing or empty data value */
+                    if (data == 0 || *data == 0) {
+	                msg_warn("table \"%s:%s\": empty query string"
+		                 " -- ignored", ctx->dict->type, ctx->dict->name);
+        	            return (0);
+                    }
+                    QUOTE_VAL(ctx->dict, quote_func, data, result);
+                }
+                break;
+
 	    case 'U':
 		if (key) {
 		    if (kdomain) {
diff -ur postfix-2.7.1.orig/src/global/db_common.h postfix-2.7.1/src/global/db_common.h
--- postfix-2.7.1.orig/src/global/db_common.h	2005-09-23 01:50:50.000000000 +0200
+++ postfix-2.7.1/src/global/db_common.h	2010-10-21 17:47:24.000000000 +0200
@@ -25,6 +25,8 @@
 extern int db_common_dict_partial(void *);
 extern int db_common_expand(void *, const char *, const char *,
 			    const char *, VSTRING *, db_quote_callback_t);
+extern int db_common_expand2(void *, const char *, const char *, const char *,
+			    const char *, VSTRING *, db_quote_callback_t);
 extern int db_common_check_domain(void *, const char *);
 extern void db_common_free_ctx(void *);
 extern void db_common_sql_build_query(VSTRING *query, CFG_PARSER *parser);
diff -ur postfix-2.7.1.orig/src/global/dict_mysql.c postfix-2.7.1/src/global/dict_mysql.c
--- postfix-2.7.1.orig/src/global/dict_mysql.c	2007-01-04 21:07:38.000000000 +0100
+++ postfix-2.7.1/src/global/dict_mysql.c	2010-12-14 11:58:54.000000000 +0100
@@ -42,7 +42,7 @@
 /* .IP other_name
 /*	reference for outside use.
 /* .IP open_flags
-/*	Must be O_RDONLY.
+/*	See open(2). Must be O_RDWR for write access.
 /* .IP dict_flags
 /*	See dict_open(3).
 /* .PP
@@ -57,6 +57,8 @@
 /*	Password for the above.
 /* .IP \fIdbname\fR
 /*	Name of the database.
+/* .IP \fIcache_tblname\fR
+/*	Name of the cache table. Used for automatic table cleanups.
 /* .IP \fIdomain\fR
 /*      List of domains the queries should be restricted to.  If
 /*      specified, only FQDN addresses whose domain parts matching this
@@ -69,6 +71,18 @@
 /*	No query is specified, the legacy variables \fItable\fR,
 /*	\fIselect_field\fR, \fIwhere_field\fR and \fIadditional_conditions\fR
 /*	are used to construct the query template.
+/* .IP \fIinsert\fR
+/*      Insert template, before the query is actually issued, variable
+/*	substitutions are performed. See mysql_table(5) for details. 
+/*	Legacy variables are not available for this query.
+/* .IP \fIupdate\fR
+/*      Update template, before the query is actually issued, variable
+/*	substitutions are performed. See mysql_table(5) for details. 
+/*	Legacy variables are not available for this query.
+/* .IP \fIdelete\fR
+/*      Delete template, before the query is actually issued, variable
+/*	substitutions are performed. See mysql_table(5) for details. 
+/*	Legacy variables are not available for this query.
 /* .IP \fIresult_format\fR
 /*      The format used to expand results from queries.  Substitutions
 /*      are performed as described in mysql_table(5). Defaults to returning
@@ -220,14 +234,20 @@
     DICT    dict;
     CFG_PARSER *parser;
     char   *query;
+    char   *insert;
+    char   *update;
+    char   *delete;
     char   *result_format;
     void   *ctx;
     int     expansion_limit;
     char   *username;
     char   *password;
     char   *dbname;
+    char   *cache_tblname;
     ARGV   *hosts;
     PLMYSQL *pldb;
+    MYSQL_RES  *cursor;			/* stores the actual position in the 
+					 * sequence process. */
 #if defined(MYSQL_VERSION_ID) && MYSQL_VERSION_ID >= 40000
     HOST   *active_host;
 #endif
@@ -248,6 +268,10 @@
 static PLMYSQL *plmysql_init(ARGV *);
 static MYSQL_RES *plmysql_query(DICT_MYSQL *, const char *, VSTRING *, char *,
 				char *, char *);
+static int plmysql_update(DICT_MYSQL *, const char *, const char *,
+			 const char *,VSTRING *, char *, char *, char *);
+static MYSQL_RES *plmysql_sequence(DICT_MYSQL *, int, VSTRING *, char *,
+			 	char *, char *);
 static void plmysql_dealloc(PLMYSQL *);
 static void plmysql_close_host(HOST *);
 static void plmysql_down_host(HOST *);
@@ -258,6 +282,13 @@
 static void mysql_parse_config(DICT_MYSQL *, const char *);
 static HOST *host_init(const char *);
 
+#define INIT_VSTR(buf, len) do { \
+	if (buf == 0) \
+	    buf = vstring_alloc(len); \
+	VSTRING_RESET(buf); \
+	VSTRING_TERMINATE(buf); \
+    } while (0)
+
 /* dict_mysql_quote - escape SQL metacharacters in input string */
 
 static void dict_mysql_quote(DICT *dict, const char *name, VSTRING *result)
@@ -326,13 +357,6 @@
         return (0);
     }
 
-#define INIT_VSTR(buf, len) do { \
-	if (buf == 0) \
-	    buf = vstring_alloc(len); \
-	VSTRING_RESET(buf); \
-	VSTRING_TERMINATE(buf); \
-    } while (0)
-
     INIT_VSTR(query, 10);
 
     /*
@@ -388,6 +412,241 @@
     return ((dict_errno == 0 && *r) ? r : 0);
 }
 
+/* dict_mysql_update - update/insert database entry */
+
+static void dict_mysql_update(DICT *dict, const char *name, const char *val)
+{
+    const char *myname = "dict_mysql_update";
+    DICT_MYSQL *dict_mysql = (DICT_MYSQL *)dict;
+    MYSQL_RES *query_res;
+    static VSTRING *query;
+    int     numrows;
+    db_quote_callback_t quote_func = dict_mysql_quote;
+
+    dict_errno = 0;
+
+    /*
+     * Optionally fold the key.
+     */
+    if (dict->flags & DICT_FLAG_FOLD_FIX) {
+	if (dict->fold_buf == 0)
+	    dict->fold_buf = vstring_alloc(10);
+	vstring_strcpy(dict->fold_buf, name);
+	name = lowercase(vstring_str(dict->fold_buf));
+    }
+
+    INIT_VSTR(query, 10);
+
+    /*
+     * Suppress the lookup if the query expansion is empty
+     *
+     * This initial expansion is outside the context of any
+     * specific host connection, we just want to check the
+     * key pre-requisites, so when quoting happens separately
+     * for each connection, we don't bother with quoting...
+     */
+#if defined(MYSQL_VERSION_ID) && MYSQL_VERSION_ID >= 40000
+    quote_func = 0;
+#endif
+    if (!db_common_expand(dict_mysql->ctx, dict_mysql->query,
+    			  name, 0, query, quote_func))
+        return;
+    
+    /* check if name exists already */
+    /* do the query - set dict_errno & cleanup if there's an error */
+    if ((query_res = plmysql_query(dict_mysql, name, query,
+				   dict_mysql->dbname,
+				   dict_mysql->username,
+				   dict_mysql->password)) == 0) {
+	dict_errno = DICT_ERR_RETRY;
+	return;
+    }
+
+    numrows = mysql_num_rows(query_res);
+    mysql_free_result(query_res);
+    if (msg_verbose)
+	msg_info("%s: retrieved %d rows", myname, numrows);
+    if (numrows == 0) {		/* do insert */
+        if (!db_common_expand2(dict_mysql->ctx, dict_mysql->insert,
+    			  name, val, 0, query, quote_func))
+            return;
+
+        if ((numrows = plmysql_update(dict_mysql, name, val, dict_mysql->insert, query,
+			        dict_mysql->dbname,
+			        dict_mysql->username,
+			        dict_mysql->password)) == 0) {
+	    dict_errno = DICT_ERR_RETRY;
+	    return;
+	}
+
+    } else {			/* do update */
+        if (!db_common_expand2(dict_mysql->ctx, dict_mysql->update,
+    			  name, val, 0, query, quote_func))
+            return;
+
+        if ((numrows = plmysql_update(dict_mysql, name, val, dict_mysql->update, query,
+			        dict_mysql->dbname,
+			        dict_mysql->username,
+			        dict_mysql->password)) == 0) {
+	    dict_errno = DICT_ERR_RETRY;
+	    return;
+	}
+
+    }
+    if (msg_verbose)
+	msg_info("%s: updated %d rows", myname, numrows);
+}
+
+/* dict_mysql_delete - delete database entry */
+
+static int dict_mysql_delete(DICT *dict, const char *name)
+{
+    const char *myname = "dict_mysql_delete";
+    DICT_MYSQL *dict_mysql = (DICT_MYSQL *)dict;
+    static VSTRING *query;
+    int     numrows;
+    db_quote_callback_t quote_func = dict_mysql_quote;
+
+    dict_errno = 0;
+
+    /*
+     * Optionally fold the key.
+     */
+    if (dict->flags & DICT_FLAG_FOLD_FIX) {
+	if (dict->fold_buf == 0)
+	    dict->fold_buf = vstring_alloc(10);
+	vstring_strcpy(dict->fold_buf, name);
+	name = lowercase(vstring_str(dict->fold_buf));
+    }
+
+    INIT_VSTR(query, 10);
+
+    /*
+     * Suppress the lookup if the query expansion is empty
+     *
+     * This initial expansion is outside the context of any
+     * specific host connection, we just want to check the
+     * key pre-requisites, so when quoting happens separately
+     * for each connection, we don't bother with quoting...
+     */
+#if defined(MYSQL_VERSION_ID) && MYSQL_VERSION_ID >= 40000
+    quote_func = 0;
+#endif
+    if (!db_common_expand(dict_mysql->ctx, dict_mysql->delete,
+    			  name, 0, query, quote_func))
+        return (1);
+    
+    /* do the delete - set dict_errno & cleanup if there's an error */
+    if ((numrows = plmysql_update(dict_mysql, name, NULL, dict_mysql->delete, query,
+				   dict_mysql->dbname,
+				   dict_mysql->username,
+				   dict_mysql->password)) == 0) {
+	dict_errno = DICT_ERR_RETRY;
+	return (1);
+    }
+
+    if (msg_verbose)
+	msg_info("%s: deleted %d rows", myname, numrows);
+    if (numrows == 0) {		/* failure */
+        msg_warn("%s: %s: delete failed, key: '%s'",
+			 myname, dict_mysql->parser->name, name);
+	return (1);
+    } else { 				/* OK */
+	return (0);
+    }
+}
+
+/* dict_mysql_sequence - traverse the db */
+
+static int dict_mysql_sequence(DICT *dict, int function, 
+				const char **key, const char **value)
+{
+    const char *myname = "dict_mysql_sequence";
+    DICT_MYSQL *dict_mysql = (DICT_MYSQL *)dict;
+    MYSQL_ROW row;
+    static VSTRING *result;
+    static VSTRING *name;
+    const char *format = "%s";
+    VSTRING *seq = vstring_alloc(10);
+    int plmysql_errno = 0;
+    dict_errno = 0;
+
+    INIT_VSTR(name, 10);
+    INIT_VSTR(result, 10);
+
+    switch (function) {
+    case DICT_SEQ_FUN_FIRST:
+        /* initialise handler */
+        vstring_sprintf(seq, "HANDLER %s OPEN", dict_mysql->cache_tblname);
+        plmysql_sequence(dict_mysql, plmysql_errno, seq,
+                            dict_mysql->dbname,
+                            dict_mysql->username,
+                            dict_mysql->password);
+	if (plmysql_errno > 0){
+            dict_errno = DICT_ERR_RETRY;
+            vstring_free(seq);
+            return (1);
+        } 
+        vstring_sprintf(seq, "HANDLER %s READ FIRST", dict_mysql->cache_tblname);
+	if (msg_verbose)
+	    msg_info("%s: handler opened", myname);
+	break;
+    case DICT_SEQ_FUN_NEXT:
+        vstring_sprintf(seq, "HANDLER %s READ NEXT", dict_mysql->cache_tblname);
+	break;
+    default:
+        msg_panic("%s: invalid function %d", myname, function);
+	return (1);
+    }
+
+    if (msg_verbose)
+	msg_info("%s: handler read next", myname);
+    if ((dict_mysql->cursor = plmysql_sequence(dict_mysql, plmysql_errno, seq,
+                                   dict_mysql->dbname,
+        	                   dict_mysql->username,
+                                   dict_mysql->password)) == 0) {
+        dict_errno = DICT_ERR_RETRY;
+	vstring_free(seq);
+        return (1);
+    } 
+    row = mysql_fetch_row(dict_mysql->cursor); 
+    if (row == NULL) {
+	/* close handler */
+        vstring_sprintf(seq, "HANDLER %s CLOSE", dict_mysql->cache_tblname);
+        plmysql_sequence(dict_mysql, plmysql_errno, seq,
+                              dict_mysql->dbname,
+                              dict_mysql->username,
+                              dict_mysql->password);
+	if (plmysql_errno > 0) {
+            dict_errno = DICT_ERR_RETRY;
+        }
+	if (msg_verbose)
+	    msg_info("%s: handler closed", myname);
+        vstring_free(seq);
+	return(1);
+    }
+    vstring_free(seq);
+    if (mysql_num_fields(dict_mysql->cursor) >= 2) {
+	if (!db_common_expand(dict_mysql->ctx, format,
+                              row[0], 0, name, 0)) {
+	    return (1);
+	}
+        if (!db_common_expand(dict_mysql->ctx, format,
+                              row[1], 0, result, 0)) {
+	    return (1);
+	}
+    } else {
+        msg_warn("%s: invalid number of fields in query response: %d.",
+                  myname, mysql_num_fields(dict_mysql->cursor));
+        dict_errno = DICT_ERR_RETRY;
+        return (1);
+    } 
+
+    *key = vstring_str(name);
+    *value = vstring_str(result); 
+    return (0); 
+}
+
 /* dict_mysql_check_stat - check the status of a host */
 
 static int dict_mysql_check_stat(HOST *host, unsigned stat, unsigned type,
@@ -486,7 +745,6 @@
  *			on failure of all db instances, return 0;
  *			close unnecessary active connections
  */
-
 static MYSQL_RES *plmysql_query(DICT_MYSQL *dict_mysql,
 				        const char *name,
 					VSTRING *query,
@@ -514,8 +772,8 @@
 #endif
 
 	if (!(mysql_query(host->db, vstring_str(query)))) {
-	    if ((res = mysql_store_result(host->db)) == 0) {
-		msg_warn("mysql query failed: %s", mysql_error(host->db));
+	    if ((res = mysql_store_result(host->db)) == 0) { 
+		msg_warn("dict_mysql: query failed: %s", mysql_error(host->db));
 		plmysql_down_host(host);
 	    } else {
 		if (msg_verbose)
@@ -533,6 +791,101 @@
 }
 
 /*
+ * plmysql_update - process a MySQL update, insert or delete query.
+ *			Return number of affected rows on success.
+ *			On failure, log failure and try other db instances.
+ *			on failure of all db instances, return 0;
+ *			close unnecessary active connections
+ */
+static int plmysql_update(DICT_MYSQL *dict_mysql,
+			        const char *name,
+				const char *val,
+				const char *update,
+				VSTRING *query,
+			        char *dbname,
+			        char *username,
+			        char *password)
+{
+    PLMYSQL *PLDB = dict_mysql->pldb;
+    HOST   *host;
+    int numrows;
+
+    while ((host = dict_mysql_get_active(PLDB, dbname, username, password)) != NULL) {
+
+#if defined(MYSQL_VERSION_ID) && MYSQL_VERSION_ID >= 40000
+	/*
+	 * The active host is used to escape strings in the
+	 * context of the active connection's character encoding.
+	 */
+	dict_mysql->active_host = host;
+	VSTRING_RESET(query);
+	VSTRING_TERMINATE(query);
+	db_common_expand2(dict_mysql->ctx, update,
+			 name, val, 0, query, dict_mysql_quote);
+	dict_mysql->active_host = 0;
+#endif
+
+	if (!(mysql_query(host->db, vstring_str(query)))) {
+	    if ((numrows = mysql_affected_rows(host->db)) == 0) {
+                msg_warn("dict_mysql: update failed: %s", mysql_error(host->db));
+                plmysql_down_host(host);
+            } else {
+                if (msg_verbose)
+                    msg_info("dict_mysql: successful update from host %s", host->hostname);
+                event_request_timer(dict_mysql_event, (char *) host, IDLE_CONN_INTV);
+                break;
+            }
+	} else {
+	    msg_warn("dict_mysql: update failed: %s", mysql_error(host->db));
+	    plmysql_down_host(host);
+	    numrows = 0;
+	}
+    }
+    return (numrows);
+}
+
+/*
+ * plmysql_sequence - process a MySQL sequence query. Return MYSQL_RES* on success.  
+ *         On failure, log failure and try other db instances.
+ *         on failure of all db instances, return 0;
+ *         close unnecessary active connections;
+ */
+static MYSQL_RES *plmysql_sequence(DICT_MYSQL *dict_mysql, int errno,
+					VSTRING *query,
+				        char *dbname,
+				        char *username,
+				        char *password)
+{
+    PLMYSQL *PLDB = dict_mysql->pldb;
+    HOST   *host;
+    MYSQL_RES *res = 0; 
+
+    while ((host = dict_mysql_get_active(PLDB, dbname, username, password)) != NULL) {
+
+	if (!(mysql_query(host->db, vstring_str(query)))) {
+	    /* an empty result set returns 0, too. Use errno to differentiate *
+	     * between an error and an empty result set */
+	    if ((res = mysql_store_result(host->db)) == 0 && 
+		(errno = mysql_errno(host->db)) > 0) { 
+		msg_warn("dict_mysql: faulty sequence result: %s", mysql_error(host->db));
+		plmysql_down_host(host);
+	    } else {
+		if (msg_verbose)
+		    msg_info("dict_mysql: successful sequence query from host %s",
+					 host->hostname);
+		event_request_timer(dict_mysql_event, (char *) host, IDLE_CONN_INTV);
+		break;
+	    }
+	} else {
+	    msg_warn("dict_mysql: sequence query failed: %s", mysql_error(host->db));
+	    plmysql_down_host(host);
+	}
+    }
+
+    return res;
+}
+
+/*
  * plmysql_connect_single -
  * used to reconnect to a single database when one is down or none is
  * connected yet. Log all errors and set the stat field of host accordingly
@@ -595,6 +948,7 @@
     dict_mysql->username = cfg_get_str(p, "user", "", 0, 0);
     dict_mysql->password = cfg_get_str(p, "password", "", 0, 0);
     dict_mysql->dbname = cfg_get_str(p, "dbname", "", 1, 0);
+    dict_mysql->cache_tblname = cfg_get_str(p, "cache_tblname", "", 0, 0);
     dict_mysql->result_format = cfg_get_str(p, "result_format", "%s", 1, 0);
     /*
      * XXX: The default should be non-zero for safety, but that is not
@@ -613,6 +967,10 @@
 	dict_mysql->query = vstring_export(buf);
     }
 
+    dict_mysql->insert = cfg_get_str(p, "insert", NULL, 0, 0);
+    dict_mysql->update = cfg_get_str(p, "update", NULL, 0, 0);
+    dict_mysql->delete = cfg_get_str(p, "delete", NULL, 0, 0);
+
     /*
      * Must parse all templates before we can use db_common_expand()
      */
@@ -654,14 +1012,17 @@
 
     /*
      * Sanity checks.
-     */
+     *
     if (open_flags != O_RDONLY)
 	msg_fatal("%s:%s map requires O_RDONLY access mode",
 		  DICT_TYPE_MYSQL, name);
-
+    */
     dict_mysql = (DICT_MYSQL *) dict_alloc(DICT_TYPE_MYSQL, name,
 					   sizeof(DICT_MYSQL));
     dict_mysql->dict.lookup = dict_mysql_lookup;
+    dict_mysql->dict.update = dict_mysql_update;
+    dict_mysql->dict.delete = dict_mysql_delete;
+    dict_mysql->dict.sequence = dict_mysql_sequence;
     dict_mysql->dict.close = dict_mysql_close;
     dict_mysql->dict.flags = dict_flags;
     mysql_parse_config(dict_mysql, name);
