On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote: > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser > *parser, GQueue *tokens, > } > } > > -err_out: > - monitor_qmp_respond(mon, rsp, err, id); > + /* Respond if necessary */ > + monitor_qmp_respond(mon, rsp, NULL, id); > + > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */ > + if (!qmp_oob_enabled(mon)) { > + monitor_resume(mon);
monitor_resume() does not work between threads: if the event loop is currently blocked in poll() it won't notice that the monitor fd should be watched again. Please add aio_notify() to monitor_resume() and monitor_suspend(). That way the event loop is forced to check can_read() again. > + } > > qobject_decref(req); > } > > +/* > + * Pop one QMP request from monitor queues, return NULL if not found. > + * We are using round-robin fasion to pop the request, to avoid > + * processing command only on a very busy monitor. To achieve that, > + * when we processed one request on specific monitor, we put that > + * monitor to the end of mon_list queue. > + */ > +static QMPRequest *monitor_qmp_requests_pop_one(void) > +{ > + QMPRequest *req_obj = NULL; > + Monitor *mon; > + > + qemu_mutex_lock(&monitor_lock); > + > + QTAILQ_FOREACH(mon, &mon_list, entry) { > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); Please add a doc comment about the monitor_lock < qmp_queue_lock lock ordering in the qmp_queue_lock field declaration so that deadlocks can be prevented. > + req_obj = g_queue_pop_head(mon->qmp.qmp_requests); > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); > + if (req_obj) { > + break; > + } > + } > + > + if (req_obj) { > + /* > + * We found one request on the monitor. Degrade this monitor's > + * priority to lowest by re-inserting it to end of queue. > + */ > + QTAILQ_REMOVE(&mon_list, mon, entry); > + QTAILQ_INSERT_TAIL(&mon_list, mon, entry); > + } > + > + qemu_mutex_unlock(&monitor_lock); > + > + return req_obj; > +} > + > +static void monitor_qmp_bh_dispatcher(void *data) > +{ > + QMPRequest *req_obj; > + > + while (true) { Previously QEMU could only dispatch 1 QMP command per main loop iteration. Now multiple requests can be processed in a single main loop iteration. If a producer enqueues requests faster than the dispatcher can dequeue them then this infinite loop will prevent the caller (i.e. QEMU main loop) from making progress. 
The following keeps 1 QMP command per main loop iteration and avoids the infinite loop: static void monitor_qmp_bh_dispatcher(void *data) { QMPRequest *req_obj = monitor_qmp_requests_pop_one(); if (req_obj) { monitor_qmp_dispatch_one(req_obj); /* Reschedule instead of looping so the main loop stays responsive */ qemu_bh_schedule(mon_global.qmp_dispatcher_bh); } } > + /* > + * Put the request to the end of queue so that requests will be > + * handled in time order. Ownership for req_obj, req, id, > + * etc. will be delivered to the handler side. > + */ > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); > + g_queue_push_tail(mon->qmp.qmp_requests, req_obj); > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); > + > + /* Kick the dispatcher routine */ > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh); > + > + /* > + * If OOB is not enabled on current monitor, we'll emulate the old > + * behavior that we won't process current monitor any more until > + * it is responded. This helps make sure that as long as OOB is > + * not enabled, the server will never drop any command. > + */ > + if (!qmp_oob_enabled(mon)) { > + monitor_suspend(mon); > + } monitor_suspend() must be called before g_queue_push_tail(). Otherwise the other thread might complete the request and call monitor_resume() before we call monitor_suspend().
signature.asc
Description: PGP signature