* Marc-André Lureau (marcandre.lur...@gmail.com) wrote: > Hi > > On Fri, Mar 9, 2018 at 9:59 AM, Peter Xu <pet...@redhat.com> wrote: > > Originally QMP goes through these steps: > > > > JSON Parser --> QMP Dispatcher --> Respond > > /|\ (2) (3) | > > (1) | \|/ (4) > > +--------- main thread --------+ > > > > This patch does this: > > > > JSON Parser QMP Dispatcher --> Respond > > /|\ | /|\ (4) | > > | | (2) | (3) | (5) > > (1) | +-----> | \|/ > > +--------- main thread <-------+ > > > > So the parsing job and the dispatching job is isolated now. It gives us > > a chance in following up patches to totally move the parser outside. > > > > The isolation is done using one QEMUBH. Only one dispatcher QEMUBH is > > used for all the monitors. > > > > Reviewed-by: Stefan Hajnoczi <stefa...@redhat.com> > > Signed-off-by: Peter Xu <pet...@redhat.com> > > --- > > monitor.c | 201 > > +++++++++++++++++++++++++++++++++++++++++++++++++++++++------- > > 1 file changed, 178 insertions(+), 23 deletions(-) > > > > diff --git a/monitor.c b/monitor.c > > index de9343be87..5104e5db07 100644 > > --- a/monitor.c > > +++ b/monitor.c > > @@ -172,6 +172,13 @@ typedef struct { > > */ > > QmpCommandList *commands; > > bool qmp_caps[QMP_CAPABILITY__MAX]; > > + /* > > + * Protects qmp request/response queue. Please take monitor_lock > > + * first when used together. > > + */ > > + QemuMutex qmp_queue_lock; > > + /* Input queue that holds all the parsed QMP requests */ > > + GQueue *qmp_requests; > > } MonitorQMP; > > > > /* > > @@ -218,6 +225,8 @@ struct Monitor { > > /* Let's add monitor global variables to this struct. 
*/ > > static struct { > > IOThread *mon_iothread; > > + /* Bottom half to dispatch the requests received from IO thread */ > > + QEMUBH *qmp_dispatcher_bh; > > } mon_global; > > > > /* QMP checker flags */ > > @@ -600,11 +609,13 @@ static void monitor_data_init(Monitor *mon, bool > > skip_flush, > > { > > memset(mon, 0, sizeof(Monitor)); > > qemu_mutex_init(&mon->out_lock); > > + qemu_mutex_init(&mon->qmp.qmp_queue_lock); > > mon->outbuf = qstring_new(); > > /* Use *mon_cmds by default. */ > > mon->cmd_table = mon_cmds; > > mon->skip_flush = skip_flush; > > mon->use_io_thr = use_io_thr; > > + mon->qmp.qmp_requests = g_queue_new(); > > } > > > > static void monitor_data_destroy(Monitor *mon) > > @@ -617,6 +628,8 @@ static void monitor_data_destroy(Monitor *mon) > > readline_free(mon->rs); > > QDECREF(mon->outbuf); > > qemu_mutex_destroy(&mon->out_lock); > > + qemu_mutex_destroy(&mon->qmp.qmp_queue_lock); > > + g_queue_free(mon->qmp.qmp_requests); > > } > > > > char *qmp_human_monitor_command(const char *command_line, bool > > has_cpu_index, > > @@ -1056,6 +1069,16 @@ static void monitor_init_qmp_commands(void) > > qmp_marshal_qmp_capabilities, QCO_NO_OPTIONS); > > } > > > > +static bool qmp_cap_enabled(Monitor *mon, QMPCapability cap) > > +{ > > + return mon->qmp.qmp_caps[cap]; > > +} > > + > > +static bool qmp_oob_enabled(Monitor *mon) > > +{ > > + return qmp_cap_enabled(mon, QMP_CAPABILITY_OOB); > > +} > > + > > static void qmp_caps_check(Monitor *mon, QMPCapabilityList *list, > > Error **errp) > > { > > @@ -3866,30 +3889,39 @@ static void monitor_qmp_respond(Monitor *mon, > > QObject *rsp, > > qobject_decref(rsp); > > } > > > > -static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens) > > +struct QMPRequest { > > + /* Owner of the request */ > > + Monitor *mon; > > + /* "id" field of the request */ > > + QObject *id; > > + /* Request object to be handled */ > > + QObject *req; > > + /* > > + * Whether we need to resume the monitor afterward. 
This flag is > > + * used to emulate the old QMP server behavior that the current > > + * command must be completed before execution of the next one. > > + */ > > + bool need_resume; > > +}; > > +typedef struct QMPRequest QMPRequest; > > + > > +/* > > + * Dispatch one single QMP request. The function will free the req_obj > > + * and objects inside it before return. > > + */ > > +static void monitor_qmp_dispatch_one(QMPRequest *req_obj) > > { > > - QObject *req, *rsp = NULL, *id = NULL; > > + Monitor *mon, *old_mon; > > + QObject *req, *rsp = NULL, *id; > > QDict *qdict = NULL; > > - MonitorQMP *mon_qmp = container_of(parser, MonitorQMP, parser); > > - Monitor *old_mon, *mon = container_of(mon_qmp, Monitor, qmp); > > - > > - Error *err = NULL; > > + bool need_resume; > > > > - req = json_parser_parse_err(tokens, NULL, &err); > > - if (!req && !err) { > > - /* json_parser_parse_err() sucks: can fail without setting @err */ > > - error_setg(&err, QERR_JSON_PARSING); > > - } > > - if (err) { > > - goto err_out; > > - } > > + req = req_obj->req; > > + mon = req_obj->mon; > > + id = req_obj->id; > > + need_resume = req_obj->need_resume; > > > > - qdict = qobject_to_qdict(req); > > - if (qdict) { > > - id = qdict_get(qdict, "id"); > > - qobject_incref(id); > > - qdict_del(qdict, "id"); > > - } /* else will fail qmp_dispatch() */ > > + g_free(req_obj); > > > > if (trace_event_get_state_backends(TRACE_HANDLE_QMP_COMMAND)) { > > QString *req_json = qobject_to_json(req); > > @@ -3900,7 +3932,7 @@ static void handle_qmp_command(JSONMessageParser > > *parser, GQueue *tokens) > > old_mon = cur_mon; > > cur_mon = mon; > > > > - rsp = qmp_dispatch(cur_mon->qmp.commands, req); > > + rsp = qmp_dispatch(mon->qmp.commands, req); > > > > cur_mon = old_mon; > > > > @@ -3916,12 +3948,122 @@ static void handle_qmp_command(JSONMessageParser > > *parser, GQueue *tokens) > > } > > } > > > > -err_out: > > - monitor_qmp_respond(mon, rsp, err, id); > > + /* Respond if necessary */ > > + 
monitor_qmp_respond(mon, rsp, NULL, id); > > + > > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */ > > + if (need_resume) { > > + monitor_resume(mon); > > + } > > > > qobject_decref(req); > > } > > > > +/* > > + * Pop one QMP request from monitor queues, return NULL if not found. > > + * We are using round-robin fashion to pop the request, to avoid > > + * processing command only on a very busy monitor. To achieve that, > > + * when we processed one request on specific monitor, we put that > > + * monitor to the end of mon_list queue. > > + */ > > +static QMPRequest *monitor_qmp_requests_pop_one(void) > > +{ > > + QMPRequest *req_obj = NULL; > > + Monitor *mon; > > + > > + qemu_mutex_lock(&monitor_lock); > > + > > + QTAILQ_FOREACH(mon, &mon_list, entry) { > > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); > > + req_obj = g_queue_pop_head(mon->qmp.qmp_requests); > > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); > > + if (req_obj) { > > + break; > > + } > > + } > > + > > + if (req_obj) { > > + /* > > + * We found one request on the monitor. Degrade this monitor's > > + * priority to lowest by re-inserting it to end of queue. 
> > + */ > > + QTAILQ_REMOVE(&mon_list, mon, entry); > > + QTAILQ_INSERT_TAIL(&mon_list, mon, entry); > > + } > > + > > + qemu_mutex_unlock(&monitor_lock); > > + > > + return req_obj; > > +} > > + > > +static void monitor_qmp_bh_dispatcher(void *data) > > +{ > > + QMPRequest *req_obj = monitor_qmp_requests_pop_one(); > > + > > + if (req_obj) { > > + monitor_qmp_dispatch_one(req_obj); > > + /* Reschedule instead of looping so the main loop stays responsive > > */ > > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh); > > + } > > +} > > + > > +static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens) > > +{ > > + QObject *req, *id = NULL; > > + QDict *qdict = NULL; > > + MonitorQMP *mon_qmp = container_of(parser, MonitorQMP, parser); > > + Monitor *mon = container_of(mon_qmp, Monitor, qmp); > > + Error *err = NULL; > > + QMPRequest *req_obj; > > + > > + req = json_parser_parse_err(tokens, NULL, &err); > > + if (!req && !err) { > > + /* json_parser_parse_err() sucks: can fail without setting @err */ > > + error_setg(&err, QERR_JSON_PARSING); > > + } > > + if (err) { > > + monitor_qmp_respond(mon, NULL, err, NULL); > > + qobject_decref(req); > > + return; > > + } > > + > > + qdict = qobject_to_qdict(req); > > + if (qdict) { > > + id = qdict_get(qdict, "id"); > > + qobject_incref(id); > > + qdict_del(qdict, "id"); > > + } /* else will fail qmp_dispatch() */ > > + > > + req_obj = g_new0(QMPRequest, 1); > > + req_obj->mon = mon; > > + req_obj->id = id; > > + req_obj->req = req; > > + req_obj->need_resume = false; > > + > > + /* > > + * If OOB is not enabled on current monitor, we'll emulate the old > > + * behavior that we won't process current monitor any more until > > + * it is responded. This helps make sure that as long as OOB is > > + * not enabled, the server will never drop any command. 
> > + */ > > + if (!qmp_oob_enabled(mon)) { > > + monitor_suspend(mon); > > + req_obj->need_resume = true; > > + } > > + > > + /* > > + * Put the request to the end of queue so that requests will be > > + * handled in time order. Ownership for req_obj, req, id, > > I think the order is not respected if subsequent messages have errors > (in either json parsing, dispatch_check_obj, oob_check). So if I > enable oob, and queue a few commands, then send a bad command/message, > I won't be able to tell which command the error is for.
Doesn't OOB insist on having an ID field with the command? Dave > > + * etc. will be delivered to the handler side. > > + */ > > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); > > + g_queue_push_tail(mon->qmp.qmp_requests, req_obj); > > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); > > + > > + /* Kick the dispatcher routine */ > > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh); > > +} > > + > > static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size) > > { > > Monitor *mon = opaque; > > @@ -4134,6 +4276,15 @@ static void monitor_iothread_init(void) > > { > > mon_global.mon_iothread = iothread_create("mon_iothread", > > &error_abort); > > + > > + /* > > + * This MUST be on main loop thread since we have commands that > > + * have assumption to be run on main loop thread. It would be > > + * nice that one day we can remove this assumption in the future. > > + */ > > + mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(), > > + monitor_qmp_bh_dispatcher, > > + NULL); > > } > > > > void monitor_init_globals(void) > > @@ -4280,6 +4431,10 @@ void monitor_cleanup(void) > > } > > qemu_mutex_unlock(&monitor_lock); > > > > + /* QEMUBHs needs to be deleted before destroying the IOThread. */ > > + qemu_bh_delete(mon_global.qmp_dispatcher_bh); > > + mon_global.qmp_dispatcher_bh = NULL; > > + > > iothread_destroy(mon_global.mon_iothread); > > mon_global.mon_iothread = NULL; > > } > > -- > > 2.14.3 > > > > > > > > -- > Marc-André Lureau > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK