This will replace the current QMP server once all the functions are implemented.
Signed-off-by: Anthony Liguori <aligu...@us.ibm.com>
---
v1 -> v2
 - support for get_fd
 - support for async commands
 - free request object on error path

diff --git a/qmp-core.c b/qmp-core.c
index 7f60942..22b413b 100644
--- a/qmp-core.c
+++ b/qmp-core.c
@@ -47,8 +47,18 @@ struct QmpState
     int (*add_connection)(QmpState *s, QmpConnection *conn);
     void (*del_connection)(QmpState *s, int global_handle, Error **errp);
     void (*event)(QmpState *s, QObject *data);
+    int (*get_fd)(QmpState *s);
 };
 
+typedef struct QmpSession
+{
+    JSONMessageParser parser;
+    QmpState state;
+    CharDriverState *chr;
+    int max_global_handle;
+    QTAILQ_HEAD(, QmpConnection) connections;
+} QmpSession;
+
 static QTAILQ_HEAD(, QmpCommand) qmp_commands =
     QTAILQ_HEAD_INITIALIZER(qmp_commands);
 
@@ -128,11 +138,16 @@ void qmp_state_add_connection(QmpState *sess, const char *event_name, QmpSignal
     conn->global_handle = sess->add_connection(sess, conn);
 }
 
-void qmp_put_event(QmpState *sess, int global_handle, Error **errp)
+void qmp_put_event(QmpState *sess, int64_t global_handle, Error **errp)
 {
     sess->del_connection(sess, global_handle, errp);
 }
 
+int qmp_state_get_fd(QmpState *sess)
+{
+    return sess->get_fd(sess);
+}
+
 void qmp_state_event(QmpConnection *conn, QObject *data)
 {
     QDict *event = qdict_new();
@@ -205,3 +220,230 @@ void qmp_signal_disconnect(QmpSignal *obj, int handle)
         }
     }
 }
+
+static QObject *qmp_dispatch_err(QmpState *state, QList *tokens, Error **errp)
+{
+    const char *command;
+    QDict *args, *dict;
+    QObject *request;
+    QmpCommand *cmd;
+    QObject *ret = NULL;
+    Error *err = NULL;
+
+    request = json_parser_parse_err(tokens, NULL, &err);
+    if (request == NULL) {
+        if (err == NULL) {
+            error_set(errp, QERR_JSON_PARSE_ERROR, "no valid JSON object");
+        } else {
+            error_propagate(errp, err);
+        }
+        goto out;
+    }
+    if (qobject_type(request) != QTYPE_QDICT) {
+        error_set(errp, QERR_JSON_PARSE_ERROR, "request is not a dictionary");
+        goto out;
+    }
+
+    dict = qobject_to_qdict(request);
+    if (!qdict_haskey(dict, "execute")) {
+        error_set(errp, QERR_JSON_PARSE_ERROR, "no execute key");
+        goto out;
+    }
+
+    command = qdict_get_str(dict, "execute");
+    cmd = qmp_find_command(command);
+    if (cmd == NULL) {
+        error_set(errp, QERR_COMMAND_NOT_FOUND, command);
+        goto out;
+    }
+
+    if (!qdict_haskey(dict, "arguments")) {
+        args = qdict_new();
+    } else {
+        args = qdict_get_qdict(dict, "arguments");
+        QINCREF(args);
+    }
+
+    switch (cmd->type) {
+    case QCT_NORMAL:
+        cmd->fn(args, &ret, errp);
+        if (ret == NULL) {
+            ret = QOBJECT(qdict_new());
+        }
+        break;
+    case QCT_STATEFUL:
+        cmd->sfn(state, args, &ret, errp);
+        if (ret == NULL) {
+            ret = QOBJECT(qdict_new());
+        }
+        break;
+    case QCT_ASYNC: {
+        QmpCommandState *s = qemu_mallocz(sizeof(*s));
+        // FIXME save async commands and do something
+        // smart if disconnect occurs before completion
+        s->state = state;
+        s->tag = NULL;
+        if (qdict_haskey(dict, "tag")) {
+            s->tag = qdict_get(dict, "tag");
+            qobject_incref(s->tag);
+        }
+        cmd->afn(args, errp, s);
+        ret = NULL;
+    }
+        break;
+    }
+
+    QDECREF(args);
+
+out:
+    qobject_decref(request);
+
+    return ret;
+}
+
+static QObject *qmp_dispatch(QmpState *state, QList *tokens)
+{
+    Error *err = NULL;
+    QObject *ret;
+    QDict *rsp;
+
+    ret = qmp_dispatch_err(state, tokens, &err);
+
+    rsp = qdict_new();
+    if (err) {
+        qdict_put_obj(rsp, "error", error_get_qobject(err));
+        error_free(err);
+    } else if (ret) {
+        qdict_put_obj(rsp, "return", ret);
+    } else {
+        QDECREF(rsp);
+        return NULL;
+    }
+
+    return QOBJECT(rsp);
+}
+
+static void qmp_chr_parse(JSONMessageParser *parser, QList *tokens)
+{
+    QmpSession *s = container_of(parser, QmpSession, parser);
+    QObject *rsp;
+    QString *str;
+
+    rsp = qmp_dispatch(&s->state, tokens);
+
+    if (rsp) {
+        str = qobject_to_json(rsp);
+        qemu_chr_write(s->chr, (void *)str->string, str->length);
+        qemu_chr_write(s->chr, (void *)"\n", 1);
+
+        QDECREF(str);
+        qobject_decref(rsp);
+    }
+}
+
+static int qmp_chr_can_receive(void *opaque)
+{
+    return 1024;
+}
+
+static void qmp_chr_receive(void *opaque, const uint8_t *buf, int size)
+{
+    QmpSession *s = opaque;
+    json_message_parser_feed(&s->parser, (char *)buf, size);
+}
+
+static void qmp_chr_send_greeting(QmpSession *s)
+{
+    VersionInfo *info;
+    QObject *vers;
+    QObject *greeting;
+    QString *str;
+
+    info = qmp_query_version(NULL);
+    vers = qmp_marshal_type_VersionInfo(info);
+    qmp_free_version_info(info);
+
+    greeting = qobject_from_jsonf("{'QMP': {'version': %p, 'capabilities': []} }",
+                                  vers);
+    str = qobject_to_json(greeting);
+    qobject_decref(greeting);
+
+    qemu_chr_write(s->chr, (void *)str->string, str->length);
+    qemu_chr_write(s->chr, (void *)"\n", 1);
+    QDECREF(str);
+}
+
+static void qmp_chr_event(void *opaque, int event)
+{
+    QmpSession *s = opaque;
+    switch (event) {
+    case CHR_EVENT_OPENED:
+        // FIXME disconnect any connected signals including defaults
+        json_message_parser_init(&s->parser, qmp_chr_parse);
+        qmp_chr_send_greeting(s);
+        break;
+    case CHR_EVENT_CLOSED:
+        json_message_parser_flush(&s->parser);
+        break;
+    }
+}
+
+static int qmp_chr_add_connection(QmpState *state, QmpConnection *conn)
+{
+    QmpSession *s = container_of(state, QmpSession, state);
+
+    QTAILQ_INSERT_TAIL(&s->connections, conn, node);
+    return ++s->max_global_handle;
+}
+
+static void qmp_chr_send_event(QmpState *state, QObject *event)
+{
+    QmpSession *s = container_of(state, QmpSession, state);
+    QString *str;
+
+    str = qobject_to_json(event);
+    qemu_chr_write(s->chr, (void *)str->string, str->length);
+    qemu_chr_write(s->chr, (void *)"\n", 1);
+    QDECREF(str);
+}
+
+static void qmp_chr_del_connection(QmpState *state, int global_handle, Error **errp)
+{
+    QmpSession *s = container_of(state, QmpSession, state);
+    QmpConnection *conn;
+
+    QTAILQ_FOREACH(conn, &s->connections, node) {
+        if (conn->global_handle == global_handle) {
+            qmp_signal_disconnect(conn->signal, conn->handle);
+            QTAILQ_REMOVE(&s->connections, conn, node);
+            qemu_free(conn);
+            return;
+        }
+    }
+
+    error_set(errp, QERR_INVALID_PARAMETER_VALUE, "tag", "valid event handle");
+}
+
+static int qmp_chr_get_fd(QmpState *state)
+{
+    QmpSession *s = container_of(state, QmpSession, state);
+
+    return qemu_chr_get_msgfd(s->chr);
+}
+
+void qmp_init_chardev(CharDriverState *chr)
+{
+    QmpSession *s = qemu_mallocz(sizeof(*s));
+
+    s->chr = chr;
+    s->state.add_connection = qmp_chr_add_connection;
+    s->state.event = qmp_chr_send_event;
+    s->state.del_connection = qmp_chr_del_connection;
+    s->state.get_fd = qmp_chr_get_fd;
+
+    s->max_global_handle = 0;
+    QTAILQ_INIT(&s->connections);
+
+    qemu_chr_add_handlers(chr, qmp_chr_can_receive, qmp_chr_receive,
+                          qmp_chr_event, s);
+}
diff --git a/qmp-core.h b/qmp-core.h
index c9c8b63..837ca07 100644
--- a/qmp-core.h
+++ b/qmp-core.h
@@ -60,9 +60,10 @@ int qmp_signal_connect(QmpSignal *obj, void *func, void *opaque);
 void qmp_signal_disconnect(QmpSignal *obj, int handle);
 void qmp_state_add_connection(QmpState *sess, const char *name, QmpSignal *obj,
                               int handle, QmpConnection *conn);
-void qmp_put_event(QmpState *sess, int global_handle, Error **errp);
 void qmp_state_event(QmpConnection *conn, QObject *data);
 
+int qmp_state_get_fd(QmpState *sess);
+
 #define signal_init(obj) do { \
     (obj)->signal = qmp_signal_init(); \
 } while (0)
@@ -83,4 +84,6 @@ void qmp_state_event(QmpConnection *conn, QObject *data);
     } \
 } while(0)
 
+void qmp_init_chardev(CharDriverState *chr);
+
 #endif
-- 
1.7.0.4