Attaching my WIP patch here... On Sat, Jun 20, 2015 at 04:37:11PM +0000, Adam Roses Wight wrote: > Many hosting providers limit the number of outbound SMTP connections over > time, and exceeding their threshold results in harsh consequences like > dropped connections and unpredictable delays until recovery. It would be > ideal if Postfix supported absolute outbound limits. > > Currently, the only workarounds I've found are either complex or have > unwanted side effects. > > There's a precedent for limiting traffic: the following > configuration parameters limit by destination, for example: > * default_destination_concurrency_limit > * default_destination_rate_delay > > I'm proposing that we implement the following configuration parameter, > which rate limits an entire transport regardless of destination: > > default_rate_delay (default: 0s) > The default amount of delay that is inserted between deliveries > on the same transport, regardless of destination. > > To enable the delay, specify a non-zero time value (an integral value > plus an optional one-letter suffix that specifies the time unit). > > I've started writing this feature to determine whether it's feasible > with the current architecture, but with no success yet: this doesn't seem > to limit the number of jobs that are popped from the queue: > > https://github.com/vdukhovni/postfix/compare/master...adamwight:rate_delay > > Thanks, > Adam
commit 78f288daaf74a721c21b3a8e0846469843164be9 Author: Adam Roses Wight <awi...@wikimedia.org> Date: Tue Jun 16 09:31:17 2015 -0700
WIP Implement outbound throttling

Add a transport-wide rate delay (default_rate_delay, per-transport
<transport>_rate_delay). After each delivery, it suspends allocation of new
delivery agents for the whole transport, independent of destination. The
existing per-destination delay moves to the dest_rate_delay field.

diff --git a/postfix/src/global/mail_params.h b/postfix/src/global/mail_params.h
index 823fc0c..b565387 100644
--- a/postfix/src/global/mail_params.h
+++ b/postfix/src/global/mail_params.h
@@ -3396,6 +3396,11 @@ extern bool var_conc_feedback_debug;
 #define DEF_DEST_RATE_DELAY	"0s"
 extern int var_dest_rate_delay;
 
+#define VAR_RATE_DELAY		"default_rate_delay"
+#define _RATE_DELAY		"_rate_delay"
+#define DEF_RATE_DELAY		"0s"
+extern int var_rate_delay;
+
  /*
   * Stress handling.
   */
diff --git a/postfix/src/postconf/postconf_service.c b/postfix/src/postconf/postconf_service.c
index bebf6de..4955f41 100644
--- a/postfix/src/postconf/postconf_service.c
+++ b/postfix/src/postconf/postconf_service.c
@@ -134,6 +134,7 @@ void    pcf_register_service_parameters(void)
 	_CONC_NEG_FDBACK, VAR_CONC_NEG_FDBACK,
 	_CONC_COHORT_LIM, VAR_CONC_COHORT_LIM,
 	_DEST_RATE_DELAY, VAR_DEST_RATE_DELAY,
+	_RATE_DELAY, VAR_RATE_DELAY,
 	0,
     };
     static const PCF_STRING_NV spawn_params[] = {
diff --git a/postfix/src/qmgr/qmgr.c b/postfix/src/qmgr/qmgr.c
index 1b44ef1..17b63d6 100644
--- a/postfix/src/qmgr/qmgr.c
+++ b/postfix/src/qmgr/qmgr.c
@@ -446,6 +446,7 @@ char   *var_conc_pos_feedback;
 char   *var_conc_neg_feedback;
 int     var_conc_cohort_limit;
 int     var_conc_feedback_debug;
+int     var_rate_delay;
 int     var_dest_rate_delay;
 char   *var_def_filter_nexthop;
 int     var_qmgr_daemon_timeout;
@@ -702,6 +703,7 @@ int     main(int argc, char **argv)
 	VAR_XPORT_RETRY_TIME, DEF_XPORT_RETRY_TIME, &var_transport_retry_time, 1, 0,
 	VAR_QMGR_CLOG_WARN_TIME, DEF_QMGR_CLOG_WARN_TIME, &var_qmgr_clog_warn_time, 0, 0,
 	VAR_XPORT_REFILL_DELAY, DEF_XPORT_REFILL_DELAY, &var_xport_refill_delay, 1, 0,
+	VAR_RATE_DELAY, DEF_RATE_DELAY, &var_rate_delay, 0, 0,
 	VAR_DEST_RATE_DELAY, DEF_DEST_RATE_DELAY, &var_dest_rate_delay, 0, 0,
 	VAR_QMGR_DAEMON_TIMEOUT, DEF_QMGR_DAEMON_TIMEOUT, &var_qmgr_daemon_timeout, 1, 0,
 	VAR_QMGR_IPC_TIMEOUT, DEF_QMGR_IPC_TIMEOUT, &var_qmgr_ipc_timeout, 1, 0,
diff --git a/postfix/src/qmgr/qmgr.h b/postfix/src/qmgr/qmgr.h
index d5f4845..e251b95 100644
--- a/postfix/src/qmgr/qmgr.h
+++ b/postfix/src/qmgr/qmgr.h
@@ -204,20 +204,24 @@ struct QMGR_TRANSPORT {
     QMGR_FEEDBACK pos_feedback;	/* positive feedback control */
     QMGR_FEEDBACK neg_feedback;	/* negative feedback control */
     int     fail_cohort_limit;		/* flow shutdown control */
-    int     rate_delay;			/* suspend per delivery */
+    int     rate_delay;			/* suspend transport per delivery */
+    int     dest_rate_delay;		/* suspend destination per delivery */
 };
 
 #define QMGR_TRANSPORT_STAT_DEAD	(1<<1)
+#define QMGR_TRANSPORT_STAT_SUSPENDED	(1<<2)
 
 typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
 extern QMGR_TRANSPORT *qmgr_transport_select(void);
 extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
 extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
 extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
+extern void qmgr_transport_suspend(QMGR_TRANSPORT *, int);
 extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
 extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
 
 #define QMGR_TRANSPORT_THROTTLED(t)	((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_SUSPENDED(t)	((t)->flags & QMGR_TRANSPORT_STAT_SUSPENDED)
 
  /*
   * Each next hop (e.g., a domain name) has its own queue of pending message
diff --git a/postfix/src/qmgr/qmgr_entry.c b/postfix/src/qmgr/qmgr_entry.c
index 61b3434..09c63fc 100644
--- a/postfix/src/qmgr/qmgr_entry.c
+++ b/postfix/src/qmgr/qmgr_entry.c
@@ -306,14 +306,19 @@ void    qmgr_entry_done(QMGR_ENTRY *entry, int which)
      * until the queue is resumed. Otherwise, we make those decisions now.
      * The job scheduling decisions are made by qmgr_job_blocker_update().
      */
-    if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
-	if (queue->window > 1)
-	    msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
-		      myname, transport->name, queue->name, queue->window);
-	if (QMGR_QUEUE_THROTTLED(queue))		/* XXX */
-	    qmgr_queue_unthrottle(queue);
-	if (QMGR_QUEUE_READY(queue))
-	    qmgr_queue_suspend(queue, transport->rate_delay);
+    if (which == QMGR_QUEUE_BUSY) {
+	if (transport->rate_delay > 0) {
+	    qmgr_transport_suspend(transport, transport->rate_delay);
+	}
+	if (transport->dest_rate_delay > 0) {
+	    if (queue->window > 1)
+		msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
+			  myname, transport->name, queue->name, queue->window);
+	    if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
+		qmgr_queue_unthrottle(queue);
+	    if (QMGR_QUEUE_READY(queue))
+		qmgr_queue_suspend(queue, transport->dest_rate_delay);
+	}
     }
     if (!QMGR_QUEUE_SUSPENDED(queue)
 	&& queue->blocker_tag == transport->blocker_tag)
diff --git a/postfix/src/qmgr/qmgr_transport.c b/postfix/src/qmgr/qmgr_transport.c
index 0611f3b..6857cd1 100644
--- a/postfix/src/qmgr/qmgr_transport.c
+++ b/postfix/src/qmgr/qmgr_transport.c
@@ -221,6 +221,46 @@ void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
     }
 }
 
+/* qmgr_transport_resume - end transport-wide rate-delay suspension */
+
+static void qmgr_transport_resume(int unused_event, void *context)
+{
+    const char *myname = "qmgr_transport_resume";
+    QMGR_TRANSPORT *transport = (QMGR_TRANSPORT *) context;
+
+    /*
+     * Sanity checks.
+     */
+    if (!QMGR_TRANSPORT_SUSPENDED(transport))
+	msg_panic("%s: bad transport status (not suspended): %s",
+		  myname, transport->name);
+
+    /*
+     * qmgr_transport_select() will again consider this transport.
+     */
+    transport->flags &= ~QMGR_TRANSPORT_STAT_SUSPENDED;
+}
+
+/* qmgr_transport_suspend - pause new deliveries on this transport */
+
+void    qmgr_transport_suspend(QMGR_TRANSPORT *transport, int delay)
+{
+    const char *myname = "qmgr_transport_suspend";
+
+    /*
+     * With more than one delivery agent active, a delivery may complete
+     * while the transport is already suspended. Instead of panicking,
+     * restart the delay from this completion: event_request_timer()
+     * updates the expiry time of an existing timer with the same callback
+     * and context rather than adding a second one.
+     */
+    if (msg_verbose)
+	msg_info("%s: transport %s: suspend for %d seconds",
+		 myname, transport->name, delay);
+    transport->flags |= QMGR_TRANSPORT_STAT_SUSPENDED;
+    event_request_timer(qmgr_transport_resume, (void *) transport, delay);
+}
+
 /* qmgr_transport_abort - transport connect watchdog */
 
 static void qmgr_transport_abort(int unused_event, void *context)
@@ -286,7 +326,8 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
 #define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
 
     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
 	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+	    || (xport->flags & QMGR_TRANSPORT_STAT_SUSPENDED) != 0
 	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
 	    continue;
 	need = xport->pending + 1;
@@ -314,8 +355,10 @@ void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOT
     /*
      * Sanity checks.
      */
     if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
 	msg_panic("qmgr_transport: dead transport: %s", transport->name);
+    if (transport->flags & QMGR_TRANSPORT_STAT_SUSPENDED)
+	msg_panic("qmgr_transport: suspended transport: %s", transport->name);
     if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
 	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
 
@@ -392,11 +435,20 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
     transport->init_dest_concurrency =
 	get_mail_conf_int2(name, _INIT_DEST_CON,
 			   var_init_dest_concurrency, 1, 0);
-    transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
+    transport->rate_delay = get_mail_conf_time2(name, _RATE_DELAY,
+						var_rate_delay,
+						's', 0, 0);
+    transport->dest_rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
 						var_dest_rate_delay,
 						's', 0, 0);
 
-    if (transport->rate_delay > 0)
+    /*
+     * XXX The transport-wide delay starts when a delivery completes and only
+     * blocks new delivery agent allocations; deliveries in progress and
+     * agent connections that are already pending are not held back. It
+     * spaces deliveries only when one delivery agent serves this transport.
+     */
+    if (transport->dest_rate_delay > 0)
 	transport->dest_concurrency_limit = 1;
     if (transport->dest_concurrency_limit != 0
 	&& transport->dest_concurrency_limit < transport->init_dest_concurrency)