Attaching my WIP patch here...
On Sat, Jun 20, 2015 at 04:37:11PM +0000, Adam Roses Wight wrote:
> Many hosting providers limit the number of outbound SMTP connections over
> time, and exceeding their threshold results in harsh consequences like
> dropped connections and unpredictable delays until recovery. It would be
> ideal if Postfix supported absolute outbound limits.
>
> Currently, the only workarounds I've found are either complex or have
> unwanted side effects.
>
> There's a precedent for this kind of limit: for example, the following
> configuration parameters limit traffic by destination:
> * default_destination_concurrency_limit
> * default_destination_rate_delay
>
> I'm proposing that we implement the following configuration parameter,
> which rate limits an entire transport regardless of destination:
>
> default_rate_delay (default: 0s)
> The default amount of delay that is inserted between deliveries.
>
> To enable the delay, specify a non-zero time value (an integral value
> plus an optional one-letter suffix that specifies the time unit).
>
>
> I've started writing this feature to determine whether it's feasible
> with the current architecture, but without success so far: the patch
> doesn't seem to limit the number of jobs that are popped from the queue:
>
> https://github.com/vdukhovni/postfix/compare/master...adamwight:rate_delay
>
> Thanks,
> Adam
commit 78f288daaf74a721c21b3a8e0846469843164be9
Author: Adam Roses Wight <[email protected]>
Date: Tue Jun 16 09:31:17 2015 -0700
WIP Implement outbound throttling
diff --git a/postfix/src/global/mail_params.h b/postfix/src/global/mail_params.h
index 823fc0c..b565387 100644
--- a/postfix/src/global/mail_params.h
+++ b/postfix/src/global/mail_params.h
@@ -3396,6 +3396,11 @@ extern bool var_conc_feedback_debug;
#define DEF_DEST_RATE_DELAY "0s"
extern int var_dest_rate_delay;
+#define VAR_RATE_DELAY "default_rate_delay"
+#define _RATE_DELAY "_rate_delay"
+#define DEF_RATE_DELAY "0s"
+extern int var_rate_delay;
+
/*
* Stress handling.
*/
diff --git a/postfix/src/postconf/postconf_service.c b/postfix/src/postconf/postconf_service.c
index bebf6de..4955f41 100644
--- a/postfix/src/postconf/postconf_service.c
+++ b/postfix/src/postconf/postconf_service.c
@@ -134,6 +134,7 @@ void pcf_register_service_parameters(void)
_CONC_NEG_FDBACK, VAR_CONC_NEG_FDBACK,
_CONC_COHORT_LIM, VAR_CONC_COHORT_LIM,
_DEST_RATE_DELAY, VAR_DEST_RATE_DELAY,
+ _RATE_DELAY, VAR_RATE_DELAY,
0,
};
static const PCF_STRING_NV spawn_params[] = {
diff --git a/postfix/src/qmgr/qmgr.c b/postfix/src/qmgr/qmgr.c
index 1b44ef1..17b63d6 100644
--- a/postfix/src/qmgr/qmgr.c
+++ b/postfix/src/qmgr/qmgr.c
@@ -446,6 +446,7 @@ char *var_conc_pos_feedback;
char *var_conc_neg_feedback;
int var_conc_cohort_limit;
int var_conc_feedback_debug;
+int var_rate_delay;
int var_dest_rate_delay;
char *var_def_filter_nexthop;
int var_qmgr_daemon_timeout;
@@ -702,6 +703,7 @@ int main(int argc, char **argv)
VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0,
VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0,
VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0,
+ VAR_RATE_DELAY, DEF_RATE_DELAY, &var_rate_delay, 0, 0,
VAR_DEST_RATE_DELAY, DEF_DEST_RATE_DELAY, &var_dest_rate_delay, 0, 0,
VAR_QMGR_DAEMON_TIMEOUT, DEF_QMGR_DAEMON_TIMEOUT, &var_qmgr_daemon_timeout, 1, 0,
VAR_QMGR_IPC_TIMEOUT, DEF_QMGR_IPC_TIMEOUT, &var_qmgr_ipc_timeout, 1, 0,
diff --git a/postfix/src/qmgr/qmgr.h b/postfix/src/qmgr/qmgr.h
index d5f4845..e251b95 100644
--- a/postfix/src/qmgr/qmgr.h
+++ b/postfix/src/qmgr/qmgr.h
@@ -204,20 +204,25 @@ struct QMGR_TRANSPORT {
QMGR_FEEDBACK pos_feedback; /* positive feedback control */
QMGR_FEEDBACK neg_feedback; /* negative feedback control */
int fail_cohort_limit; /* flow shutdown control */
- int rate_delay; /* suspend per delivery */
+ int rate_delay; /* throttle */
+ int dest_rate_delay; /* suspend per delivery */
};
#define QMGR_TRANSPORT_STAT_DEAD (1<<1)
+#define QMGR_TRANSPORT_STAT_SUSPENDED (1<<2)
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
+extern void qmgr_transport_suspend(QMGR_TRANSPORT *, int);
+extern void qmgr_transport_resume(int, void *);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_SUSPENDED(t) ((t)->flags & QMGR_TRANSPORT_STAT_SUSPENDED)
/*
* Each next hop (e.g., a domain name) has its own queue of pending message
diff --git a/postfix/src/qmgr/qmgr_entry.c b/postfix/src/qmgr/qmgr_entry.c
index 61b3434..09c63fc 100644
--- a/postfix/src/qmgr/qmgr_entry.c
+++ b/postfix/src/qmgr/qmgr_entry.c
@@ -306,14 +306,19 @@ void qmgr_entry_done(QMGR_ENTRY *entry, int which)
* until the queue is resumed. Otherwise, we make those decisions now.
* The job scheduling decisions are made by qmgr_job_blocker_update().
*/
- if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
- if (queue->window > 1)
- msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
- myname, transport->name, queue->name, queue->window);
- if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */
- qmgr_queue_unthrottle(queue);
- if (QMGR_QUEUE_READY(queue))
- qmgr_queue_suspend(queue, transport->rate_delay);
+ if (which == QMGR_QUEUE_BUSY) {
+ if (transport->rate_delay > 0) {
+ qmgr_transport_suspend(transport, transport->rate_delay);
+ }
+ if (transport->dest_rate_delay > 0) {
+ if (queue->window > 1)
+ msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
+ myname, transport->name, queue->name, queue->window);
+ if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */
+ qmgr_queue_unthrottle(queue);
+ if (QMGR_QUEUE_READY(queue))
+ qmgr_queue_suspend(queue, transport->dest_rate_delay);
+ }
}
if (!QMGR_QUEUE_SUSPENDED(queue)
&& queue->blocker_tag == transport->blocker_tag)
diff --git a/postfix/src/qmgr/qmgr_transport.c b/postfix/src/qmgr/qmgr_transport.c
index 0611f3b..6857cd1 100644
--- a/postfix/src/qmgr/qmgr_transport.c
+++ b/postfix/src/qmgr/qmgr_transport.c
@@ -221,6 +221,39 @@ void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
}
}
+/* qmgr_transport_suspend - suspends delivery */
+void qmgr_transport_suspend(QMGR_TRANSPORT *transport, int delay)
+{
+ const char *myname = "qmgr_transport_suspend";
+
+ /*
+ * Sanity checks.
+ */
+ if (QMGR_TRANSPORT_SUSPENDED(transport))
+ msg_panic("%s: bad transport status (suspended): %s", myname, transport->name);
+
+ transport->flags |= QMGR_TRANSPORT_STAT_SUSPENDED;
+
+msg_warn("Waiting %d seconds on transport %s", delay, transport->name);
+ event_request_timer(qmgr_transport_resume, (void *) transport, delay);
+}
+
+/* qmgr_transport_resume - resume delivery */
+void qmgr_transport_resume(int unused_event, void *context)
+{
+ QMGR_TRANSPORT *transport = (QMGR_TRANSPORT *) context;
+
+ const char *myname = "qmgr_transport_resume";
+
+ /*
+ * Sanity checks.
+ */
+ if (!QMGR_TRANSPORT_SUSPENDED(transport))
+ msg_panic("%s: bad transport status (not suspended): %s", myname, transport->name);
+
+ transport->flags &= ~QMGR_TRANSPORT_STAT_SUSPENDED;
+}
+
/* qmgr_transport_abort - transport connect watchdog */
static void qmgr_transport_abort(int unused_event, void *context)
@@ -286,7 +319,7 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
- if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+ if (xport->flags > 0
|| xport->pending >= QMGR_TRANSPORT_MAX_PEND)
continue;
need = xport->pending + 1;
@@ -314,8 +347,8 @@ void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOT
/*
* Sanity checks.
*/
- if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
- msg_panic("qmgr_transport: dead transport: %s", transport->name);
+ if (transport->flags > 0)
+ msg_panic("qmgr_transport: transport not ready: %s", transport->name);
if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
msg_panic("qmgr_transport: excess allocation: %s", transport->name);
@@ -392,11 +425,16 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
transport->init_dest_concurrency =
get_mail_conf_int2(name, _INIT_DEST_CON,
var_init_dest_concurrency, 1, 0);
- transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
+ transport->rate_delay = get_mail_conf_time2(name, _RATE_DELAY,
+ var_rate_delay,
+ 's', 0, 0);
+ transport->dest_rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
var_dest_rate_delay,
's', 0, 0);
- if (transport->rate_delay > 0)
+ // FIXME: rate_delay will only work with 1 process.
+
+ if (transport->dest_rate_delay > 0)
transport->dest_concurrency_limit = 1;
if (transport->dest_concurrency_limit != 0
&& transport->dest_concurrency_limit < transport->init_dest_concurrency)