Hi On Fri, Mar 9, 2018 at 10:00 AM, Peter Xu <pet...@redhat.com> wrote: > For those monitors who have enabled IO thread, we'll offload the > responding procedure into IO thread. The main reason is that chardev is > not thread safe, and we need to do all the read/write IOs in the same > thread. For use_io_thr=true monitors, that thread is the IO thread.
Actually, the chr write path is supposed to be somewhat thread safe (chr_write_lock). Secondly, the function responsible for writing monitor data has some thread safety: it's called monitor_flush_locked() because the caller must hold mon->out_lock. I think that patch is making things more complicated than they need to be. You should be able to call monitor_json_emitter/monitor_puts() directly from any thread; it will queue the data, start writing, and add a watch if necessary in the appropriate context. Am I missing something? > We do this isolation in similar pattern as what we have done to the > request queue: we first create one response queue for each monitor, then > instead of replying directly in the main thread, we queue the responses > and kick the IO thread to do the rest of the job for us. > > A funny thing after doing this is that, when the QMP clients send "quit" > to QEMU, it's possible that we close the IOThread even earlier than > replying to that "quit". So another thing we need to do before cleaning > up the monitors is that we need to flush the response queue (we don't > need to do that for command queue; after all we are quitting) to make > sure replies for handled commands are always flushed back to clients. 
> > Reviewed-by: Fam Zheng <f...@redhat.com> > Reviewed-by: Stefan Hajnoczi <stefa...@redhat.com> > Signed-off-by: Peter Xu <pet...@redhat.com> > --- > monitor.c | 96 > ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 95 insertions(+), 1 deletion(-) > > diff --git a/monitor.c b/monitor.c > index 5c8afe9f50..c6de5f123e 100644 > --- a/monitor.c > +++ b/monitor.c > @@ -179,6 +179,8 @@ typedef struct { > QemuMutex qmp_queue_lock; > /* Input queue that holds all the parsed QMP requests */ > GQueue *qmp_requests; > + /* Output queue contains all the QMP responses in order */ > + GQueue *qmp_responses; > } MonitorQMP; > > /* > @@ -205,6 +207,7 @@ struct Monitor { > bool skip_flush; > bool use_io_thr; > > + /* We can't access guest memory when holding the lock */ > QemuMutex out_lock; > QString *outbuf; > guint out_watch; > @@ -227,6 +230,8 @@ static struct { > IOThread *mon_iothread; > /* Bottom half to dispatch the requests received from IO thread */ > QEMUBH *qmp_dispatcher_bh; > + /* Bottom half to deliver the responses back to clients */ > + QEMUBH *qmp_respond_bh; > } mon_global; > > /* QMP checker flags */ > @@ -416,7 +421,8 @@ int monitor_fprintf(FILE *stream, const char *fmt, ...) > return 0; > } > > -static void monitor_json_emitter(Monitor *mon, const QObject *data) > +static void monitor_json_emitter_raw(Monitor *mon, > + QObject *data) > { > QString *json; > > @@ -430,6 +436,71 @@ static void monitor_json_emitter(Monitor *mon, const > QObject *data) > QDECREF(json); > } > > +static void monitor_json_emitter(Monitor *mon, QObject *data) > +{ > + if (mon->use_io_thr) { > + /* > + * If using IO thread, we need to queue the item so that IO > + * thread will do the rest for us. Take refcount so that > + * caller won't free the data (which will be finally freed in > + * responder thread). 
> + */ > + qobject_incref(data); > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); > + g_queue_push_tail(mon->qmp.qmp_responses, (void *)data); > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); > + qemu_bh_schedule(mon_global.qmp_respond_bh); > + } else { > + /* > + * If not using monitor IO thread, then we are in main thread. > + * Do the emission right away. > + */ > + monitor_json_emitter_raw(mon, data); > + } > +} > + > +struct QMPResponse { > + Monitor *mon; > + QObject *data; > +}; > +typedef struct QMPResponse QMPResponse; > + > +/* > + * Return one QMPResponse. The response is only valid if > + * response.data is not NULL. > + */ > +static QMPResponse monitor_qmp_response_pop_one(void) > +{ > + Monitor *mon; > + QObject *data = NULL; > + > + qemu_mutex_lock(&monitor_lock); > + QTAILQ_FOREACH(mon, &mon_list, entry) { > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock); > + data = g_queue_pop_head(mon->qmp.qmp_responses); > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); > + if (data) { > + break; > + } > + } > + qemu_mutex_unlock(&monitor_lock); > + return (QMPResponse) { .mon = mon, .data = data }; > +} > + > +static void monitor_qmp_bh_responder(void *opaque) > +{ > + QMPResponse response; > + > + while (true) { > + response = monitor_qmp_response_pop_one(); > + if (!response.data) { > + break; > + } > + monitor_json_emitter_raw(response.mon, response.data); > + qobject_decref(response.data); > + } > +} > + > static MonitorQAPIEventConf monitor_qapi_event_conf[QAPI_EVENT__MAX] = { > /* Limit guest-triggerable events to 1 per second */ > [QAPI_EVENT_RTC_CHANGE] = { 1000 * SCALE_MS }, > @@ -616,6 +687,7 @@ static void monitor_data_init(Monitor *mon, bool > skip_flush, > mon->skip_flush = skip_flush; > mon->use_io_thr = use_io_thr; > mon->qmp.qmp_requests = g_queue_new(); > + mon->qmp.qmp_responses = g_queue_new(); > } > > static void monitor_data_destroy(Monitor *mon) > @@ -630,6 +702,7 @@ static void monitor_data_destroy(Monitor *mon) > 
qemu_mutex_destroy(&mon->out_lock); > qemu_mutex_destroy(&mon->qmp.qmp_queue_lock); > g_queue_free(mon->qmp.qmp_requests); > + g_queue_free(mon->qmp.qmp_responses); > } > > char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index, > @@ -4367,6 +4440,15 @@ static void monitor_iothread_init(void) > mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(), > monitor_qmp_bh_dispatcher, > NULL); > + > + /* > + * Unlike the dispatcher BH, this must be run on the monitor IO > + * thread, so that monitors that are using IO thread will make > + * sure read/write operations are all done on the IO thread. > + */ > + mon_global.qmp_respond_bh = aio_bh_new(monitor_get_aio_context(), > + monitor_qmp_bh_responder, > + NULL); > } > > void monitor_init_globals(void) > @@ -4505,9 +4587,19 @@ void monitor_cleanup(void) > */ > iothread_stop(mon_global.mon_iothread); > > + /* > + * After we have IOThread to send responses, it's possible that > + * when we stop the IOThread there are still replies queued in the > + * responder queue. Flush all of them. Note that even after this > + * flush it's still possible that out buffer is not flushed. > + * It'll be done in below monitor_flush() as the last resort. > + */ > + monitor_qmp_bh_responder(NULL); > + > qemu_mutex_lock(&monitor_lock); > QTAILQ_FOREACH_SAFE(mon, &mon_list, entry, next) { > QTAILQ_REMOVE(&mon_list, mon, entry); > + monitor_flush(mon); > monitor_data_destroy(mon); > g_free(mon); > } > @@ -4516,6 +4608,8 @@ void monitor_cleanup(void) > /* QEMUBHs needs to be deleted before destroying the IOThread. */ > qemu_bh_delete(mon_global.qmp_dispatcher_bh); > mon_global.qmp_dispatcher_bh = NULL; > + qemu_bh_delete(mon_global.qmp_respond_bh); > + mon_global.qmp_respond_bh = NULL; > > iothread_destroy(mon_global.mon_iothread); > mon_global.mon_iothread = NULL; > -- > 2.14.3 > > -- Marc-André Lureau