Peter Xu <pet...@redhat.com> writes:

> When we received too many qmp commands, previously we'll send
> COMMAND_DROPPED events to monitors, then we'll drop the requests.  It
> can only solve the flow control of the request queue, however it'll not
> really work since we might queue unlimited events in the response queue
> which is a potential risk.
>
> Now instead of sending such an event, we stop consuming the client input
> when we noticed that the queue is reaching its limitation before hand.
> Then after we handled commands, we'll try to resume the monitor when
> needed.
>
> Signed-off-by: Peter Xu <pet...@redhat.com>
> ---
>  monitor.c | 46 ++++++++++++++++++++++++++++++----------------
>  1 file changed, 30 insertions(+), 16 deletions(-)
>
> diff --git a/monitor.c b/monitor.c
> index 215029bc22..ebf862914f 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -164,6 +164,8 @@ struct MonFdset {
>      QLIST_ENTRY(MonFdset) next;
>  };
>
> +#define QMP_REQ_QUEUE_LEN_MAX (8)
> +
>  typedef struct {
>      JSONMessageParser parser;
>      /*
> @@ -396,10 +398,21 @@ static void monitor_qmp_try_resume(Monitor *mon)
>  {
>      assert(monitor_is_qmp(mon));
>      qemu_mutex_lock(&mon->qmp.qmp_lock);
> +
> +    if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX) {
> +        /*
> +         * This should not happen, but in case if it happens, we
> +         * should still keep the monitor in suspend state
> +         */
> +        qemu_mutex_unlock(&mon->qmp.qmp_lock);
> +        return;
> +    }
> +
>      if (mon->qmp.need_resume) {
>          monitor_resume(mon);
>          mon->qmp.need_resume = false;
>      }
> +
>      qemu_mutex_unlock(&mon->qmp.qmp_lock);
>  }
>
> @@ -4213,7 +4226,14 @@ static void monitor_qmp_bh_dispatcher(void *data)
>          qemu_bh_schedule(qmp_dispatcher_bh);
>  }
>
> -#define QMP_REQ_QUEUE_LEN_MAX (8)
> +/* Called with Monitor.qmp.qmp_lock held.  */
> +static void monitor_qmp_suspend_locked(Monitor *mon)
> +{
> +    assert(monitor_is_qmp(mon));
> +    assert(mon->qmp.need_resume == false);
> +    monitor_suspend(mon);
> +    mon->qmp.need_resume = true;
> +}
>
>  static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
>  {
> @@ -4266,22 +4286,16 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
>       * OOB is not enabled, the server will never drop any command.
>       */
>      if (!qmp_oob_enabled(mon)) {
> -        monitor_suspend(mon);
> -        mon->qmp.need_resume = true;
> +        monitor_qmp_suspend_locked(mon);
>      } else {
> -        /* Drop the request if queue is full. */
> -        if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX) {
> -            qemu_mutex_unlock(&mon->qmp.qmp_lock);
> -            /*
> -             * FIXME @id's scope is just @mon, and broadcasting it is
> -             * wrong.  If another monitor's client has a command with
> -             * the same ID in flight, the event will incorrectly claim
> -             * that command was dropped.
> -             */
> -            qapi_event_send_command_dropped(id,
> -                                            COMMAND_DROP_REASON_QUEUE_FULL);
> -            qmp_request_free(req_obj);
> -            return;
> +        /*
> +         * If the queue is reaching the length limitation, we queue
> +         * this command, meanwhile we suspend the monitor to block new
> +         * commands.  We'll resume ourselves until the queue has more
> +         * space.
> +         */
> +        if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX - 1) {
> +            monitor_qmp_suspend_locked(mon);
>          }
>      }
If I understand the code correctly, the response queue
mon->qmp.qmp_responses gets drained into the output buffer mon->outbuf
by bottom half monitor_qmp_bh_responder().  The response queue remains
small; it's the output buffer that grows without bounds.

Your test for "limit exceeded" measures the response queue.  It needs to
measure the output buffer instead.

We could drain the response queue only when the output buffer isn't too
full.  I think that makes sense only if we have a compelling use for
keeping responses in QDict form for a longer time.

Marc-André has a patch to cut out the response queue.  Whether it still
makes sense I don't know:

    [PATCH v3 03/38] Revert "qmp: isolate responses into io thread"