Common code for sending/recieving RPCs via http over virtproxy channel. All communication is done via asynchronous read/write handlers and using non-blocking reads/writes
Signed-off-by: Michael Roth <mdr...@linux.vnet.ibm.com> --- virtagent-common.c | 431 ++++++++++++++++++++++++++++++++++++++++++++++++++++ virtagent-common.h | 73 +++++++++ 2 files changed, 504 insertions(+), 0 deletions(-) create mode 100644 virtagent-common.c create mode 100644 virtagent-common.h diff --git a/virtagent-common.c b/virtagent-common.c new file mode 100644 index 0000000..cc58938 --- /dev/null +++ b/virtagent-common.c @@ -0,0 +1,431 @@ +/* + * virt-agent - common host/guest RPC functions + * + * Copyright IBM Corp. 2010 + * + * Authors: + * Adam Litke <agli...@linux.vnet.ibm.com> + * Michael Roth <mdr...@linux.vnet.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include "virtagent-common.h" + +#define VA_READ true +#define VA_SEND false + +enum va_rpc_type { + VA_RPC_REQUEST, + VA_RPC_RESPONSE, +}; + +typedef struct VARPCState { + char hdr[VA_HDR_LEN_MAX]; + int fd; + size_t hdr_len; + size_t hdr_pos; + enum { + VA_READ_START, + VA_READ_HDR, + VA_READ_BODY, + VA_SEND_START, + VA_SEND_HDR, + VA_SEND_BODY, + } state; + enum va_rpc_type rpc_type; + char *content; + size_t content_len; + size_t content_pos; + VARPCData *data; +} VARPCState; + +static void va_rpc_read_handler(void *opaque); +static void va_rpc_send_handler(void *opaque); + +static int end_of_header(char *buf, int end_pos) +{ + return !strncmp(buf+(end_pos-2), "\n\r\n", 3); +} + +static void va_rpc_hdr_init(VARPCState *s) { + const char *preamble; + + TRACE("called"); + /* essentially ignored in the context of virtagent, but might as well */ + if (s->rpc_type == VA_RPC_REQUEST) { + preamble = "POST /RPC2 HTTP/1.1"; + } else if (s->rpc_type == VA_RPC_RESPONSE) { + preamble = "HTTP/1.1 200 OK"; + } else { + s->hdr_len = 0; + return; + } + + s->hdr_len = sprintf(s->hdr, + "%s" EOL + "Content-Type: text/xml" EOL + "Content-Length: %u" EOL EOL, + preamble, + (uint32_t)s->content_len); +} + +static void va_rpc_parse_hdr(VARPCState *s) +{ + int i, line_pos = 0; + char line_buf[4096]; + + for (i = 0; i < VA_HDR_LEN_MAX; ++i) { + if (s->hdr[i] != '\n') { + /* read line */ + line_buf[line_pos++] = s->hdr[i]; + } else { + /* process line */ + if (strncmp(line_buf, "Content-Length: ", 16) == 0) { + s->content_len = atoi(&line_buf[16]); + return; + } + line_pos = 0; + } + } +} + +static VARPCState *va_rpc_state_new(VARPCData *data, int fd, + enum va_rpc_type rpc_type, bool read) +{ + VARPCState *s = qemu_mallocz(sizeof(VARPCState)); + + s->rpc_type = rpc_type; + s->fd = fd; + s->data = data; + if (s->data == NULL) { + goto EXIT_BAD; + } + + if (read) { + s->state = VA_READ_START; + s->content = NULL; + } else { + s->state = VA_SEND_START; + if (rpc_type == VA_RPC_REQUEST) { + s->content = XMLRPC_MEMBLOCK_CONTENTS(char, s->data->send_req_xml); + s->content_len = XMLRPC_MEMBLOCK_SIZE(char, s->data->send_req_xml); + } else if (rpc_type == VA_RPC_RESPONSE) { + s->content = XMLRPC_MEMBLOCK_CONTENTS(char, s->data->send_resp_xml); + s->content_len = XMLRPC_MEMBLOCK_SIZE(char, s->data->send_resp_xml); + } else { + LOG("unknown rcp type"); + goto EXIT_BAD; + } + va_rpc_hdr_init(s); + if (s->hdr_len == 0) { + LOG("failed to initialize http header"); + goto EXIT_BAD; + } + } + + return s; +EXIT_BAD: + qemu_free(s); + return NULL; +} + +/* called by va_rpc_read_handler after reading requests */ +static int va_rpc_send_response(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_RESPONSE, VA_SEND); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up send handler for RPC request"); + vp_set_fd_handler(fd, NULL, va_rpc_send_handler, s); + + return 0; +} + +static void va_rpc_read_handler_completion(VARPCState *s) { + int ret; + + if (s->rpc_type == VA_RPC_REQUEST) { + /* server read request, call it's cb function then set up + * a send handler for the rpc response if there weren't any + * communication errors + */ + s->data->cb(s->data); + if (s->data->status == VA_RPC_STATUS_OK) { + ret = va_rpc_send_response(s->data, s->fd); + if (ret != 0) { + LOG("error setting up send handler for rpc response"); + } + } else { + LOG("error reading rpc request, skipping response"); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } + } else if (s->rpc_type == VA_RPC_RESPONSE) { + /* client read response, call it's cb function and complete + * the RPC + */ + s->data->cb(s->data); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } else { + LOG("unknown rpc_type"); + } + if (s->content != NULL) { + qemu_free(s->content); + } + qemu_free(s); +} + +static void va_rpc_read_handler(void *opaque) +{ + VARPCState *s = opaque; + int ret; + + TRACE("called with opaque: %p", opaque); + + switch (s->state) { + case VA_READ_START: + s->state = VA_READ_HDR; + case VA_READ_HDR: + while((ret = read(s->fd, s->hdr + s->hdr_pos, 1)) > 0 + && s->hdr_pos < VA_HDR_LEN_MAX) { + s->hdr_pos += ret; + if (end_of_header(s->hdr, s->hdr_pos - 1)) { + break; + } + } + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connected closed unexpectedly"); + goto out_bad; + } else if (s->hdr_pos >= VA_HDR_LEN_MAX) { + LOG("http header too long"); + goto out_bad; + } else { + s->content_len = -1; + va_rpc_parse_hdr(s); + if (s->content_len == -1) { + LOG("malformed http header"); + goto out_bad; + } else if (s->content_len > VA_CONTENT_LEN_MAX) { + LOG("http content length too long"); + goto out_bad; + } + s->content = qemu_mallocz(s->content_len); + s->state = VA_READ_BODY; + } + case VA_READ_BODY: + while(s->content_pos < s->content_len) { + ret = read(s->fd, s->content + s->content_pos, + s->content_len - s->content_pos); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connection closed unexpectedly:" + " read %u bytes, expected %u bytes", + (unsigned int)s->content_pos, (unsigned int)s->content_len); + goto out_bad; + } + s->content_pos += ret; + } + + if (s->rpc_type == VA_RPC_REQUEST) { + s->data->req_xml = s->content; + s->data->req_xml_len = s->content_len; + } else if (s->rpc_type == VA_RPC_RESPONSE) { + s->data->resp_xml = s->content; + s->data->resp_xml_len = s->content_len; + } + s->data->status = VA_RPC_STATUS_OK; + goto out; + default: + LOG("unknown state"); + goto out_bad; + } + +out_bad: + s->data->status = VA_RPC_STATUS_ERR; +out: + va_rpc_read_handler_completion(s); +} + +/* called by va_rpc_send_handler after sending requests */ +static int va_rpc_read_response(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_RESPONSE, VA_READ); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up send handler for RPC request"); + vp_set_fd_handler(fd, NULL, va_rpc_read_handler, s); + + return 0; +} + +static void va_rpc_send_handler_completion(VARPCState *s) { + int ret; + + if (s->rpc_type == VA_RPC_REQUEST) { + /* client sent request. free request's memblock, and set up read + * handler for server response if there weren't any communication + * errors + */ + XMLRPC_MEMBLOCK_FREE(char, s->data->send_req_xml); + if (s->data->status == VA_RPC_STATUS_OK) { + ret = va_rpc_read_response(s->data, s->fd); + if (ret != 0) { + LOG("error setting up read handler for rpc response"); + } + } else { + s->data->cb(s->data); + LOG("error sending rpc request, skipping response"); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } + } else if (s->rpc_type == VA_RPC_RESPONSE) { + /* server sent response. call it's cb once more, then free + * response's memblock and complete the RPC + */ + s->data->cb(s->data); + XMLRPC_MEMBLOCK_FREE(char, s->data->send_resp_xml); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } else { + LOG("unknown rpc_type"); + } + qemu_free(s); +} + +static void va_rpc_send_handler(void *opaque) +{ + VARPCState *s = opaque; + int ret; + + TRACE("called with opaque: %p", opaque); + + switch (s->state) { + case VA_SEND_START: + s->state = VA_SEND_HDR; + case VA_SEND_HDR: + do { + ret = write(s->fd, s->hdr + s->hdr_pos, s->hdr_len - s->hdr_pos); + if (ret <= 0) { + break; + } + s->hdr_pos += ret; + } while (s->hdr_pos < s->hdr_len); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connected closed unexpectedly"); + goto out_bad; + } else { + s->state = VA_SEND_BODY; + } + case VA_SEND_BODY: + do { + ret = write(s->fd, s->content + s->content_pos, + s->content_len - s->content_pos); + if (ret <= 0) { + break; + } + s->content_pos += ret; + } while (s->content_pos < s->content_len); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connected closed unexpectedly"); + goto out_bad; + } else { + s->data->status = VA_RPC_STATUS_OK; + goto out; + } + default: + LOG("unknown state"); + goto out_bad; + } + +out_bad: + s->data->status = VA_RPC_STATUS_ERR; +out: + va_rpc_send_handler_completion(s); +} + +/* called by rpc client + * one callback to data->cb after response is read. + * data and data->send_req_xml should be allocated by caller, + * callee will de-allocate these after calling data->cb(data) + * + * if non-zero returned however, caller should free data and hanging refs + */ +int va_rpc_send_request(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_REQUEST, VA_SEND); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up send handler for RPC request"); + vp_set_fd_handler(fd, NULL, va_rpc_send_handler, s); + + return 0; +} + +/* called by rpc server + * one callback to current data->cb after read, one callback after send. + * data should be allocated by caller, data->send_resp_xml should be + * allocated by first data->cb(data) callback, "callee" will de-allocate + * data and data->send_resp_xml after sending rpc response + * + * if non-zero returned however, caller should free data and hanging refs + */ +int va_rpc_read_request(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_REQUEST, VA_READ); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up read handler for RPC request"); + vp_set_fd_handler(fd, va_rpc_read_handler, NULL, s); + return 0; +} diff --git a/virtagent-common.h b/virtagent-common.h new file mode 100644 index 0000000..6a58bcd --- /dev/null +++ b/virtagent-common.h @@ -0,0 +1,73 @@ +/* + * virt-agent - host/guest RPC client functions + * + * Copyright IBM Corp. 2010 + * + * Authors: + * Adam Litke <agli...@linux.vnet.ibm.com> + * Michael Roth <mdr...@linux.vnet.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ +#ifndef VIRTAGENT_COMMON_H +#define VIRTAGENT_COMMON_H + +#include <xmlrpc-c/base.h> +#include <xmlrpc-c/client.h> +#include <xmlrpc-c/server.h> +#include "qemu-common.h" +#include "qemu_socket.h" +#include "monitor.h" +#include "virtproxy.h" + +#define DEBUG_VA + +#ifdef DEBUG_VA +#define TRACE(msg, ...) do { \ + fprintf(stderr, "%s:%s():L%d: " msg "\n", \ + __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \ +} while(0) +#else +#define TRACE(msg, ...) \ + do { } while (0) +#endif + +#define LOG(msg, ...) do { \ + fprintf(stderr, "%s:%s(): " msg "\n", \ + __FILE__, __FUNCTION__, ## __VA_ARGS__); \ +} while(0) + +#define TADDR "127.0.0.1:8080" +#define URL "http://localhost:8080/RPC2" +#define NAME "QEMU virt-agent host client" +#define VERSION "1.0" +#define EOL "\r\n" + +#define VA_RPC_STATUS_OK 0 +#define VA_RPC_STATUS_ERR 1 +#define VA_HDR_LEN_MAX 4096 /* http header limit */ +#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */ + +typedef void (VARPCDataCallback)(void *rpc_data); +typedef struct VARPCData { + VARPCDataCallback *cb; + int status; + void *opaque; + /* provided/allocated by caller for sending as memblocks */ + xmlrpc_mem_block *send_req_xml; + xmlrpc_mem_block *send_resp_xml; + /* recieved, and passed to cb func, as char arrays */ + char *req_xml; + int req_xml_len; + char *resp_xml; + int resp_xml_len; + /* for use by QMP functions */ + MonitorCompletion *mon_cb; + void *mon_data; +} VARPCData; + +int va_rpc_send_request(VARPCData *data, int fd); +int va_rpc_read_request(VARPCData *data, int fd); +#endif /* VIRTAGENT_COMMON_H */ -- 1.7.0.4