* Daniel P. Berrange (berra...@redhat.com) wrote: > Add a QIOChannel subclass that can run the websocket protocol over > the top of another QIOChannel instance. This initial implementation > is only capable of acting as a websockets server. There is no support > for acting as a websockets client yet. > > Signed-off-by: Daniel P. Berrange <berra...@redhat.com> > --- > include/io/channel-websock.h | 108 +++++ > io/Makefile.objs | 1 + > io/channel-websock.c | 965 > +++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 1074 insertions(+) > create mode 100644 include/io/channel-websock.h > create mode 100644 io/channel-websock.c > > diff --git a/include/io/channel-websock.h b/include/io/channel-websock.h > new file mode 100644 > index 0000000..8e69d86 > --- /dev/null > +++ b/include/io/channel-websock.h > @@ -0,0 +1,108 @@ > +/* > + * QEMU I/O channels driver websockets > + * > + * Copyright (c) 2015 Red Hat, Inc. > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, see > <http://www.gnu.org/licenses/>. 
> + * > + */ > + > +#ifndef QIO_CHANNEL_WEBSOCK_H__ > +#define QIO_CHANNEL_WEBSOCK_H__ > + > +#include "io/channel.h" > +#include "io/buffer.h" > +#include "io/task.h" > + > +#define TYPE_QIO_CHANNEL_WEBSOCK "qio-channel-websock" > +#define QIO_CHANNEL_WEBSOCK(obj) \ > + OBJECT_CHECK(QIOChannelWebsock, (obj), TYPE_QIO_CHANNEL_WEBSOCK) > + > +typedef struct QIOChannelWebsock QIOChannelWebsock; > +typedef union QIOChannelWebsockMask QIOChannelWebsockMask; > + > +union QIOChannelWebsockMask { > + char c[4]; > + uint32_t u; > +}; > + > +/** > + * QIOChannelWebsock > + * > + * The QIOChannelWebsock class provides a channel wrapper which > + * can transparently run the HTTP websockets protocol. This is > + * usually used over a TCP socket, but there is actually no > + * technical restriction on which type of master channel is > + * used as the transport. > + * > + * This channel object is currently only capable of running as > + * a websocket server and is a pretty crude implementation > + * of it, not supporting the full websockets protocol feature > + * set. It is sufficient to use with a simple websockets > + * client for encapsulating VNC for noVNC in-browser client. > + */ > + > +struct QIOChannelWebsock { > + QIOChannel parent; > + QIOChannel *master; > + QIOBuffer encinput; > + QIOBuffer encoutput; > + QIOBuffer rawinput; > + QIOBuffer rawoutput; > + size_t payload_remain; > + QIOChannelWebsockMask mask; > + guint io_tag; > + Error *io_err; > + gboolean io_eof; > +}; > + > +/** > + * qio_channel_websock_new_server: > + * @master: the underlying channel object > + * > + * Create a new websockets channel that runs the server > + * side of the protocol. > + * > + * After creating the channel, it is mandatory to call > + * the qio_channel_websock_handshake() method before attempting > + * todo any I/O on the channel. 
> + * > + * Once the handshake has completed, all I/O should be done > + * via the new websocket channel object and not the original > + * master channel > + * > + * Returns: the new websockets channel object > + */ > +QIOChannelWebsock * > +qio_channel_websock_new_server(QIOChannel *master); > + > +/** > + * qio_channel_websock_handshake: > + * @ioc: the websocket channel object > + * @func: the callback to invoke when completed > + * @opaque: opaque data to pass to @func > + * @destroy: optional callback to free @opaque > + * > + * Perform the websocket handshake. This method > + * will return immediately and the handshake will > + * continue in the background, provided the main > + * loop is running. When the handshake is complete, > + * or fails, the @func callback will be invoked. > + */ > +void qio_channel_websock_handshake(QIOChannelWebsock *ioc, > + QIOTaskFunc func, > + gpointer opaque, > + GDestroyNotify destroy); > + > +#endif /* QIO_CHANNEL_WEBSOCK_H__ */ > diff --git a/io/Makefile.objs b/io/Makefile.objs > index 2b33d3c..9f93087 100644 > --- a/io/Makefile.objs > +++ b/io/Makefile.objs > @@ -5,3 +5,4 @@ io-obj-y += channel-watch.o > io-obj-y += channel-socket.o > io-obj-y += channel-file.o > io-obj-y += channel-tls.o > +io-obj-y += channel-websock.o > diff --git a/io/channel-websock.c b/io/channel-websock.c > new file mode 100644 > index 0000000..0345b90 > --- /dev/null > +++ b/io/channel-websock.c > @@ -0,0 +1,965 @@ > +/* > + * QEMU I/O channels driver websockets > + * > + * Copyright (c) 2015 Red Hat, Inc. > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2 of the License, or (at your option) any later version. 
> + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, see > <http://www.gnu.org/licenses/>. > + * > + */ > + > +#include "io/channel-websock.h" > +#include "crypto/hash.h" > + > +#include <glib/gi18n.h> > + > +/* #define DEBUG_IOC */ > + > +#ifdef DEBUG_IOC > +#define DPRINTF(fmt, ...) \ > + do { fprintf(stderr, "channel-websock: " fmt, ## __VA_ARGS__); } while > (0) > +#else > +#define DPRINTF(fmt, ...) \ > + do { } while (0) > +#endif > + > +/* Max amount to allow in rawinput/rawoutput buffers */ > +#define QIO_CHANNEL_WEBSOCK_MAX_BUFFER 8192 > + > +#define B64LEN(__x) (((__x + 2) / 3) * 12 / 3)
Where is that magic calculation used? > +#define SHA1_DIGEST_LEN 20 > + > +#define QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN 24 > +#define QIO_CHANNEL_WEBSOCK_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" > +#define QIO_CHANNEL_WEBSOCK_GUID_LEN strlen(QIO_CHANNEL_WEBSOCK_GUID) > + > +#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RESPONSE \ > + "HTTP/1.1 101 Switching Protocols\r\n" \ > + "Upgrade: websocket\r\n" \ > + "Connection: Upgrade\r\n" \ > + "Sec-WebSocket-Accept: %s\r\n" \ > + "Sec-WebSocket-Protocol: binary\r\n" \ > + "\r\n" > +#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM "\r\n" > +#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_END "\r\n\r\n" > +#define QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION "13" > + > +#define QIO_CHANNEL_WEBSOCK_HEAD_MIN_LEN sizeof(uint16_t) > +#define QIO_CHANNEL_WEBSOCK_HEAD_MAX_LEN \ > + (QIO_CHANNEL_WEBSOCK_HEAD_MIN_LEN + sizeof(uint64_t) + sizeof(uint32_t)) > + > +typedef struct QIOChannelWebsockHeader QIOChannelWebsockHeader; > + > +struct QEMU_PACKED QIOChannelWebsockHeader { > + unsigned char b0; > + unsigned char b1; > + union { > + struct QEMU_PACKED { > + uint16_t l16; > + QIOChannelWebsockMask m16; > + } s16; > + struct QEMU_PACKED { > + uint64_t l64; > + QIOChannelWebsockMask m64; > + } s64; > + QIOChannelWebsockMask m; > + } u; > +}; > + > +enum { > + QIO_CHANNEL_WEBSOCK_OPCODE_CONTINUATION = 0x0, > + QIO_CHANNEL_WEBSOCK_OPCODE_TEXT_FRAME = 0x1, > + QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME = 0x2, > + QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE = 0x8, > + QIO_CHANNEL_WEBSOCK_OPCODE_PING = 0x9, > + QIO_CHANNEL_WEBSOCK_OPCODE_PONG = 0xA > +}; > + > +static char *qio_channel_websock_handshake_entry(const char *handshake, > + size_t handshake_len, > + const char *name) > +{ > + char *begin, *end, *ret = NULL; > + char *line = g_strdup_printf("%s%s: ", > + QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM, > + name); > + begin = g_strstr_len(handshake, handshake_len, line); > + if (begin != NULL) { > + begin += strlen(line); > + end = g_strstr_len(begin, handshake_len - (begin - 
handshake), > + QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM); > + if (end != NULL) { > + ret = g_strndup(begin, end - begin); > + } > + } > + g_free(line); > + return ret; > +} > + > + > +static int qio_channel_websock_handshake_send_response(QIOChannelWebsock > *ioc, > + const char *key, > + Error **errp) > +{ > + char combined_key[QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN + > + QIO_CHANNEL_WEBSOCK_GUID_LEN + 1]; > + char *accept = NULL, *response = NULL; > + size_t responselen; > + > + g_strlcpy(combined_key, key, QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN + 1); > + g_strlcat(combined_key, QIO_CHANNEL_WEBSOCK_GUID, > + QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN + > + QIO_CHANNEL_WEBSOCK_GUID_LEN + 1); > + > + /* hash and encode it */ > + if (qcrypto_hash_base64(QCRYPTO_HASH_ALG_SHA1, > + combined_key, > + QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN + > + QIO_CHANNEL_WEBSOCK_GUID_LEN, > + &accept, > + errp) < 0) { > + return -1; > + } > + > + response = g_strdup_printf(QIO_CHANNEL_WEBSOCK_HANDSHAKE_RESPONSE, > accept); > + responselen = strlen(response); > + qio_buffer_reserve(&ioc->encoutput, responselen); > + qio_buffer_append(&ioc->encoutput, response, responselen); > + > + g_free(accept); > + g_free(response); > + > + return 0; > +} > + > +static int qio_channel_websock_handshake_process(QIOChannelWebsock *ioc, > + const char *line, > + size_t size, > + Error **errp) > +{ > + int ret = -1; > + char *protocols = qio_channel_websock_handshake_entry(line, size, > + "Sec-WebSocket-Protocol"); > + char *version = qio_channel_websock_handshake_entry(line, size, > + "Sec-WebSocket-Version"); > + char *key = qio_channel_websock_handshake_entry(line, size, > + "Sec-WebSocket-Key"); > + > + if (!protocols) { > + error_setg(errp, "%s", _("Missing websocket protocol header data")); > + goto cleanup; > + } > + > + if (!version) { > + error_setg(errp, "%s", _("Missing websocket version header data")); > + goto cleanup; > + } > + > + if (!key) { > + error_setg(errp, "%s", _("Missing websocket key header data")); > 
+ goto cleanup; > + } > + > + if (!g_strrstr(protocols, "binary")) { > + error_setg(errp, _("No 'binary' protocol is supported by client > '%s'"), > + protocols); > + goto cleanup; > + } > + > + if (!g_str_equal(version, QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION)) { > + error_setg(errp, _("Version '%s' is not supported by client '%s'"), > + QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION, version); > + goto cleanup; > + } > + > + if (strlen(key) != QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN) { > + error_setg(errp, _("Key length '%zu' was not as expected '%d'"), > + strlen(key), QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN); > + goto cleanup; > + } > + > + ret = qio_channel_websock_handshake_send_response(ioc, key, errp); > + > + cleanup: > + g_free(protocols); > + g_free(version); > + g_free(key); > + return ret; > +} > + > +static int qio_channel_websock_handshake_read(QIOChannelWebsock *ioc, > + Error **errp) > +{ > + char *handshake_end; > + ssize_t ret; > + /* Typical HTTP headers from novnc are 512 bytes, so limiting > + * total header size to 4096 is easily enough. 
*/ > + size_t want = 4096 - ioc->encinput.offset; > + qio_buffer_reserve(&ioc->encinput, want); > + ret = qio_channel_read(ioc->master, > + (char *)qio_buffer_end(&ioc->encinput), want, > errp); > + if (ret < 0) { > + return -1; > + } > + ioc->encinput.offset += ret; > + > + handshake_end = g_strstr_len((char *)ioc->encinput.buffer, > + ioc->encinput.offset, > + QIO_CHANNEL_WEBSOCK_HANDSHAKE_END); > + if (!handshake_end) { > + if (ioc->encinput.offset >= 4096) { > + error_setg(errp, "%s", > + _("End of headers not found in first 4096 bytes")); > + return -1; > + } else { > + return 0; > + } > + } > + > + if (qio_channel_websock_handshake_process(ioc, > + (char *)ioc->encinput.buffer, > + ioc->encinput.offset, > + errp) < 0) { > + return -1; > + } > + > + qio_buffer_advance(&ioc->encinput, > + handshake_end - (char *)ioc->encinput.buffer + > + strlen(QIO_CHANNEL_WEBSOCK_HANDSHAKE_END)); > + return 1; Can you add a comment documenting the return values of this function? I guess -1 is an error and 1 is success, but what does 0 mean?
> +static gboolean qio_channel_websock_handshake_send(QIOChannel *ioc, > + GIOCondition condition, > + gpointer user_data) > +{ > + QIOTask *task = user_data; > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK( > + qio_task_get_source(task)); > + Error *err = NULL; > + ssize_t ret; > + > + DPRINTF("Sending websock handshake reply %p\n", wioc); > + ret = qio_channel_write(wioc->master, > + (char *)wioc->encoutput.buffer, > + wioc->encoutput.offset, > + &err); > + > + if (ret < 0) { > + qio_task_abort(task, err); > + DPRINTF("Error sending websock handshake reply %p: %s\n", > + wioc, error_get_pretty(err)); > + error_free(err); > + return FALSE; > + } > + > + qio_buffer_advance(&wioc->encoutput, ret); > + if (wioc->encoutput.offset == 0) { > + DPRINTF("Finished sending websock handshake %p\n", > + wioc); > + qio_task_complete(task); > + return FALSE; > + } > + return TRUE; > +} > + > +static gboolean qio_channel_websock_handshake_io(QIOChannel *ioc, > + GIOCondition condition, > + gpointer user_data) > +{ > + QIOTask *task = user_data; > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK( > + qio_task_get_source(task)); > + Error *err = NULL; > + int ret; > + > + DPRINTF("Reading websock handshake request %p\n", wioc); > + ret = qio_channel_websock_handshake_read(wioc, &err); > + if (ret < 0) { > + DPRINTF("Error reading websock handshake %s\n", > + error_get_pretty(err)); > + qio_task_abort(task, err); > + error_free(err); > + return FALSE; > + } > + if (ret == 0) { > + DPRINTF("Blocking on more request data\n"); > + /* need more data still */ > + return TRUE; > + } > + > + DPRINTF("Websock request complete, adding watch for reply %p\n", > + wioc); > + > + object_ref(OBJECT(task)); > + qio_channel_add_watch( > + wioc->master, > + G_IO_OUT, > + qio_channel_websock_handshake_send, > + task, > + (GDestroyNotify)object_unref); > + return FALSE; > +} > + > + > +static void qio_channel_websock_encode(QIOChannelWebsock *ioc) > +{ > + size_t header_size = 0; > + unsigned char 
opcode = QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME; > + union { > + char buf[QIO_CHANNEL_WEBSOCK_HEAD_MAX_LEN]; > + QIOChannelWebsockHeader ws; > + } header; > + > + DPRINTF("Encoding pending raw output %zu %p\n", > + ioc->rawoutput.offset, ioc); > + if (!ioc->rawoutput.offset) { > + return; > + } > + > + header.ws.b0 = 0x80 | (opcode & 0x0f); There are quite a few magic header sizes here, such as 125 (and I think I saw some other sizes below) - could they get some comments, or be turned into named constants? > + if (ioc->rawoutput.offset <= 125) { Dave > + header.ws.b1 = (uint8_t)ioc->rawoutput.offset; > + header_size = 2; > + } else if (ioc->rawoutput.offset < 65536) { > + header.ws.b1 = 0x7e; > + header.ws.u.s16.l16 = cpu_to_be16((uint16_t)ioc->rawoutput.offset); > + header_size = 4; > + } else { > + header.ws.b1 = 0x7f; > + header.ws.u.s64.l64 = cpu_to_be64(ioc->rawoutput.offset); > + header_size = 10; > + } > + > + qio_buffer_reserve(&ioc->encoutput, header_size + ioc->rawoutput.offset); > + qio_buffer_append(&ioc->encoutput, header.buf, header_size); > + qio_buffer_append(&ioc->encoutput, ioc->rawoutput.buffer, > + ioc->rawoutput.offset); > + qio_buffer_reset(&ioc->rawoutput); > + DPRINTF("Have %zu bytes encoded output %p\n", > + ioc->encoutput.offset, ioc); > +} > + > + > +static ssize_t qio_channel_websock_decode_header(QIOChannelWebsock *ioc, > + Error **errp) > +{ > + unsigned char opcode = 0, fin = 0, has_mask = 0; > + size_t header_size = 0; > + size_t payload_len; > + QIOChannelWebsockHeader *header = > + (QIOChannelWebsockHeader *)ioc->encinput.buffer; > + > + if (ioc->payload_remain) { > + error_setg(errp, > + _("Decoding header but %zu bytes of payload remain"), > + ioc->payload_remain); > + return -1; > + } > + if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_HEAD_MIN_LEN + 4) { > + /* header not complete */ > + return QIO_CHANNEL_ERR_BLOCK; > + } > + > + fin = (header->b0 & 0x80) >> 7; > + opcode = header->b0 & 0x0f; > + has_mask = (header->b1 & 0x80) >> 7; > + payload_len = header->b1 & 
0x7f; > + > + if (opcode == QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE) { > + /* disconnect */ > + return 0; > + } > + > + /* Websocket frame sanity check: > + * * Websocket fragmentation is not supported. > + * * All websockets frames sent by a client have to be masked. > + * * Only binary encoding is supported. > + */ > + if (!fin) { > + error_setg(errp, "%s", _("websocket fragmentation is not > supported")); > + return -1; > + } > + if (!has_mask) { > + error_setg(errp, "%s", _("websocket frames must be masked")); > + return -1; > + } > + if (opcode != QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME) { > + error_setg(errp, "%s", _("only binary websocket frames are > supported")); > + return -1; > + } > + > + if (payload_len < 126) { > + ioc->payload_remain = payload_len; > + header_size = 6; > + ioc->mask = header->u.m; > + } else if (payload_len == 126 && ioc->encinput.offset >= 8) { > + ioc->payload_remain = be16_to_cpu(header->u.s16.l16); > + header_size = 8; > + ioc->mask = header->u.s16.m16; > + } else if (payload_len == 127 && ioc->encinput.offset >= 14) { > + ioc->payload_remain = be64_to_cpu(header->u.s64.l64); > + header_size = 14; > + ioc->mask = header->u.s64.m64; > + } else { > + /* header not complete */ > + return QIO_CHANNEL_ERR_BLOCK; > + } > + > + qio_buffer_advance(&ioc->encinput, header_size); > + return 1; > +} > + > + > +static ssize_t qio_channel_websock_decode_payload(QIOChannelWebsock *ioc, > + Error **errp) > +{ > + size_t i; > + size_t payload_len; > + uint32_t *payload32; > + > + if (!ioc->payload_remain) { > + error_setg(errp, "%s", > + _("Decoding payload but no bytes of payload remain")); > + return -1; > + } > + > + /* If we aren't at the end of the payload, then drop > + * off the last bytes, so we're always multiple of 4 > + * for purpose of unmasking, except at end of payload > + */ > + if (ioc->encinput.offset < ioc->payload_remain) { > + payload_len = ioc->encinput.offset - (ioc->encinput.offset % 4); > + } else { > + payload_len = 
ioc->payload_remain; > + } > + if (payload_len == 0) { > + return QIO_CHANNEL_ERR_BLOCK; > + } > + > + ioc->payload_remain -= payload_len; > + > + /* unmask frame */ > + /* process 1 frame (32 bit op) */ > + payload32 = (uint32_t *)ioc->encinput.buffer; > + for (i = 0; i < payload_len / 4; i++) { > + payload32[i] ^= ioc->mask.u; > + } > + /* process the remaining bytes (if any) */ > + for (i *= 4; i < payload_len; i++) { > + ioc->encinput.buffer[i] ^= ioc->mask.c[i % 4]; > + } > + > + qio_buffer_reserve(&ioc->rawinput, payload_len); > + qio_buffer_append(&ioc->rawinput, ioc->encinput.buffer, payload_len); > + qio_buffer_advance(&ioc->encinput, payload_len); > + return payload_len; > +} > + > + > +QIOChannelWebsock * > +qio_channel_websock_new_server(QIOChannel *master) > +{ > + QIOChannelWebsock *wioc; > + > + wioc = QIO_CHANNEL_WEBSOCK(object_new(TYPE_QIO_CHANNEL_WEBSOCK)); > + > + wioc->master = master; > + object_ref(OBJECT(master)); > + > + return wioc; > +} > + > +void qio_channel_websock_handshake(QIOChannelWebsock *ioc, > + QIOTaskFunc func, > + gpointer opaque, > + GDestroyNotify destroy) > +{ > + QIOTask *task; > + > + task = qio_task_new(OBJECT(ioc), > + func, > + opaque, > + destroy); > + > + DPRINTF("Adding watch on master %p for websocket %p handshake\n", > + ioc->master, ioc); > + qio_channel_add_watch(ioc->master, > + G_IO_IN, > + qio_channel_websock_handshake_io, > + task, > + NULL); > +} > + > +static void qio_channel_websock_init(Object *obj G_GNUC_UNUSED) > +{ > +} > + > + > +static void qio_channel_websock_finalize(Object *obj) > +{ > + QIOChannelWebsock *ioc = QIO_CHANNEL_WEBSOCK(obj); > + > + qio_buffer_free(&ioc->encinput); > + qio_buffer_free(&ioc->encoutput); > + qio_buffer_free(&ioc->rawinput); > + qio_buffer_free(&ioc->rawoutput); > + object_unref(OBJECT(ioc->master)); > + if (ioc->io_tag) { > + g_source_remove(ioc->io_tag); > + } > + if (ioc->io_err) { > + error_free(ioc->io_err); > + } > +} > + > + > +static ssize_t 
qio_channel_websock_read_wire(QIOChannelWebsock *ioc, > + Error **errp) > +{ > + ssize_t ret; > + > + DPRINTF("Have %zu bytes %p\n", ioc->encoutput.offset, ioc); > + if (ioc->encinput.offset < 4096) { > + size_t want = 4096 - ioc->encinput.offset; > + > + qio_buffer_reserve(&ioc->encinput, want); > + ret = qio_channel_read(ioc->master, > + (char *)ioc->encinput.buffer + > + ioc->encinput.offset, > + want, > + errp); > + if (ret < 0) { > + return ret; > + } > + if (ret == 0 && > + ioc->encinput.offset == 0) { > + DPRINTF("EOF on wire & no more enc data availabl\n"); > + return 0; > + } > + ioc->encinput.offset += ret; > + DPRINTF("Now have %zu bytes enc input\n", ioc->encinput.offset); > + } > + > + if (ioc->payload_remain == 0) { > + DPRINTF("Looking to decode header\n"); > + ret = qio_channel_websock_decode_header(ioc, errp); > + if (ret < 0) { > + return ret; > + } > + if (ret == 0) { > + DPRINTF("EOF when decoding header\n"); > + return 0; > + } > + } > + DPRINTF("Looking to decode payload %zu\n", ioc->payload_remain); > + > + ret = qio_channel_websock_decode_payload(ioc, errp); > + if (ret < 0) { > + return ret; > + } > + DPRINTF("Now have %zu bytes raw input\n", ioc->rawinput.offset); > + return ret; > +} > + > + > +static ssize_t qio_channel_websock_write_wire(QIOChannelWebsock *ioc, > + Error **errp) > +{ > + ssize_t ret; > + ssize_t done = 0; > + qio_channel_websock_encode(ioc); > + > + DPRINTF("Writing %zu bytes %p\n", ioc->encoutput.offset, ioc); > + while (ioc->encoutput.offset > 0) { > + ret = qio_channel_write(ioc->master, > + (char *)ioc->encoutput.buffer, > + ioc->encoutput.offset, > + errp); > + if (ret < 0) { > + if (ret == QIO_CHANNEL_ERR_BLOCK && > + done > 0) { > + DPRINTF("Blocking but wrote %zu\n", done); > + return done; > + } else { > + DPRINTF("Error while writing %s\n", > + error_get_pretty(*errp)); > + return ret; > + } > + } > + qio_buffer_advance(&ioc->encoutput, ret); > + done += ret; > + } > + DPRINTF("Wrote %zu total\n", done); > + 
return done; > +} > + > + > +static void qio_channel_websock_flush_free(gpointer user_data) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(user_data); > + object_unref(OBJECT(wioc)); > +} > + > +static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc); > + > +static gboolean qio_channel_websock_flush(QIOChannel *ioc, > + GIOCondition condition, > + gpointer user_data) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(user_data); > + ssize_t ret; > + > + DPRINTF("Websock master flush %p %d\n", ioc, condition); > + if (condition & G_IO_OUT) { > + ret = qio_channel_websock_write_wire(wioc, &wioc->io_err); > + if (ret < 0) { > + goto cleanup; > + } > + } > + > + if (condition & G_IO_IN) { > + ret = qio_channel_websock_read_wire(wioc, &wioc->io_err); > + if (ret < 0) { > + goto cleanup; > + } > + if (ret == 0) { > + DPRINTF("Got EOF when reading %p\n", wioc); > + wioc->io_eof = TRUE; > + } > + } > + > + cleanup: > + qio_channel_websock_set_watch(wioc); > + return FALSE; > +} > + > + > +static void qio_channel_websock_unset_watch(QIOChannelWebsock *ioc) > +{ > + if (ioc->io_tag) { > + DPRINTF("Removing old master watch %u %p\n", ioc->io_tag, ioc); > + g_source_remove(ioc->io_tag); > + ioc->io_tag = 0; > + } > +} > + > +static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc) > +{ > + GIOCondition cond = 0; > + > + qio_channel_websock_unset_watch(ioc); > + > + if (ioc->io_err) { > + DPRINTF("Not adding master watch due to error %p %s\n", > + ioc, error_get_pretty(ioc->io_err)); > + return; > + } > + > + if (ioc->encoutput.offset) { > + cond |= G_IO_OUT; > + } > + if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER && > + !ioc->io_eof) { > + cond |= G_IO_IN; > + } > + > + DPRINTF("Cond %d output=%zu input=%zu eof=%d\n", > + cond, ioc->encoutput.offset, ioc->encinput.offset, ioc->io_eof); > + if (cond) { > + object_ref(OBJECT(ioc)); > + ioc->io_tag = > + qio_channel_add_watch(ioc->master, > + cond, > + qio_channel_websock_flush, > + ioc, 
> + qio_channel_websock_flush_free); > + } > +} > + > + > +static ssize_t qio_channel_websock_readv(QIOChannel *ioc, > + const struct iovec *iov, > + size_t niov, > + int **fds, > + size_t *nfds, > + Error **errp) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc); > + size_t i; > + ssize_t got = 0; > + ssize_t ret; > + DPRINTF("Read ioc %p %zu %p\n", iov, niov, ioc); > + if (fds || nfds) { > + error_setg(errp, "%s", > + _("Cannot receive file descriptors over websocket > channel")); > + return -1; > + } > + > + if (wioc->io_err) { > + *errp = error_copy(wioc->io_err); > + return -1; > + } > + > + if (!wioc->rawinput.offset) { > + ret = qio_channel_websock_read_wire(QIO_CHANNEL_WEBSOCK(ioc), errp); > + if (ret < 0) { > + return ret; > + } > + } > + > + for (i = 0 ; i < niov ; i++) { > + size_t want = iov[i].iov_len; > + if (want > (wioc->rawinput.offset - got)) { > + want = (wioc->rawinput.offset - got); > + } > + > + memcpy(iov[i].iov_base, > + wioc->rawinput.buffer + got, > + want); > + got += want; > + > + if (want < iov[i].iov_len) { > + break; > + } > + } > + > + qio_buffer_advance(&wioc->rawinput, got); > + qio_channel_websock_set_watch(wioc); > + DPRINTF("Returning %zu\n", got); > + return got; > +} > + > + > +static ssize_t qio_channel_websock_writev(QIOChannel *ioc, > + const struct iovec *iov, > + size_t niov, > + int *fds, > + size_t nfds, > + Error **errp) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc); > + size_t i; > + ssize_t done = 0; > + ssize_t ret; > + > + if (fds || nfds) { > + error_setg(errp, "%s", > + _("Cannot send file descriptors over websocket channel")); > + return -1; > + } > + > + DPRINTF("Writev %p %zu %p err=%p\n", iov, niov, ioc, wioc->io_err); > + if (wioc->io_err) { > + *errp = error_copy(wioc->io_err); > + return -1; > + } > + > + if (wioc->io_eof) { > + error_setg(errp, "%s", "Broken pipe"); > + return -1; > + } > + > + for (i = 0; i < niov; i++) { > + size_t want = iov[i].iov_len; > + if ((want + 
wioc->rawoutput.offset) > > QIO_CHANNEL_WEBSOCK_MAX_BUFFER) { > + want = (QIO_CHANNEL_WEBSOCK_MAX_BUFFER - wioc->rawoutput.offset); > + } > + if (want == 0) { > + goto done; > + } > + > + qio_buffer_reserve(&wioc->rawoutput, want); > + qio_buffer_append(&wioc->rawoutput, iov[i].iov_base, want); > + done += want; > + if (want < iov[i].iov_len) { > + break; > + } > + } > + > + done: > + ret = qio_channel_websock_write_wire(wioc, errp); > + if (ret < 0 && > + ret != QIO_CHANNEL_ERR_BLOCK) { > + qio_channel_websock_unset_watch(wioc); > + return -1; > + } > + > + qio_channel_websock_set_watch(wioc); > + > + if (done == 0) { > + return QIO_CHANNEL_ERR_BLOCK; > + } > + > + return done; > +} > + > +static void qio_channel_websock_set_blocking(QIOChannel *ioc, > + bool enabled) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc); > + > + qio_channel_set_blocking(wioc->master, enabled); > +} > + > +static int qio_channel_websock_close(QIOChannel *ioc, > + Error **errp) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc); > + > + return qio_channel_close(wioc->master, errp); > +} > + > +typedef struct QIOChannelWebsockSource QIOChannelWebsockSource; > +struct QIOChannelWebsockSource { > + GSource parent; > + QIOChannelWebsock *wioc; > + GIOCondition condition; > +}; > + > +static gboolean > +qio_channel_websock_source_prepare(GSource *source, > + gint *timeout) > +{ > + QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source; > + GIOCondition cond = 0; > + *timeout = -1; > + > + if (wsource->wioc->rawinput.offset) { > + cond |= G_IO_IN; > + } > + if (wsource->wioc->rawoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) { > + cond |= G_IO_OUT; > + } > + > +#if 0 > + DPRINTF("Prep source %d cond %d input=%zu output=%zu\n", > + wsource->condition, cond, > + wsource->wioc->rawinput.offset, > + wsource->wioc->rawoutput.offset); > +#endif > + > + return cond & wsource->condition; > +} > + > +static gboolean > +qio_channel_websock_source_check(GSource 
*source) > +{ > + QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source; > + GIOCondition cond = 0; > + > + if (wsource->wioc->rawinput.offset) { > + cond |= G_IO_IN; > + } > + if (wsource->wioc->rawoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) { > + cond |= G_IO_OUT; > + } > + > + if (cond & wsource->condition) { > + DPRINTF("Check source %d cond %d input=%zu output=%zu\n", > + wsource->condition, cond, > + wsource->wioc->rawinput.offset, > + wsource->wioc->rawoutput.offset); > + } > + return cond & wsource->condition; > +} > + > +static gboolean > +qio_channel_websock_source_dispatch(GSource *source, > + GSourceFunc callback, > + gpointer user_data) > +{ > + QIOChannelFunc func = (QIOChannelFunc)callback; > + QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source; > + GIOCondition cond = 0; > + > + if (wsource->wioc->rawinput.offset) { > + cond |= G_IO_IN; > + } > + if (wsource->wioc->rawoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) { > + cond |= G_IO_OUT; > + } > + > + DPRINTF("Disp source %d cond %d input=%zu output=%zu\n", > + wsource->condition, cond, > + wsource->wioc->rawinput.offset, > + wsource->wioc->rawoutput.offset); > + return (*func)(QIO_CHANNEL(wsource->wioc), > + (cond & wsource->condition), > + user_data); > +} > + > +static void > +qio_channel_websock_source_finalize(GSource *source) > +{ > + QIOChannelWebsockSource *ssource = (QIOChannelWebsockSource *)source; > + > + object_unref(OBJECT(ssource->wioc)); > +} > + > +GSourceFuncs qio_channel_websock_source_funcs = { > + qio_channel_websock_source_prepare, > + qio_channel_websock_source_check, > + qio_channel_websock_source_dispatch, > + qio_channel_websock_source_finalize > +}; > + > +static GSource *qio_channel_websock_create_watch(QIOChannel *ioc, > + GIOCondition condition) > +{ > + QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc); > + QIOChannelWebsockSource *ssource; > + GSource *source; > + > + DPRINTF("Creating websock watch %p cond=%d\n", wioc, 
condition); > + source = g_source_new(&qio_channel_websock_source_funcs, > + sizeof(QIOChannelWebsockSource)); > + g_source_set_name(source, "QIOChannelWebsock"); > + ssource = (QIOChannelWebsockSource *)source; > + > + ssource->wioc = wioc; > + object_ref(OBJECT(wioc)); > + > + ssource->condition = condition; > + > + qio_channel_websock_set_watch(wioc); > + return source; > +} > + > +static void qio_channel_websock_class_init(ObjectClass *klass, > + void *class_data G_GNUC_UNUSED) > +{ > + QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); > + > + ioc_klass->io_writev = qio_channel_websock_writev; > + ioc_klass->io_readv = qio_channel_websock_readv; > + ioc_klass->io_set_blocking = qio_channel_websock_set_blocking; > + ioc_klass->io_close = qio_channel_websock_close; > + ioc_klass->io_create_watch = qio_channel_websock_create_watch; > +} > + > +static const TypeInfo qio_channel_websock_info = { > + .parent = TYPE_QIO_CHANNEL, > + .name = TYPE_QIO_CHANNEL_WEBSOCK, > + .instance_size = sizeof(QIOChannelWebsock), > + .instance_init = qio_channel_websock_init, > + .instance_finalize = qio_channel_websock_finalize, > + .class_init = qio_channel_websock_class_init, > +}; > + > +static void qio_channel_websock_register_types(void) > +{ > + type_register_static(&qio_channel_websock_info); > +} > + > +type_init(qio_channel_websock_register_types); > -- > 2.4.3 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK