On Mon, 21 May 2012 17:59:53 +0100 "Daniel P. Berrange" <berra...@redhat.com> wrote:
> From: "Daniel P. Berrange" <berra...@redhat.com> > > Allow certain event types to be rate limited to avoid flooding > monitor clients. The monitor_protocol_event() method is changed > such that instead of immediately emitting the event to Monitor > instances, it will call a new monitor_protocol_event_queue() > method. > > This will check to see if the rate limit for the event has been > exceeded, and if so schedule a timer to wakeup at the end of the > rate limit period. If further events arrive before the timer fires, > the previously queued event will be discarded in favour of the new > event. The event will eventually be emitted when the timer fires. > > This logic is applied to RTC_CHANGE, BALLOON_CHANGE & WATCHDOG > events, since the data associated with these events is stateless > > * monitor.c: Add support for rate limiting > * monitor.h: Define monitor_global_init for one-time setup tasks > * vl.c: Invoke monitor_global_init > * trace-events: Add hooks for monitor event tracing > > Signed-off-by: Daniel P. Berrange <berra...@redhat.com> I've talked with Anthony he doesn't have objections. I've reviewed this again and have some minor comments below. > --- > monitor.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- > monitor.h | 1 + > trace-events | 5 ++ > vl.c | 2 + > 4 files changed, 164 insertions(+), 6 deletions(-) > > diff --git a/monitor.c b/monitor.c > index 75fd4cf..5135966 100644 > --- a/monitor.c > +++ b/monitor.c > @@ -66,6 +66,7 @@ > #include "memory.h" > #include "qmp-commands.h" > #include "hmp.h" > +#include "qemu-thread.h" > > /* for pic/irq_info */ > #if defined(TARGET_SPARC) > @@ -145,6 +146,19 @@ typedef struct MonitorControl { > int command_mode; > } MonitorControl; > > +/* > + * To prevent flooding clients, events can be throttled. The > + * throttling is calculating globally, rather than per-Monitor > + * instance. > + */ calculated? > +typedef struct MonitorEventState { > + MonitorEvent event; /* Event being tracked */ > + int64_t rate; /* Period over which to throttle. 0 to disable */ > + int64_t last; /* Time at which event was last emitted */ > + QEMUTimer *timer; /* Timer for handling delayed events */ > + QObject *data; /* Event pending delayed dispatch */ > +} MonitorEventState; > + > struct Monitor { > CharDriverState *chr; > int mux_out; > @@ -447,6 +461,141 @@ static const char *monitor_event_names[] = { > }; > QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX) > > +MonitorEventState monitor_event_state[QEVENT_MAX]; > +QemuMutex monitor_event_state_lock; > + > +/* > + * Emits the event to every monitor instance > + */ > +static void > +monitor_protocol_event_emit(MonitorEvent event, > + QObject *data) > +{ > + Monitor *mon; > + > + trace_monitor_protocol_event_emit(event, data); > + QLIST_FOREACH(mon, &mon_list, entry) { > + if (monitor_ctrl_mode(mon) && qmp_cmd_mode(mon)) { > + monitor_json_emitter(mon, data); > + } > + } > +} > + > + > +/* > + * Queue a new event for emission to Monitor instances, > + * applying any rate limiting if required. > + */ > +static void > +monitor_protocol_event_queue(MonitorEvent event, > + QObject *data) > +{ > + MonitorEventState *evstate; > + int64_t now = qemu_get_clock_ns(rt_clock); > + assert(event < QEVENT_MAX); > + > + qemu_mutex_lock(&monitor_event_state_lock); > + evstate = &(monitor_event_state[event]); > + trace_monitor_protocol_event_queue(event, > + data, > + evstate->rate, > + evstate->last, > + now); > + > + /* Rate limit of 0 indicates no throttling */ > + if (!evstate->rate) { > + monitor_protocol_event_emit(event, data); > + evstate->last = now; > + } else { > + int64_t delta = now - evstate->last; > + if (evstate->data || > + delta < evstate->rate) { > + /* If there's an existing event pending, replace > + * it with the new event, otherwise schedule a > + * timer for delayed emission > + */ > + if (evstate->data) { > + qobject_decref(evstate->data); > + } else { > + int64_t then = evstate->last + evstate->rate; > + qemu_mod_timer_ns(evstate->timer, then); > + } > + evstate->data = data; > + qobject_incref(evstate->data); > + } else { > + monitor_protocol_event_emit(event, data); > + evstate->last = now; > + } > + } > + qemu_mutex_unlock(&monitor_event_state_lock); > +} > + > + > +/* > + * The callback invoked by QemuTimer when a delayed > + * event is ready to be emitted > + */ > +static void monitor_protocol_event_handler(void *opaque) > +{ > + MonitorEventState *evstate = opaque; > + int64_t now = qemu_get_clock_ns(rt_clock); > + > + qemu_mutex_lock(&monitor_event_state_lock); > + > + trace_monitor_protocol_event_handler(evstate->event, > + evstate->data, > + evstate->last, > + now); > + if (evstate->data) { > + monitor_protocol_event_emit(evstate->event, evstate->data); > + qobject_decref(evstate->data); > + evstate->data = NULL; > + } > + evstate->last = now; > + qemu_mutex_unlock(&monitor_event_state_lock); > +} > + > + > +/* > + * @event: the event ID to be limited > + * @rate: the rate limit in milliseconds > + * > + * Sets a rate limit on a particular event, so no > + * more than 1 event will be emitted within @rate > + * milliseconds > + */ > +static void > +monitor_protocol_event_throttle(MonitorEvent event, > + int64_t rate) > +{ > + MonitorEventState *evstate; > + assert(event < QEVENT_MAX); > + > + evstate = &(monitor_event_state[event]); > + > + trace_monitor_protocol_event_throttle(event, rate); > + evstate->event = event; > + evstate->rate = rate * SCALE_MS; > + evstate->timer = qemu_new_timer(rt_clock, > + SCALE_MS, > + monitor_protocol_event_handler, > + evstate); > + evstate->last = 0; > + evstate->data = NULL; > +} > + > + > +/* Global, one-time initializer to configure the rate limiting > + * and initialize state */ > +static void monitor_protocol_event_init(void) > +{ > + qemu_mutex_init(&monitor_event_state_lock); > + /* Limit RTC & BALLOON events to 1 per second */ > + monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000); > + monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000); > + monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000); What about SUSPENDED and BLOCK_IO_ERROR? Couldn't the former be also used by a malicious guest to cause a DoS? The former is already emitted several times for virtio. > +} > + > /** > * monitor_protocol_event(): Generate a Monitor event > * > @@ -456,7 +605,6 @@ void monitor_protocol_event(MonitorEvent event, QObject > *data) > { > QDict *qmp; > const char *event_name; > - Monitor *mon; > > assert(event < QEVENT_MAX); > > @@ -471,11 +619,8 @@ void monitor_protocol_event(MonitorEvent event, QObject > *data) > qdict_put_obj(qmp, "data", data); > } > > - QLIST_FOREACH(mon, &mon_list, entry) { > - if (monitor_ctrl_mode(mon) && qmp_cmd_mode(mon)) { > - monitor_json_emitter(mon, QOBJECT(qmp)); > - } > - } > + trace_monitor_protocol_event(event, event_name, qmp); > + monitor_protocol_event_queue(event, QOBJECT(qmp)); > QDECREF(qmp); > } > > @@ -4564,6 +4709,11 @@ static void sortcmdlist(void) > * End: > */ > > +void monitor_global_init(void) > +{ It's better to call it monitor_early_init() (or monitor_init_early()). > + monitor_protocol_event_init(); > +} > + > void monitor_init(CharDriverState *chr, int flags) > { > static int is_first_init = 1; > diff --git a/monitor.h b/monitor.h > index 5f4de1b..3342cb4 100644 > --- a/monitor.h > +++ b/monitor.h > @@ -51,6 +51,7 @@ typedef enum MonitorEvent { > > int monitor_cur_is_qmp(void); > > +void monitor_global_init(void); > void monitor_protocol_event(MonitorEvent event, QObject *data); > void monitor_init(CharDriverState *chr, int flags); > > diff --git a/trace-events b/trace-events > index 87cb96c..449b8ee 100644 > --- a/trace-events > +++ b/trace-events > @@ -641,6 +641,11 @@ esp_mem_writeb_cmd_ensel(uint32_t val) "Enable selection > (%2.2x)" > # monitor.c > handle_qmp_command(void *mon, const char *cmd_name) "mon %p cmd_name \"%s\"" > monitor_protocol_emitter(void *mon) "mon %p" > +monitor_protocol_event(uint32_t event, const char *evname, void *data) > "event=%d name \"%s\" data %p" > +monitor_protocol_event_handler(uint32_t event, void *data, uint64_t last, > uint64_t now) "event=%d data=%p last=%" PRId64 " now=%" PRId64 > +monitor_protocol_event_emit(uint32_t event, void *data) "event=%d data=%p" > +monitor_protocol_event_queue(uint32_t event, void *data, uint64_t rate, > uint64_t last, uint64_t now) "event=%d data=%p rate=%" PRId64 " last=%" > PRId64 " now=%" PRId64 > +monitor_protocol_event_throttle(uint32_t event, uint64_t rate) "event=%d > rate=%" PRId64 > > # hw/opencores_eth.c > open_eth_mii_write(unsigned idx, uint16_t v) "MII[%02x] <- %04x" > diff --git a/vl.c b/vl.c > index 23ab3a3..70eedfa 100644 > --- a/vl.c > +++ b/vl.c > @@ -3208,6 +3208,8 @@ int main(int argc, char **argv, char **envp) > } > loc_set_none(); > > + monitor_global_init(); > + > /* Init CPU def lists, based on config > * - Must be called after all the qemu_read_config_file() calls > * - Must be called before list_cpus()