Add a QIOChannel subclass that can run the websocket protocol over the top of another QIOChannel instance. This initial implementation is only capable of acting as a websockets server. There is no support for acting as a websockets client yet.
Signed-off-by: Daniel P. Berrange <berra...@redhat.com>
---
 include/io/channel-websock.h |  108 +++++
 io/Makefile.objs             |    1 +
 io/channel-websock.c         | 1064 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1173 insertions(+)
 create mode 100644 include/io/channel-websock.h
 create mode 100644 io/channel-websock.c

diff --git a/include/io/channel-websock.h b/include/io/channel-websock.h
new file mode 100644
index 0000000..8e69d86
--- /dev/null
+++ b/include/io/channel-websock.h
@@ -0,0 +1,108 @@
+/*
+ * QEMU I/O channels driver websockets
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifndef QIO_CHANNEL_WEBSOCK_H__
+#define QIO_CHANNEL_WEBSOCK_H__
+
+#include "io/channel.h"
+#include "io/buffer.h"
+#include "io/task.h"
+
+#define TYPE_QIO_CHANNEL_WEBSOCK "qio-channel-websock"
+#define QIO_CHANNEL_WEBSOCK(obj) \
+    OBJECT_CHECK(QIOChannelWebsock, (obj), TYPE_QIO_CHANNEL_WEBSOCK)
+
+typedef struct QIOChannelWebsock QIOChannelWebsock;
+typedef union QIOChannelWebsockMask QIOChannelWebsockMask;
+
+union QIOChannelWebsockMask {
+    char c[4];
+    uint32_t u;
+};
+
+/**
+ * QIOChannelWebsock
+ *
+ * The QIOChannelWebsock class provides a channel wrapper which
+ * can transparently run the HTTP websockets protocol. This is
+ * usually used over a TCP socket, but there is actually no
+ * technical restriction on which type of master channel is
+ * used as the transport.
+ *
+ * This channel object is currently only capable of running as
+ * a websocket server and is a pretty crude implementation
+ * of it, not supporting the full websockets protocol feature
+ * set. It is sufficient to use with a simple websockets
+ * client for encapsulating VNC for noVNC in-browser client.
+ */
+
+struct QIOChannelWebsock {
+    QIOChannel parent;
+    QIOChannel *master;
+    QIOBuffer encinput;
+    QIOBuffer encoutput;
+    QIOBuffer rawinput;
+    QIOBuffer rawoutput;
+    size_t payload_remain;
+    QIOChannelWebsockMask mask;
+    guint io_tag;
+    Error *io_err;
+    gboolean io_eof;
+};
+
+/**
+ * qio_channel_websock_new_server:
+ * @master: the underlying channel object
+ *
+ * Create a new websockets channel that runs the server
+ * side of the protocol.
+ *
+ * After creating the channel, it is mandatory to call
+ * the qio_channel_websock_handshake() method before attempting
+ * to do any I/O on the channel.
+ *
+ * Once the handshake has completed, all I/O should be done
+ * via the new websocket channel object and not the original
+ * master channel.
+ *
+ * Returns: the new websockets channel object
+ */
+QIOChannelWebsock *
+qio_channel_websock_new_server(QIOChannel *master);
+
+/**
+ * qio_channel_websock_handshake:
+ * @ioc: the websocket channel object
+ * @func: the callback to invoke when completed
+ * @opaque: opaque data to pass to @func
+ * @destroy: optional callback to free @opaque
+ *
+ * Perform the websocket handshake. This method
+ * will return immediately and the handshake will
+ * continue in the background, provided the main
+ * loop is running. When the handshake is complete,
+ * or fails, the @func callback will be invoked.
+ */
+void qio_channel_websock_handshake(QIOChannelWebsock *ioc,
+                                   QIOTaskFunc func,
+                                   gpointer opaque,
+                                   GDestroyNotify destroy);
+
+#endif /* QIO_CHANNEL_WEBSOCK_H__ */
diff --git a/io/Makefile.objs b/io/Makefile.objs
index 2b33d3c..9f93087 100644
--- a/io/Makefile.objs
+++ b/io/Makefile.objs
@@ -5,3 +5,4 @@ io-obj-y += channel-watch.o
 io-obj-y += channel-socket.o
 io-obj-y += channel-file.o
 io-obj-y += channel-tls.o
+io-obj-y += channel-websock.o
diff --git a/io/channel-websock.c b/io/channel-websock.c
new file mode 100644
index 0000000..0345b90
--- /dev/null
+++ b/io/channel-websock.c
@@ -0,0 +1,1064 @@
+/*
+ * QEMU I/O channels driver websockets
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "io/channel-websock.h"
+#include "crypto/hash.h"
+
+#include <glib/gi18n.h>
+
+/* #define DEBUG_IOC */
+
+#ifdef DEBUG_IOC
+#define DPRINTF(fmt, ...) \
+    do { fprintf(stderr, "channel-websock: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+/* Max amount to allow in rawinput/rawoutput buffers */
+#define QIO_CHANNEL_WEBSOCK_MAX_BUFFER 8192
+
+#define B64LEN(__x) ((((__x) + 2) / 3) * 12 / 3)
+#define SHA1_DIGEST_LEN 20
+
+#define QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN 24
+#define QIO_CHANNEL_WEBSOCK_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+#define QIO_CHANNEL_WEBSOCK_GUID_LEN strlen(QIO_CHANNEL_WEBSOCK_GUID)
+
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_RESPONSE \
+    "HTTP/1.1 101 Switching Protocols\r\n" \
+    "Upgrade: websocket\r\n" \
+    "Connection: Upgrade\r\n" \
+    "Sec-WebSocket-Accept: %s\r\n" \
+    "Sec-WebSocket-Protocol: binary\r\n" \
+    "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM "\r\n"
+#define QIO_CHANNEL_WEBSOCK_HANDSHAKE_END "\r\n\r\n"
+#define QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION "13"
+
+#define QIO_CHANNEL_WEBSOCK_HEAD_MIN_LEN sizeof(uint16_t)
+#define QIO_CHANNEL_WEBSOCK_HEAD_MAX_LEN \
+    (QIO_CHANNEL_WEBSOCK_HEAD_MIN_LEN + sizeof(uint64_t) + sizeof(uint32_t))
+
+typedef struct QIOChannelWebsockHeader QIOChannelWebsockHeader;
+
+struct QEMU_PACKED QIOChannelWebsockHeader {
+    unsigned char b0;
+    unsigned char b1;
+    union {
+        struct QEMU_PACKED {
+            uint16_t l16;
+            QIOChannelWebsockMask m16;
+        } s16;
+        struct QEMU_PACKED {
+            uint64_t l64;
+            QIOChannelWebsockMask m64;
+        } s64;
+        QIOChannelWebsockMask m;
+    } u;
+};
+
+enum {
+    QIO_CHANNEL_WEBSOCK_OPCODE_CONTINUATION = 0x0,
+    QIO_CHANNEL_WEBSOCK_OPCODE_TEXT_FRAME = 0x1,
+    QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME = 0x2,
+    QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE = 0x8,
+    QIO_CHANNEL_WEBSOCK_OPCODE_PING = 0x9,
+    QIO_CHANNEL_WEBSOCK_OPCODE_PONG = 0xA
+};
+
+/*
+ * Extract the value of HTTP header @name from the @handshake_len bytes
+ * of request text at @handshake. Returns a newly allocated copy of the
+ * value (caller frees with g_free()), or NULL if the header is absent
+ * or not terminated by "\r\n".
+ */
+static char *qio_channel_websock_handshake_entry(const char *handshake,
+                                                 size_t handshake_len,
+                                                 const char *name)
+{
+    char *begin, *end, *ret = NULL;
+    char *line = g_strdup_printf("%s%s: ",
+                                 QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM,
+                                 name);
+    begin = g_strstr_len(handshake, handshake_len, line);
+    if (begin != NULL) {
+        begin += strlen(line);
+        end = g_strstr_len(begin, handshake_len - (begin - handshake),
+                           QIO_CHANNEL_WEBSOCK_HANDSHAKE_DELIM);
+        if (end != NULL) {
+            ret = g_strndup(begin, end - begin);
+        }
+    }
+    g_free(line);
+    return ret;
+}
+
+
+/*
+ * Queue the HTTP 101 response for client @key in the encoutput buffer.
+ * The accept token is the base64 encoded SHA1 of the key followed by
+ * the websocket GUID. Returns 0 on success, or -1 if computing the
+ * hash fails.
+ */
+static int qio_channel_websock_handshake_send_response(QIOChannelWebsock *ioc,
+                                                       const char *key,
+                                                       Error **errp)
+{
+    char combined_key[QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN +
+                      QIO_CHANNEL_WEBSOCK_GUID_LEN + 1];
+    char *accept = NULL, *response = NULL;
+    size_t responselen;
+
+    g_strlcpy(combined_key, key, QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN + 1);
+    g_strlcat(combined_key, QIO_CHANNEL_WEBSOCK_GUID,
+              QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN +
+              QIO_CHANNEL_WEBSOCK_GUID_LEN + 1);
+
+    /* hash and encode it */
+    if (qcrypto_hash_base64(QCRYPTO_HASH_ALG_SHA1,
+                            combined_key,
+                            QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN +
+                            QIO_CHANNEL_WEBSOCK_GUID_LEN,
+                            &accept,
+                            errp) < 0) {
+        return -1;
+    }
+
+    response = g_strdup_printf(QIO_CHANNEL_WEBSOCK_HANDSHAKE_RESPONSE, accept);
+    responselen = strlen(response);
+    qio_buffer_reserve(&ioc->encoutput, responselen);
+    qio_buffer_append(&ioc->encoutput, response, responselen);
+
+    g_free(accept);
+    g_free(response);
+
+    return 0;
+}
+
+/*
+ * Validate the protocol, version and key headers in the @size bytes of
+ * request text at @line, and queue the response if they are acceptable.
+ * Returns 0 on success, -1 with @errp set on failure.
+ */
+static int qio_channel_websock_handshake_process(QIOChannelWebsock *ioc,
+                                                 const char *line,
+                                                 size_t size,
+                                                 Error **errp)
+{
+    int ret = -1;
+    char *protocols = qio_channel_websock_handshake_entry(line, size,
+            "Sec-WebSocket-Protocol");
+    char *version = qio_channel_websock_handshake_entry(line, size,
+            "Sec-WebSocket-Version");
+    char *key = qio_channel_websock_handshake_entry(line, size,
+            "Sec-WebSocket-Key");
+
+    if (!protocols) {
+        error_setg(errp, "%s", _("Missing websocket protocol header data"));
+        goto cleanup;
+    }
+
+    if (!version) {
+        error_setg(errp, "%s", _("Missing websocket version header data"));
+        goto cleanup;
+    }
+
+    if (!key) {
+        error_setg(errp, "%s", _("Missing websocket key header data"));
+        goto cleanup;
+    }
+
+    if (!g_strrstr(protocols, "binary")) {
+        error_setg(errp, _("No 'binary' protocol is supported by client '%s'"),
+                   protocols);
+        goto cleanup;
+    }
+
+    if (!g_str_equal(version, QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION)) {
+        error_setg(errp, _("Version '%s' is not supported by client '%s'"),
+                   QIO_CHANNEL_WEBSOCK_SUPPORTED_VERSION, version);
+        goto cleanup;
+    }
+
+    if (strlen(key) != QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN) {
+        error_setg(errp, _("Key length '%zu' was not as expected '%d'"),
+                   strlen(key), QIO_CHANNEL_WEBSOCK_CLIENT_KEY_LEN);
+        goto cleanup;
+    }
+
+    ret = qio_channel_websock_handshake_send_response(ioc, key, errp);
+
+ cleanup:
+    g_free(protocols);
+    g_free(version);
+    g_free(key);
+    return ret;
+}
+
+/*
+ * Read handshake data from the master channel into encinput.
+ *
+ * Returns 1 once the full request has been processed, 0 if more data
+ * is needed, or -1 on error.
+ */
+static int qio_channel_websock_handshake_read(QIOChannelWebsock *ioc,
+                                              Error **errp)
+{
+    char *handshake_end;
+    ssize_t ret;
+    /* Typical HTTP headers from novnc are 512 bytes, so limiting
+     * total header size to 4096 is easily enough. */
+    size_t want = 4096 - ioc->encinput.offset;
+    qio_buffer_reserve(&ioc->encinput, want);
+    ret = qio_channel_read(ioc->master,
+                           (char *)qio_buffer_end(&ioc->encinput), want, errp);
+    if (ret < 0) {
+        return -1;
+    }
+    ioc->encinput.offset += ret;
+
+    handshake_end = g_strstr_len((char *)ioc->encinput.buffer,
+                                 ioc->encinput.offset,
+                                 QIO_CHANNEL_WEBSOCK_HANDSHAKE_END);
+    if (!handshake_end) {
+        if (ioc->encinput.offset >= 4096) {
+            error_setg(errp, "%s",
+                       _("End of headers not found in first 4096 bytes"));
+            return -1;
+        } else if (ret == 0) {
+            /* EOF: the peer closed before finishing its headers, and
+             * waiting for more data would never make progress. */
+            error_setg(errp, "%s",
+                       _("End of headers not found before connection closed"));
+            return -1;
+        } else {
+            return 0;
+        }
+    }
+
+    if (qio_channel_websock_handshake_process(ioc,
+                                              (char *)ioc->encinput.buffer,
+                                              ioc->encinput.offset,
+                                              errp) < 0) {
+        return -1;
+    }
+
+    qio_buffer_advance(&ioc->encinput,
+                       handshake_end - (char *)ioc->encinput.buffer +
+                       strlen(QIO_CHANNEL_WEBSOCK_HANDSHAKE_END));
+    return 1;
+}
+
+/*
+ * Watch callback that writes the queued handshake response to the
+ * master channel, completing the task once it is fully sent.
+ */
+static gboolean qio_channel_websock_handshake_send(QIOChannel *ioc,
+                                                   GIOCondition condition,
+                                                   gpointer user_data)
+{
+    QIOTask *task = user_data;
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(
+        qio_task_get_source(task));
+    Error *err = NULL;
+    ssize_t ret;
+
+    DPRINTF("Sending websock handshake reply %p\n", wioc);
+    ret = qio_channel_write(wioc->master,
+                            (char *)wioc->encoutput.buffer,
+                            wioc->encoutput.offset,
+                            &err);
+
+    if (ret < 0) {
+        qio_task_abort(task, err);
+        DPRINTF("Error sending websock handshake reply %p: %s\n",
+                wioc, error_get_pretty(err));
+        error_free(err);
+        return FALSE;
+    }
+
+    qio_buffer_advance(&wioc->encoutput, ret);
+    if (wioc->encoutput.offset == 0) {
+        DPRINTF("Finished sending websock handshake %p\n",
+                wioc);
+        qio_task_complete(task);
+        return FALSE;
+    }
+    return TRUE;
+}
+
+/*
+ * Watch callback that reads the client handshake request, then
+ * registers a new watch to send the response.
+ */
+static gboolean qio_channel_websock_handshake_io(QIOChannel *ioc,
+                                                 GIOCondition condition,
+                                                 gpointer user_data)
+{
+    QIOTask *task = user_data;
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(
+        qio_task_get_source(task));
+    Error *err = NULL;
+    int ret;
+
+    DPRINTF("Reading websock handshake request %p\n", wioc);
+    ret = qio_channel_websock_handshake_read(wioc, &err);
+    if (ret < 0) {
+        DPRINTF("Error reading websock handshake %s\n",
+                error_get_pretty(err));
+        qio_task_abort(task, err);
+        error_free(err);
+        return FALSE;
+    }
+    if (ret == 0) {
+        DPRINTF("Blocking on more request data\n");
+        /* need more data still */
+        return TRUE;
+    }
+
+    DPRINTF("Websock request complete, adding watch for reply %p\n",
+            wioc);
+
+    object_ref(OBJECT(task));
+    qio_channel_add_watch(
+        wioc->master,
+        G_IO_OUT,
+        qio_channel_websock_handshake_send,
+        task,
+        (GDestroyNotify)object_unref);
+    return FALSE;
+}
+
+
+/*
+ * Wrap all pending rawoutput data in a single unmasked binary frame
+ * and append it to encoutput.
+ */
+static void qio_channel_websock_encode(QIOChannelWebsock *ioc)
+{
+    size_t header_size = 0;
+    unsigned char opcode = QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME;
+    union {
+        char buf[QIO_CHANNEL_WEBSOCK_HEAD_MAX_LEN];
+        QIOChannelWebsockHeader ws;
+    } header;
+
+    DPRINTF("Encoding pending raw output %zu %p\n",
+            ioc->rawoutput.offset, ioc);
+    if (!ioc->rawoutput.offset) {
+        return;
+    }
+
+    header.ws.b0 = 0x80 | (opcode & 0x0f);
+    if (ioc->rawoutput.offset <= 125) {
+        header.ws.b1 = (uint8_t)ioc->rawoutput.offset;
+        header_size = 2;
+    } else if (ioc->rawoutput.offset < 65536) {
+        header.ws.b1 = 0x7e;
+        header.ws.u.s16.l16 = cpu_to_be16((uint16_t)ioc->rawoutput.offset);
+        header_size = 4;
+    } else {
+        header.ws.b1 = 0x7f;
+        header.ws.u.s64.l64 = cpu_to_be64(ioc->rawoutput.offset);
+        header_size = 10;
+    }
+
+    qio_buffer_reserve(&ioc->encoutput, header_size + ioc->rawoutput.offset);
+    qio_buffer_append(&ioc->encoutput, header.buf, header_size);
+    qio_buffer_append(&ioc->encoutput, ioc->rawoutput.buffer,
+                      ioc->rawoutput.offset);
+    qio_buffer_reset(&ioc->rawoutput);
+    DPRINTF("Have %zu bytes encoded output %p\n",
+            ioc->encoutput.offset, ioc);
+}
+
+
+/*
+ * Parse the frame header at the start of encinput.
+ *
+ * Returns 1 after consuming a header, 0 if the client sent a close
+ * frame, QIO_CHANNEL_ERR_BLOCK if the header is not yet complete, or
+ * -1 with @errp set on error.
+ */
+static ssize_t qio_channel_websock_decode_header(QIOChannelWebsock *ioc,
+                                                 Error **errp)
+{
+    unsigned char opcode = 0, fin = 0, has_mask = 0;
+    size_t header_size = 0;
+    size_t payload_len;
+    QIOChannelWebsockHeader *header =
+        (QIOChannelWebsockHeader *)ioc->encinput.buffer;
+
+    if (ioc->payload_remain) {
+        error_setg(errp,
+                   _("Decoding header but %zu bytes of payload remain"),
+                   ioc->payload_remain);
+        return -1;
+    }
+    if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_HEAD_MIN_LEN + 4) {
+        /* header not complete */
+        return QIO_CHANNEL_ERR_BLOCK;
+    }
+
+    fin = (header->b0 & 0x80) >> 7;
+    opcode = header->b0 & 0x0f;
+    has_mask = (header->b1 & 0x80) >> 7;
+    payload_len = header->b1 & 0x7f;
+
+    if (opcode == QIO_CHANNEL_WEBSOCK_OPCODE_CLOSE) {
+        /* disconnect */
+        return 0;
+    }
+
+    /* Websocket frame sanity check:
+     * * Websocket fragmentation is not supported.
+     * * All websockets frames sent by a client have to be masked.
+     * * Only binary encoding is supported.
+     */
+    if (!fin) {
+        error_setg(errp, "%s", _("websocket fragmentation is not supported"));
+        return -1;
+    }
+    if (!has_mask) {
+        error_setg(errp, "%s", _("websocket frames must be masked"));
+        return -1;
+    }
+    if (opcode != QIO_CHANNEL_WEBSOCK_OPCODE_BINARY_FRAME) {
+        error_setg(errp, "%s", _("only binary websocket frames are supported"));
+        return -1;
+    }
+
+    if (payload_len < 126) {
+        ioc->payload_remain = payload_len;
+        header_size = 6;
+        ioc->mask = header->u.m;
+    } else if (payload_len == 126 && ioc->encinput.offset >= 8) {
+        ioc->payload_remain = be16_to_cpu(header->u.s16.l16);
+        header_size = 8;
+        ioc->mask = header->u.s16.m16;
+    } else if (payload_len == 127 && ioc->encinput.offset >= 14) {
+        ioc->payload_remain = be64_to_cpu(header->u.s64.l64);
+        header_size = 14;
+        ioc->mask = header->u.s64.m64;
+    } else {
+        /* header not complete */
+        return QIO_CHANNEL_ERR_BLOCK;
+    }
+
+    qio_buffer_advance(&ioc->encinput, header_size);
+    return 1;
+}
+
+
+/*
+ * Unmask the payload bytes available in encinput and move them to
+ * rawinput.
+ *
+ * Returns the number of bytes decoded, QIO_CHANNEL_ERR_BLOCK if too
+ * few bytes are buffered, or -1 with @errp set on error.
+ */
+static ssize_t qio_channel_websock_decode_payload(QIOChannelWebsock *ioc,
+                                                  Error **errp)
+{
+    size_t i;
+    size_t payload_len;
+    uint32_t *payload32;
+
+    if (!ioc->payload_remain) {
+        error_setg(errp, "%s",
+                   _("Decoding payload but no bytes of payload remain"));
+        return -1;
+    }
+
+    /* If we aren't at the end of the payload, then drop
+     * off the last bytes, so we're always multiple of 4
+     * for purpose of unmasking, except at end of payload
+     */
+    if (ioc->encinput.offset < ioc->payload_remain) {
+        payload_len = ioc->encinput.offset - (ioc->encinput.offset % 4);
+    } else {
+        payload_len = ioc->payload_remain;
+    }
+    if (payload_len == 0) {
+        return QIO_CHANNEL_ERR_BLOCK;
+    }
+
+    ioc->payload_remain -= payload_len;
+
+    /* unmask frame */
+    /* process 1 frame (32 bit op) */
+    payload32 = (uint32_t *)ioc->encinput.buffer;
+    for (i = 0; i < payload_len / 4; i++) {
+        payload32[i] ^= ioc->mask.u;
+    }
+    /* process the remaining bytes (if any) */
+    for (i *= 4; i < payload_len; i++) {
+        ioc->encinput.buffer[i] ^= ioc->mask.c[i % 4];
+    }
+
+    qio_buffer_reserve(&ioc->rawinput, payload_len);
+    qio_buffer_append(&ioc->rawinput, ioc->encinput.buffer, payload_len);
+    qio_buffer_advance(&ioc->encinput, payload_len);
+    return payload_len;
+}
+
+
+QIOChannelWebsock *
+qio_channel_websock_new_server(QIOChannel *master)
+{
+    QIOChannelWebsock *wioc;
+
+    wioc = QIO_CHANNEL_WEBSOCK(object_new(TYPE_QIO_CHANNEL_WEBSOCK));
+
+    wioc->master = master;
+    object_ref(OBJECT(master));
+
+    return wioc;
+}
+
+void qio_channel_websock_handshake(QIOChannelWebsock *ioc,
+                                   QIOTaskFunc func,
+                                   gpointer opaque,
+                                   GDestroyNotify destroy)
+{
+    QIOTask *task;
+
+    task = qio_task_new(OBJECT(ioc),
+                        func,
+                        opaque,
+                        destroy);
+
+    DPRINTF("Adding watch on master %p for websocket %p handshake\n",
+            ioc->master, ioc);
+    qio_channel_add_watch(ioc->master,
+                          G_IO_IN,
+                          qio_channel_websock_handshake_io,
+                          task,
+                          NULL);
+}
+
+static void qio_channel_websock_init(Object *obj G_GNUC_UNUSED)
+{
+}
+
+
+/* Release all resources held by the channel object */
+static void qio_channel_websock_finalize(Object *obj)
+{
+    QIOChannelWebsock *ioc = QIO_CHANNEL_WEBSOCK(obj);
+
+    qio_buffer_free(&ioc->encinput);
+    qio_buffer_free(&ioc->encoutput);
+    qio_buffer_free(&ioc->rawinput);
+    qio_buffer_free(&ioc->rawoutput);
+    object_unref(OBJECT(ioc->master));
+    if (ioc->io_tag) {
+        g_source_remove(ioc->io_tag);
+    }
+    if (ioc->io_err) {
+        error_free(ioc->io_err);
+    }
+}
+
+
+/*
+ * Read more encoded data from the master channel, if there is room,
+ * and decode what is buffered into rawinput.
+ *
+ * Returns the number of payload bytes decoded, 0 on EOF or close
+ * frame, or a negative value on error or if no progress can be made.
+ */
+static ssize_t qio_channel_websock_read_wire(QIOChannelWebsock *ioc,
+                                             Error **errp)
+{
+    ssize_t ret;
+
+    DPRINTF("Have %zu bytes %p\n", ioc->encoutput.offset, ioc);
+    if (ioc->encinput.offset < 4096) {
+        size_t want = 4096 - ioc->encinput.offset;
+
+        qio_buffer_reserve(&ioc->encinput, want);
+        ret = qio_channel_read(ioc->master,
+                               (char *)ioc->encinput.buffer +
+                               ioc->encinput.offset,
+                               want,
+                               errp);
+        if (ret < 0) {
+            return ret;
+        }
+        if (ret == 0 &&
+            ioc->encinput.offset == 0) {
+            DPRINTF("EOF on wire & no more enc data availabl\n");
+            return 0;
+        }
+        ioc->encinput.offset += ret;
+        DPRINTF("Now have %zu bytes enc input\n", ioc->encinput.offset);
+    }
+
+    if (ioc->payload_remain == 0) {
+        DPRINTF("Looking to decode header\n");
+        ret = qio_channel_websock_decode_header(ioc, errp);
+        if (ret < 0) {
+            return ret;
+        }
+        if (ret == 0) {
+            DPRINTF("EOF when decoding header\n");
+            return 0;
+        }
+    }
+    DPRINTF("Looking to decode payload %zu\n", ioc->payload_remain);
+
+    ret = qio_channel_websock_decode_payload(ioc, errp);
+    if (ret < 0) {
+        return ret;
+    }
+    DPRINTF("Now have %zu bytes raw input\n", ioc->rawinput.offset);
+    return ret;
+}
+
+
+/*
+ * Encode any pending rawoutput data and write as much of encoutput
+ * as possible to the master channel.
+ *
+ * Returns the number of encoded bytes written, or a negative value
+ * on error or if nothing could be written without blocking.
+ */
+static ssize_t qio_channel_websock_write_wire(QIOChannelWebsock *ioc,
+                                              Error **errp)
+{
+    ssize_t ret;
+    ssize_t done = 0;
+    qio_channel_websock_encode(ioc);
+
+    DPRINTF("Writing %zu bytes %p\n", ioc->encoutput.offset, ioc);
+    while (ioc->encoutput.offset > 0) {
+        ret = qio_channel_write(ioc->master,
+                                (char *)ioc->encoutput.buffer,
+                                ioc->encoutput.offset,
+                                errp);
+        if (ret < 0) {
+            if (ret == QIO_CHANNEL_ERR_BLOCK &&
+                done > 0) {
+                DPRINTF("Blocking but wrote %zu\n", done);
+                return done;
+            } else {
+                DPRINTF("Error while writing %s\n",
+                        error_get_pretty(*errp));
+                return ret;
+            }
+        }
+        qio_buffer_advance(&ioc->encoutput, ret);
+        done += ret;
+    }
+    DPRINTF("Wrote %zu total\n", done);
+    return done;
+}
+
+
+/* Drop the reference held on behalf of the master channel watch */
+static void qio_channel_websock_flush_free(gpointer user_data)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(user_data);
+    object_unref(OBJECT(wioc));
+}
+
+static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc);
+
+/*
+ * Watch callback on the master channel that flushes pending output
+ * and/or reads pending input, recording any error in io_err.
+ */
+static gboolean qio_channel_websock_flush(QIOChannel *ioc,
+                                          GIOCondition condition,
+                                          gpointer user_data)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(user_data);
+    ssize_t ret;
+
+    DPRINTF("Websock master flush %p %d\n", ioc, condition);
+    if (condition & G_IO_OUT) {
+        ret = qio_channel_websock_write_wire(wioc, &wioc->io_err);
+        if (ret < 0) {
+            goto cleanup;
+        }
+    }
+
+    if (condition & G_IO_IN) {
+        ret = qio_channel_websock_read_wire(wioc, &wioc->io_err);
+        if (ret < 0) {
+            goto cleanup;
+        }
+        if (ret == 0) {
+            DPRINTF("Got EOF when reading %p\n", wioc);
+            wioc->io_eof = TRUE;
+        }
+    }
+
+ cleanup:
+    qio_channel_websock_set_watch(wioc);
+    return FALSE;
+}
+
+
+/* Remove the watch on the master channel, if there is one */
+static void qio_channel_websock_unset_watch(QIOChannelWebsock *ioc)
+{
+    if (ioc->io_tag) {
+        DPRINTF("Removing old master watch %u %p\n", ioc->io_tag, ioc);
+        g_source_remove(ioc->io_tag);
+        ioc->io_tag = 0;
+    }
+}
+
+/*
+ * (Re-)register the watch on the master channel for whichever I/O
+ * directions currently have work to do.
+ */
+static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc)
+{
+    GIOCondition cond = 0;
+
+    qio_channel_websock_unset_watch(ioc);
+
+    if (ioc->io_err) {
+        DPRINTF("Not adding master watch due to error %p %s\n",
+                ioc, error_get_pretty(ioc->io_err));
+        return;
+    }
+
+    if (ioc->encoutput.offset) {
+        cond |= G_IO_OUT;
+    }
+    if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER &&
+        !ioc->io_eof) {
+        cond |= G_IO_IN;
+    }
+
+    DPRINTF("Cond %d output=%zu input=%zu eof=%d\n",
+            cond, ioc->encoutput.offset, ioc->encinput.offset, ioc->io_eof);
+    if (cond) {
+        object_ref(OBJECT(ioc));
+        ioc->io_tag =
+            qio_channel_add_watch(ioc->master,
+                                  cond,
+                                  qio_channel_websock_flush,
+                                  ioc,
+                                  qio_channel_websock_flush_free);
+    }
+}
+
+
+/*
+ * QIOChannel io_readv implementation: copy decoded payload data into
+ * @iov, first decoding from the wire if none is buffered.
+ */
+static ssize_t qio_channel_websock_readv(QIOChannel *ioc,
+                                         const struct iovec *iov,
+                                         size_t niov,
+                                         int **fds,
+                                         size_t *nfds,
+                                         Error **errp)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+    size_t i;
+    ssize_t got = 0;
+    ssize_t ret;
+    DPRINTF("Read ioc %p %zu %p\n", iov, niov, ioc);
+    if (fds || nfds) {
+        error_setg(errp, "%s",
+                   _("Cannot receive file descriptors over websocket channel"));
+        return -1;
+    }
+
+    if (wioc->io_err) {
+        error_propagate(errp, error_copy(wioc->io_err));
+        return -1;
+    }
+
+    if (!wioc->rawinput.offset) {
+        ret = qio_channel_websock_read_wire(QIO_CHANNEL_WEBSOCK(ioc), errp);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    for (i = 0 ; i < niov ; i++) {
+        size_t want = iov[i].iov_len;
+        if (want > (wioc->rawinput.offset - got)) {
+            want = (wioc->rawinput.offset - got);
+        }
+
+        memcpy(iov[i].iov_base,
+               wioc->rawinput.buffer + got,
+               want);
+        got += want;
+
+        if (want < iov[i].iov_len) {
+            break;
+        }
+    }
+
+    qio_buffer_advance(&wioc->rawinput, got);
+    qio_channel_websock_set_watch(wioc);
+    DPRINTF("Returning %zu\n", got);
+    return got;
+}
+
+
+/*
+ * QIOChannel io_writev implementation: buffer up to
+ * QIO_CHANNEL_WEBSOCK_MAX_BUFFER bytes from @iov and try to send them.
+ */
+static ssize_t qio_channel_websock_writev(QIOChannel *ioc,
+                                          const struct iovec *iov,
+                                          size_t niov,
+                                          int *fds,
+                                          size_t nfds,
+                                          Error **errp)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+    size_t i;
+    ssize_t done = 0;
+    ssize_t ret;
+
+    if (fds || nfds) {
+        error_setg(errp, "%s",
+                   _("Cannot send file descriptors over websocket channel"));
+        return -1;
+    }
+
+    DPRINTF("Writev %p %zu %p err=%p\n", iov, niov, ioc, wioc->io_err);
+    if (wioc->io_err) {
+        error_propagate(errp, error_copy(wioc->io_err));
+        return -1;
+    }
+
+    if (wioc->io_eof) {
+        error_setg(errp, "%s", "Broken pipe");
+        return -1;
+    }
+
+    for (i = 0; i < niov; i++) {
+        size_t want = iov[i].iov_len;
+        if ((want + wioc->rawoutput.offset) > QIO_CHANNEL_WEBSOCK_MAX_BUFFER) {
+            want = (QIO_CHANNEL_WEBSOCK_MAX_BUFFER - wioc->rawoutput.offset);
+        }
+        if (want == 0) {
+            goto done;
+        }
+
+        qio_buffer_reserve(&wioc->rawoutput, want);
+        qio_buffer_append(&wioc->rawoutput, iov[i].iov_base, want);
+        done += want;
+        if (want < iov[i].iov_len) {
+            break;
+        }
+    }
+
+ done:
+    ret = qio_channel_websock_write_wire(wioc, errp);
+    if (ret < 0 &&
+        ret != QIO_CHANNEL_ERR_BLOCK) {
+        qio_channel_websock_unset_watch(wioc);
+        return -1;
+    }
+
+    qio_channel_websock_set_watch(wioc);
+
+    if (done == 0) {
+        return QIO_CHANNEL_ERR_BLOCK;
+    }
+
+    return done;
+}
+
+/* QIOChannel io_set_blocking implementation: defer to the master */
+static void qio_channel_websock_set_blocking(QIOChannel *ioc,
+                                             bool enabled)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+    qio_channel_set_blocking(wioc->master, enabled);
+}
+
+/* QIOChannel io_close implementation: close the master channel */
+static int qio_channel_websock_close(QIOChannel *ioc,
+                                     Error **errp)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+
+    return qio_channel_close(wioc->master, errp);
+}
+
+/* GSource reporting I/O readiness of the raw input/output buffers */
+typedef struct QIOChannelWebsockSource QIOChannelWebsockSource;
+struct QIOChannelWebsockSource {
+    GSource parent;
+    QIOChannelWebsock *wioc;
+    GIOCondition condition;
+};
+
+/* GSource prepare: ready if a watched condition is already satisfied */
+static gboolean
+qio_channel_websock_source_prepare(GSource *source,
+                                   gint *timeout)
+{
+    QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source;
+    GIOCondition cond = 0;
+    *timeout = -1;
+
+    if (wsource->wioc->rawinput.offset) {
+        cond |= G_IO_IN;
+    }
+    if (wsource->wioc->rawoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) {
+        cond |= G_IO_OUT;
+    }
+
+#if 0
+    DPRINTF("Prep source %d cond %d input=%zu output=%zu\n",
+            wsource->condition, cond,
+            wsource->wioc->rawinput.offset,
+            wsource->wioc->rawoutput.offset);
+#endif
+
+    return cond & wsource->condition;
+}
+
+/* GSource check: same readiness test as prepare */
+static gboolean
+qio_channel_websock_source_check(GSource *source)
+{
+    QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source;
+    GIOCondition cond = 0;
+
+    if (wsource->wioc->rawinput.offset) {
+        cond |= G_IO_IN;
+    }
+    if (wsource->wioc->rawoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) {
+        cond |= G_IO_OUT;
+    }
+
+    if (cond & wsource->condition) {
+        DPRINTF("Check source %d cond %d input=%zu output=%zu\n",
+                wsource->condition, cond,
+                wsource->wioc->rawinput.offset,
+                wsource->wioc->rawoutput.offset);
+    }
+    return cond & wsource->condition;
+}
+
+/* GSource dispatch: invoke the callback with the satisfied conditions */
+static gboolean
+qio_channel_websock_source_dispatch(GSource *source,
+                                    GSourceFunc callback,
+                                    gpointer user_data)
+{
+    QIOChannelFunc func = (QIOChannelFunc)callback;
+    QIOChannelWebsockSource *wsource = (QIOChannelWebsockSource *)source;
+    GIOCondition cond = 0;
+
+    if (wsource->wioc->rawinput.offset) {
+        cond |= G_IO_IN;
+    }
+    if (wsource->wioc->rawoutput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER) {
+        cond |= G_IO_OUT;
+    }
+
+    DPRINTF("Disp source %d cond %d input=%zu output=%zu\n",
+            wsource->condition, cond,
+            wsource->wioc->rawinput.offset,
+            wsource->wioc->rawoutput.offset);
+    return (*func)(QIO_CHANNEL(wsource->wioc),
+                   (cond & wsource->condition),
+                   user_data);
+}
+
+/* GSource finalize: drop the channel reference held by the source */
+static void
+qio_channel_websock_source_finalize(GSource *source)
+{
+    QIOChannelWebsockSource *ssource = (QIOChannelWebsockSource *)source;
+
+    object_unref(OBJECT(ssource->wioc));
+}
+
+GSourceFuncs qio_channel_websock_source_funcs = {
+    qio_channel_websock_source_prepare,
+    qio_channel_websock_source_check,
+    qio_channel_websock_source_dispatch,
+    qio_channel_websock_source_finalize
+};
+
+/*
+ * QIOChannel io_create_watch implementation: returns a new GSource
+ * holding a reference on the channel.
+ */
+static GSource *qio_channel_websock_create_watch(QIOChannel *ioc,
+                                                 GIOCondition condition)
+{
+    QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);
+    QIOChannelWebsockSource *ssource;
+    GSource *source;
+
+    DPRINTF("Creating websock watch %p cond=%d\n", wioc, condition);
+    source = g_source_new(&qio_channel_websock_source_funcs,
+                          sizeof(QIOChannelWebsockSource));
+    g_source_set_name(source, "QIOChannelWebsock");
+    ssource = (QIOChannelWebsockSource *)source;
+
+    ssource->wioc = wioc;
+    object_ref(OBJECT(wioc));
+
+    ssource->condition = condition;
+
+    qio_channel_websock_set_watch(wioc);
+    return source;
+}
+
+static void qio_channel_websock_class_init(ObjectClass *klass,
+                                           void *class_data G_GNUC_UNUSED)
+{
+    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+    ioc_klass->io_writev = qio_channel_websock_writev;
+    ioc_klass->io_readv = qio_channel_websock_readv;
+    ioc_klass->io_set_blocking = qio_channel_websock_set_blocking;
+    ioc_klass->io_close = qio_channel_websock_close;
+    ioc_klass->io_create_watch = qio_channel_websock_create_watch;
+}
+
+static const TypeInfo qio_channel_websock_info = {
+    .parent = TYPE_QIO_CHANNEL,
+    .name = TYPE_QIO_CHANNEL_WEBSOCK,
+    .instance_size = sizeof(QIOChannelWebsock),
+    .instance_init = qio_channel_websock_init,
+    .instance_finalize = qio_channel_websock_finalize,
+    .class_init = qio_channel_websock_class_init,
+};
+
+static void qio_channel_websock_register_types(void)
+{
+    type_register_static(&qio_channel_websock_info);
+}
+
+type_init(qio_channel_websock_register_types);
-- 
2.4.3