Attaching my WIP patch here...

On Sat, Jun 20, 2015 at 04:37:11PM +0000, Adam Roses Wight wrote:
> Many hosting providers limit the number of outbound SMTP connections over
> time, and exceeding their threshold results in harsh consequences like
> dropped connections and unpredictable delays until recovery.  It would be
> ideal if Postfix supported absolute outbound limits.
> 
> Currently, the only workarounds I've found are either complex or have
> unwanted side effects.
> 
> There's a precedent for this kind of limiting; for example, the following
> configuration parameters limit traffic per destination:
>     * default_destination_concurrency_limit
>     * default_destination_rate_delay
> 
> I'm proposing that we implement the following configuration parameter,
> which rate limits an entire transport regardless of destination:
> 
> default_rate_delay (default: 0s)
>     The default amount of delay that is inserted between deliveries.
> 
>     To enable the delay, specify a non-zero time value (an integral value
>     plus an optional one-letter suffix that specifies the time unit).
> 
> I've started writing this feature to determine whether it's feasible
> with the current architecture, but with no success yet: this doesn't seem
> to limit the number of jobs that are popped from the queue:
> 
> https://github.com/vdukhovni/postfix/compare/master...adamwight:rate_delay
> 
> Thanks,
> Adam
commit 78f288daaf74a721c21b3a8e0846469843164be9
Author: Adam Roses Wight <awi...@wikimedia.org>
Date:   Tue Jun 16 09:31:17 2015 -0700

    WIP Implement outbound throttling

diff --git a/postfix/src/global/mail_params.h b/postfix/src/global/mail_params.h
index 823fc0c..b565387 100644
--- a/postfix/src/global/mail_params.h
+++ b/postfix/src/global/mail_params.h
@@ -3396,6 +3396,11 @@ extern bool var_conc_feedback_debug;
 #define DEF_DEST_RATE_DELAY	"0s"
 extern int var_dest_rate_delay;
 
+#define VAR_RATE_DELAY	"default_rate_delay"	/* transport-wide delay */
+#define _RATE_DELAY	"_rate_delay"		/* <transport>_rate_delay */
+#define DEF_RATE_DELAY	"0s"			/* 0 disables the delay */
+extern int var_rate_delay;
+
  /*
   * Stress handling.
   */
diff --git a/postfix/src/postconf/postconf_service.c b/postfix/src/postconf/postconf_service.c
index bebf6de..4955f41 100644
--- a/postfix/src/postconf/postconf_service.c
+++ b/postfix/src/postconf/postconf_service.c
@@ -134,6 +134,7 @@ void    pcf_register_service_parameters(void)
 	_CONC_NEG_FDBACK, VAR_CONC_NEG_FDBACK,
 	_CONC_COHORT_LIM, VAR_CONC_COHORT_LIM,
 	_DEST_RATE_DELAY, VAR_DEST_RATE_DELAY,
+	_RATE_DELAY, VAR_RATE_DELAY,
 	0,
     };
     static const PCF_STRING_NV spawn_params[] = {
diff --git a/postfix/src/qmgr/qmgr.c b/postfix/src/qmgr/qmgr.c
index 1b44ef1..17b63d6 100644
--- a/postfix/src/qmgr/qmgr.c
+++ b/postfix/src/qmgr/qmgr.c
@@ -446,6 +446,7 @@ char   *var_conc_pos_feedback;
 char   *var_conc_neg_feedback;
 int     var_conc_cohort_limit;
 int     var_conc_feedback_debug;
+int     var_rate_delay;
 int     var_dest_rate_delay;
 char   *var_def_filter_nexthop;
 int     var_qmgr_daemon_timeout;
@@ -702,6 +703,7 @@ int     main(int argc, char **argv)
 	VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0,
 	VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0,
 	VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0,
+	VAR_RATE_DELAY, DEF_RATE_DELAY, &var_rate_delay, 0, 0,
 	VAR_DEST_RATE_DELAY, DEF_DEST_RATE_DELAY, &var_dest_rate_delay, 0, 0,
 	VAR_QMGR_DAEMON_TIMEOUT, DEF_QMGR_DAEMON_TIMEOUT, &var_qmgr_daemon_timeout, 1, 0,
 	VAR_QMGR_IPC_TIMEOUT, DEF_QMGR_IPC_TIMEOUT, &var_qmgr_ipc_timeout, 1, 0,
diff --git a/postfix/src/qmgr/qmgr.h b/postfix/src/qmgr/qmgr.h
index d5f4845..e251b95 100644
--- a/postfix/src/qmgr/qmgr.h
+++ b/postfix/src/qmgr/qmgr.h
@@ -204,20 +204,25 @@ struct QMGR_TRANSPORT {
     QMGR_FEEDBACK pos_feedback;		/* positive feedback control */
     QMGR_FEEDBACK neg_feedback;		/* negative feedback control */
     int     fail_cohort_limit;		/* flow shutdown control */
-    int     rate_delay;			/* suspend per delivery */
+    int     rate_delay;			/* throttle */
+    int     dest_rate_delay;		/* suspend per delivery */
 };
 
 #define QMGR_TRANSPORT_STAT_DEAD	(1<<1)
+#define QMGR_TRANSPORT_STAT_SUSPENDED	(1<<2)	/* rate_delay timer pending */
 
 typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
 extern QMGR_TRANSPORT *qmgr_transport_select(void);
 extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
 extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
 extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
+extern void qmgr_transport_suspend(QMGR_TRANSPORT *, int);	/* delay: seconds */
+extern void qmgr_transport_resume(int, void *);	/* event timer callback */
 extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
 extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
 
 #define QMGR_TRANSPORT_THROTTLED(t)	((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_SUSPENDED(t)	((t)->flags & QMGR_TRANSPORT_STAT_SUSPENDED)
 
  /*
   * Each next hop (e.g., a domain name) has its own queue of pending message
diff --git a/postfix/src/qmgr/qmgr_entry.c b/postfix/src/qmgr/qmgr_entry.c
index 61b3434..09c63fc 100644
--- a/postfix/src/qmgr/qmgr_entry.c
+++ b/postfix/src/qmgr/qmgr_entry.c
@@ -306,14 +306,19 @@ void    qmgr_entry_done(QMGR_ENTRY *entry, int which)
      * until the queue is resumed. Otherwise, we make those decisions now.
      * The job scheduling decisions are made by qmgr_job_blocker_update().
      */
-    if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
-	if (queue->window > 1)
-	    msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
-		      myname, transport->name, queue->name, queue->window);
-	if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
-	    qmgr_queue_unthrottle(queue);
-	if (QMGR_QUEUE_READY(queue))
-	    qmgr_queue_suspend(queue, transport->rate_delay);
+    if (which == QMGR_QUEUE_BUSY) {
+	if (transport->rate_delay > 0) {
+	    qmgr_transport_suspend(transport, transport->rate_delay);
+	}
+	if (transport->dest_rate_delay > 0) {
+	    if (queue->window > 1)
+		msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
+			  myname, transport->name, queue->name, queue->window);
+	    if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
+		qmgr_queue_unthrottle(queue);
+	    if (QMGR_QUEUE_READY(queue))
+		qmgr_queue_suspend(queue, transport->dest_rate_delay);
+	}
     }
     if (!QMGR_QUEUE_SUSPENDED(queue)
 	&& queue->blocker_tag == transport->blocker_tag)
diff --git a/postfix/src/qmgr/qmgr_transport.c b/postfix/src/qmgr/qmgr_transport.c
index 0611f3b..6857cd1 100644
--- a/postfix/src/qmgr/qmgr_transport.c
+++ b/postfix/src/qmgr/qmgr_transport.c
@@ -221,6 +221,39 @@ void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
     }
 }
 
+/* qmgr_transport_suspend - stop agent allocation for delay seconds */
+
+void    qmgr_transport_suspend(QMGR_TRANSPORT *transport, int delay)
+{
+    const char *myname = "qmgr_transport_suspend";
+
+    /*
+     * Called after every completed delivery, so with concurrency > 1 the
+     * transport may already be suspended. event_request_timer() replaces a
+     * pending (callback, context) timer, so this re-arms rather than stacks.
+     */
+    if (msg_verbose)
+	msg_info("%s: %s: suspend for %d seconds", myname, transport->name, delay);
+    transport->flags |= QMGR_TRANSPORT_STAT_SUSPENDED;
+    event_request_timer(qmgr_transport_resume, (void *) transport, delay);
+}
+
+/* qmgr_transport_resume - timer callback: re-enable agent allocation */
+
+void    qmgr_transport_resume(int unused_event, void *context)
+{
+    QMGR_TRANSPORT *transport = (QMGR_TRANSPORT *) context;
+    const char *myname = "qmgr_transport_resume";
+
+    if (!QMGR_TRANSPORT_SUSPENDED(transport))
+	msg_panic("%s: bad transport status (not suspended): %s",
+		  myname, transport->name);
+    if (msg_verbose)
+	msg_info("%s: %s: resume", myname, transport->name);
+    transport->flags &= ~QMGR_TRANSPORT_STAT_SUSPENDED;
+    /* qmgr_transport_select() will consider this transport again. */
+}
+
 /* qmgr_transport_abort - transport connect watchdog */
 
 static void qmgr_transport_abort(int unused_event, void *context)
@@ -286,7 +319,7 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
 
     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
-	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+	if (xport->flags > 0
 	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
 	    continue;
 	need = xport->pending + 1;
@@ -314,8 +347,8 @@ void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOT
     /*
      * Sanity checks.
      */
-    if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
-	msg_panic("qmgr_transport: dead transport: %s", transport->name);
+    if (transport->flags > 0)
+	msg_panic("qmgr_transport: transport not ready: %s", transport->name);
     if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
 	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
 
@@ -392,11 +425,16 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
     transport->init_dest_concurrency =
 	get_mail_conf_int2(name, _INIT_DEST_CON,
 			   var_init_dest_concurrency, 1, 0);
-    transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
+    transport->rate_delay = get_mail_conf_time2(name, _RATE_DELAY,
+						var_rate_delay,
+						's', 0, 0);
+    transport->dest_rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
 						var_dest_rate_delay,
 						's', 0, 0);
 
-    if (transport->rate_delay > 0)
+    // FIXME: rate_delay will only work with 1 process.
+
+    if (transport->dest_rate_delay > 0)
 	transport->dest_concurrency_limit = 1;
     if (transport->dest_concurrency_limit != 0
     && transport->dest_concurrency_limit < transport->init_dest_concurrency)

Reply via email to