On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser
> > *parser, GQueue *tokens,
> >          }
> >      }
> >
> > -err_out:
> > -    monitor_qmp_respond(mon, rsp, err, id);
> > +    /* Respond if necessary */
> > +    monitor_qmp_respond(mon, rsp, NULL, id);
> > +
> > +    /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > +    if (!qmp_oob_enabled(mon)) {
> > +        monitor_resume(mon);
>
> monitor_resume() does not work between threads: if the event loop is
> currently blocked in poll() it won't notice that the monitor fd should
> be watched again.
>
> Please add aio_notify() to monitor_resume() and monitor_suspend().  That
> way the event loop is forced to check can_read() again.

Ah, yes.  I think monitor_suspend() does not need the notify?  If the
event loop is sleeping, it won't miss the next check in can_read()
after all.

Regarding monitor_resume(), I noticed that I had missed the fact that
it is tailored only for HMP, which seems to be a bigger problem.  Do
you agree with a change like this?

diff --git a/monitor.c b/monitor.c
index 9970418d6f..8f96880ad7 100644
--- a/monitor.c
+++ b/monitor.c
@@ -4244,10 +4244,12 @@ int monitor_suspend(Monitor *mon)

 void monitor_resume(Monitor *mon)
 {
-    if (!mon->rs)
-        return;
     if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
-        readline_show_prompt(mon->rs);
+        if (monitor_is_qmp(mon)) {
+            aio_notify(mon_global.mon_iothread->ctx);
+        } else {
+            assert(mon->rs);
+            readline_show_prompt(mon->rs);
+        }
     }
 }

I'm even thinking about whether I should introduce iothread_notify()
now to hide the IOThread.ctx details.

>
> > +    }
> >
> >      qobject_decref(req);
> >  }
> >
> > +/*
> > + * Pop one QMP request from monitor queues, return NULL if not found.
> > + * We are using round-robin fasion to pop the request, to avoid
> > + * processing command only on a very busy monitor.  To achieve that,
> > + * when we processed one request on specific monitor, we put that
> > + * monitor to the end of mon_list queue.
> > + */
> > +static QMPRequest *monitor_qmp_requests_pop_one(void)
> > +{
> > +    QMPRequest *req_obj = NULL;
> > +    Monitor *mon;
> > +
> > +    qemu_mutex_lock(&monitor_lock);
> > +
> > +    QTAILQ_FOREACH(mon, &mon_list, entry) {
> > +        qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
>
> Please add a doc comment about the monitor_lock < qmp_queue_lock lock
> ordering in the qmp_queue_lock field declaration so that deadlocks can
> be prevented.

Will do.

>
> > +        req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
> > +        qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > +        if (req_obj) {
> > +            break;
> > +        }
> > +    }
> > +
> > +    if (req_obj) {
> > +        /*
> > +         * We found one request on the monitor.  Degrade this monitor's
> > +         * priority to lowest by re-inserting it to end of queue.
> > +         */
> > +        QTAILQ_REMOVE(&mon_list, mon, entry);
> > +        QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
> > +    }
> > +
> > +    qemu_mutex_unlock(&monitor_lock);
> > +
> > +    return req_obj;
> > +}
> > +
> > +static void monitor_qmp_bh_dispatcher(void *data)
> > +{
> > +    QMPRequest *req_obj;
> > +
> > +    while (true) {
>
> Previously QEMU could only dispatch 1 QMP command per main loop
> iteration.  Now multiple requests can be processed in a single main loop
> iteration.
>
> If a producer enqueues requests faster than the dispatcher can dequeue
> them then this is infinite loop will prevent the caller (i.e. QEMU main
> loop) from making progress.
>
> The following keeps 1 QMP command per main loop iteration and avoids the
> infinite loop:
>
>   static void monitor_qmp_bh_dispatcher(void *data)
>   {
>       QMPRequest *req_obj = monitor_qmp_requests_pop_one();
>
>       if (req_obj) {
>           monitor_qmp_dispatch_one(req_obj);
>
>           /* Reschedule instead of looping so the main loop stays responsive */
>           qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
>       }
>   }

Yes, this sounds better!

>
> > +    /*
> > +     * Put the request to the end of queue so that requests will be
> > +     * handled in time order.  Ownership for req_obj, req, id,
> > +     * etc. will be delivered to the handler side.
> > +     */
> > +    qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> > +    g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
> > +    qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > +
> > +    /* Kick the dispatcher routine */
> > +    qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> > +
> > +    /*
> > +     * If OOB is not enabled on current monitor, we'll emulate the old
> > +     * behavior that we won't process current monitor any more until
> > +     * it is responded.  This helps make sure that as long as OOB is
> > +     * not enabled, the server will never drop any command.
> > +     */
> > +    if (!qmp_oob_enabled(mon)) {
> > +        monitor_suspend(mon);
> > +    }
>
> monitor_suspend() must be called before g_queue_push_tail().  Otherwise
> the other thread might complete the request and call monitor_resume()
> before we call monitor_suspend().

Reasonable.

Thanks,

-- 
Peter Xu