loleaflet/js/global.js | 94 +++++++++++++++++++++-- net/Socket.hpp | 14 +++ wsd/DocumentBroker.hpp | 2 wsd/LOOLWSD.cpp | 2 wsd/ProxyProtocol.cpp | 193 ++++++++++++++++++++++++++++++++++++++++++++++++- wsd/ProxyProtocol.hpp | 81 +++++++++----------- 6 files changed, 328 insertions(+), 58 deletions(-)
New commits: commit fdc062b488afaeed5eaf061e279fff73623bccf3 Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Thu Mar 19 15:54:28 2020 +0000 Commit: Jan Holesovsky <ke...@collabora.com> CommitDate: Fri Apr 24 13:57:49 2020 +0200 Proxy protocol bits. For now very silly: [T|B] + hex length + \n + content + \n Change-Id: I256b834a23cca975a705da2c569887665ac6be02 Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92806 Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoff...@gmail.com> Reviewed-by: Jan Holesovsky <ke...@collabora.com> diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js index 477c66a5d..eba6852b8 100644 --- a/loleaflet/js/global.js +++ b/loleaflet/js/global.js @@ -1,4 +1,5 @@ /* -*- js-indent-level: 8 -*- */ +/* global Uint8Array */ (function (global) { var ua = navigator.userAgent.toLowerCase(), @@ -212,19 +213,97 @@ }; this.onmessage = function() { }; + this.parseIncomingArray = function(arr) { + var decoder = new TextDecoder(); + for (var i = 0; i < arr.length; ++i) + { + var left = arr.length - i; + if (left < 4) + { + console.debug('no data left'); + break; + } + var type = String.fromCharCode(arr[i+0]); + if (type != 'T' && type != 'B') + { + console.debug('wrong data type: ' + type); + break; + } + if (arr[i+1] !== 48 && arr[i+2] !== 120) // '0x' + { + console.debug('missing hex preamble'); + break; + } + i += 3; + var numStr = ''; + var start = i; + while (arr[i] != 10) // '\n' + i++; + numStr = decoder.decode(arr.slice(start, i)); // FIXME: IE11 + var size = parseInt(numStr, 16); + + i++; // skip \n + + var data; + if (type == 'T') // FIXME: IE11 + data = decoder.decode(arr.slice(i, i + size)); + else + data = arr.slice(i, i + size); + + this.onmessage({ data: data }); + + i += size; // skip trailing '\n' in loop-increment + } + }; + this.parseIncoming = function(type, msg) { + if (type === 'blob') + { + var fileReader = new FileReader(); + var that = this; + fileReader.onload = function(event) { + that.parseIncomingArray(event.target.result); + }; + fileReader.readAsArrayBuffer(msg); + } + else if (type === 'arraybuffer') + { + this.parseIncomingArray(new Uint8Array(msg)); + } + else if (type === 'text' || type === '') + { + const encoder = new TextEncoder(); + const arr = encoder.encode(msg); + this.parseIncomingArray(arr); + } + else + console.debug('Unknown encoding type: ' + type); + }; this.send = function(msg) { console.debug('send msg "' + msg + '"'); var req = new XMLHttpRequest(); req.open('POST', this.getEndPoint('write')); req.setRequestHeader('SessionId', this.sessionId); if (this.sessionId === 'fetchsession') + { + req.responseType = 'text'; req.addEventListener('load', function() { console.debug('got session: ' + this.responseText); that.sessionId = this.responseText; that.readyState = 1; that.onopen(); }); - req.send(msg); + } + else + { + req.responseType = 'arraybuffer'; + req.addEventListener('load', function() { + if (this.status == 200) + that.parseIncoming(this.responseType, this.response); + else + console.debug('Error on incoming response'); + }); + } + req.send('B0x' + msg.length.toString(16) + '\n' + msg + '\n'); }, this.close = function() { console.debug('close socket'); @@ -237,7 +316,6 @@ }; console.debug('New proxy socket ' + this.id + ' ' + this.uri); - // FIXME: perhaps a little risky. this.send('fetchsession'); var that = this; @@ -250,20 +328,16 @@ var req = new XMLHttpRequest(); // fetch session id: req.addEventListener('load', function() { - console.debug('read: ' + this.responseText); if (this.status == 200) - { - that.onmessage({ data: this.response }); - } + that.parseIncoming(this.responseType, this.response); else - { console.debug('Handle error ' + this.status); - } that.readWaiting = false; }); req.open('GET', that.getEndPoint('read')); - req.setRequestHeader('SessionId', this.sessionId); - req.send(that.sessionId); + req.setRequestHeader('SessionId', that.sessionId); + req.responseType = 'arraybuffer'; + req.send(''); that.readWaiting = true; }, 250); }; diff --git a/net/Socket.hpp b/net/Socket.hpp index 7ae042b86..2c94601ee 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -85,6 +85,10 @@ public: { _disposition = Type::CLOSED; } + std::shared_ptr<Socket> getSocket() const + { + return _socket; + } bool isMove() { return _disposition == Type::MOVE; } bool isClosed() { return _disposition == Type::CLOSED; } @@ -923,6 +927,12 @@ public: std::vector<std::pair<size_t, size_t>> _spans; }; + /// remove all queued input bytes + void clearInput() + { + _inBuffer.clear(); + } + /// Remove the first @count bytes from input buffer void eraseFirstInputBytes(const MessageMap &map) { @@ -1086,6 +1096,8 @@ public: /// Does it look like we have some TLS / SSL where we don't expect it ? bool sniffSSL() const; + void dumpState(std::ostream& os) override; + protected: /// Override to handle reading of socket data differently. virtual int readData(char* buf, int len) @@ -1109,8 +1121,6 @@ protected: #endif } - void dumpState(std::ostream& os) override; - void setShutdownSignalled() { _shutdownSignalled = true; diff --git a/wsd/DocumentBroker.hpp b/wsd/DocumentBroker.hpp index 7d7b5e67d..4e854d3fb 100644 --- a/wsd/DocumentBroker.hpp +++ b/wsd/DocumentBroker.hpp @@ -147,7 +147,7 @@ public: const Poco::URI& uriPublic, const bool isReadOnly, const std::string& hostNoTrust, - const std::shared_ptr<Socket> &moveSocket); + const std::shared_ptr<StreamSocket> &socket); /// Thread safe termination of this broker if it has a lingering thread void joinThread(); diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index 7aaa79e0d..c58430509 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -2938,7 +2938,7 @@ private: { docBroker->handleProxyRequest( sessionId, id, uriPublic, isReadOnly, - hostNoTrust, moveSocket); + hostNoTrust, streamSocket); return; } catch (const UnauthorizedRequestException& exc) diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp index 41043a57a..8aaff0131 100644 --- a/wsd/ProxyProtocol.cpp +++ b/wsd/ProxyProtocol.cpp @@ -25,7 +25,7 @@ void DocumentBroker::handleProxyRequest( const Poco::URI& uriPublic, const bool isReadOnly, const std::string& hostNoTrust, - const std::shared_ptr<Socket> &socket) + const std::shared_ptr<StreamSocket> &socket) { std::shared_ptr<ClientSession> clientSession; if (sessionId == "fetchsession") @@ -37,6 +37,22 @@ void DocumentBroker::handleProxyRequest( addSession(clientSession); LOOLWSD::checkDiskSpaceAndWarnClients(true); LOOLWSD::checkSessionLimitsAndWarnClients(); + + LOG_TRC("Returning id " << clientSession->getId()); + + std::ostringstream oss; + oss << "HTTP/1.1 200 OK\r\n" + "Last-Modified: " << Util::getHttpTimeNow() << "\r\n" + "User-Agent: " WOPI_AGENT_STRING "\r\n" + "Content-Length: " << clientSession->getId().size() << "\r\n" + "Content-Type: application/json\r\n" + "X-Content-Type-Options: nosniff\r\n" + "\r\n" + << clientSession->getId(); + + socket->send(oss.str()); + socket->shutdown(); + return; } else { @@ -69,13 +85,186 @@ void DocumentBroker::handleProxyRequest( proxy->handleRequest(uriPublic.toString(), socket); } +bool ProxyProtocolHandler::parseEmitIncoming( + const std::shared_ptr<StreamSocket> &socket) +{ + std::vector<char> &in = socket->getInBuffer(); + + std::stringstream oss; + socket->dumpState(oss); + LOG_TRC("Parse message:\n" << oss.str()); + + while (in.size() > 0) + { + if (in[0] != 'T' && in[0] != 'B') + { + LOG_ERR("Invalid message type " << in[0]); + return false; + } + auto it = in.begin() + 1; + for (; it != in.end() && *it != '\n'; ++it); + *it = '\0'; + uint64_t len = strtoll( &in[1], nullptr, 16 ); + in.erase(in.begin(), it + 1); + if (len > in.size()) + { + LOG_ERR("Invalid message length " << len << " vs " << in.size()); + return false; + } + // far from efficient: + std::vector<char> data; + data.insert(data.begin(), in.begin(), in.begin() + len + 1); + in.erase(in.begin(), in.begin() + len); + + if (in.size() < 1 || in[0] != '\n') + { + LOG_ERR("Missing final newline"); + return false; + } + in.erase(in.begin(), in.begin() + 1); + + _msgHandler->handleMessage(data); + } + return true; +} + void ProxyProtocolHandler::handleRequest(const std::string &uriPublic, const std::shared_ptr<Socket> &socket) { + auto streamSocket = std::static_pointer_cast<StreamSocket>(socket); + bool bRead = uriPublic.find("/write") == std::string::npos; LOG_INF("Proxy handle request " << uriPublic << " type: " << (bRead ? "read" : "write")); - (void)socket; + + if (bRead) + { + if (!_msgHandler) + LOG_WRN("unusual - incoming message with no-one to handle it"); + else if (!parseEmitIncoming(streamSocket)) + { + std::stringstream oss; + streamSocket->dumpState(oss); + LOG_ERR("bad socket structure " << oss.str()); + } + } + + if (!flushQueueTo(streamSocket) && !bRead) + { + // longer running 'write socket' + _writeSockets.push_back(streamSocket); + } + else + socket->shutdown(); +} + +void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition) +{ + std::stringstream oss; + disposition.getSocket()->dumpState(oss); + LOG_ERR("If you got here, it means we failed to parse this properly in handleRequest: " << oss.str()); +} + +int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush) +{ + _writeQueue.push_back(std::make_shared<Message>(msg, len, text)); + auto sock = popWriteSocket(); + if (sock && flush) + { + flushQueueTo(sock); + sock->shutdown(); + } + + return len; +} + +int ProxyProtocolHandler::sendTextMessage(const char *msg, const size_t len, bool flush) const +{ + LOG_TRC("ProxyHack - send text msg " + std::string(msg, len)); + return const_cast<ProxyProtocolHandler *>(this)->sendMessage(msg, len, true, flush); +} + +int ProxyProtocolHandler::sendBinaryMessage(const char *data, const size_t len, bool flush) const +{ + LOG_TRC("ProxyHack - send binary msg len " << len); + return const_cast<ProxyProtocolHandler *>(this)->sendMessage(data, len, false, flush); +} + +void ProxyProtocolHandler::shutdown(bool goingAway, const std::string &statusMessage) +{ + LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage); +} + +void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv) +{ + sent = recv = 0; +} + +void ProxyProtocolHandler::dumpState(std::ostream& os) +{ + os << "proxy protocol sockets: " << _writeSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n"; + for (auto it : _writeQueue) + Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it); +} + +void ProxyProtocolHandler::performWrites() +{ + if (_msgHandler) + _msgHandler->writeQueuedMessages(); + if (_writeQueue.size() <= 0) + return; + + auto sock = popWriteSocket(); + if (sock) + { + flushQueueTo(sock); + sock->shutdown(); + } +} + +bool ProxyProtocolHandler::flushQueueTo(const std::shared_ptr<StreamSocket> &socket) +{ + // slurp from the core to us. + if (_msgHandler && _msgHandler->hasQueuedMessages()) + _msgHandler->writeQueuedMessages(); + + size_t totalSize = 0; + for (auto it : _writeQueue) + totalSize += it->size(); + + if (!totalSize) + return false; + + std::ostringstream oss; + oss << "HTTP/1.1 200 OK\r\n" + "Last-Modified: " << Util::getHttpTimeNow() << "\r\n" + "User-Agent: " WOPI_AGENT_STRING "\r\n" + "Content-Length: " << totalSize << "\r\n" + "Content-Type: application/json\r\n" + "X-Content-Type-Options: nosniff\r\n" + "\r\n"; + socket->send(oss.str()); + + for (auto it : _writeQueue) + socket->send(it->data(), it->size(), false); + _writeQueue.clear(); + + return true; +} + +// LRU-ness ... +std::shared_ptr<StreamSocket> ProxyProtocolHandler::popWriteSocket() +{ + std::weak_ptr<StreamSocket> sock; + while (!_writeSockets.empty()) + { + sock = _writeSockets.front(); + _writeSockets.erase(_writeSockets.begin()); + auto realSock = sock.lock(); + if (realSock) + return realSock; + } + return std::shared_ptr<StreamSocket>(); } /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp index dd109c3a4..7548a7acd 100644 --- a/wsd/ProxyProtocol.hpp +++ b/wsd/ProxyProtocol.hpp @@ -9,19 +9,21 @@ #pragma once +#include <memory> #include <net/Socket.hpp> -/// Interface for building a websocket from this ... +/** + * Implementation that builds a websocket like protocol from many + * individual proxied HTTP requests back to back. + * + * we use a trivial framing: <hex-length>\r\n<content>\r\n + */ class ProxyProtocolHandler : public ProtocolHandlerInterface { public: - ProxyProtocolHandler() - { - } + ProxyProtocolHandler() { } - virtual ~ProxyProtocolHandler() - { - } + virtual ~ProxyProtocolHandler() { } /// Will be called exactly once by setHandler void onConnect(const std::shared_ptr<StreamSocket>& /* socket */) override @@ -29,10 +31,7 @@ public: } /// Called after successful socket reads. - void handleIncomingMessage(SocketDisposition &/* disposition */) override - { - assert("we get our data a different way" && false); - } + void handleIncomingMessage(SocketDisposition &/* disposition */) override; int getPollEvents(std::chrono::steady_clock::time_point /* now */, int64_t &/* timeoutMaxMs */) override @@ -45,9 +44,7 @@ public: { } - void performWrites() override - { - } + void performWrites() override; void onDisconnect() override { @@ -58,40 +55,40 @@ public: /// Clear all external references void dispose() override { _msgHandler.reset(); } - int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override - { - LOG_TRC("ProxyHack - send text msg " + std::string(msg, len)); - (void) flush; - return len; - } + int sendTextMessage(const char *msg, const size_t len, bool flush = false) const override; + int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override; + void shutdown(bool goingAway = false, const std::string &statusMessage = "") override; + void getIOStats(uint64_t &sent, uint64_t &recv) override; + void dumpState(std::ostream& os) override; - int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override - { - (void) data; (void) flush; - LOG_TRC("ProxyHack - send binary msg len " << len); - return len; - } - - void shutdown(bool goingAway = false, const std::string &statusMessage = "") override - { - LOG_TRC("ProxyHack - shutdown " << goingAway << ": " << statusMessage); - } - - void getIOStats(uint64_t &sent, uint64_t &recv) override - { - sent = recv = 0; - } - - void dumpState(std::ostream& os) override - { - os << "proxy protocol\n"; - } + bool parseEmitIncoming(const std::shared_ptr<StreamSocket> &socket); void handleRequest(const std::string &uriPublic, const std::shared_ptr<Socket> &socket); private: - std::vector<std::weak_ptr<StreamSocket>> _sockets; + std::shared_ptr<StreamSocket> popWriteSocket(); + int sendMessage(const char *msg, const size_t len, bool text, bool flush); + bool flushQueueTo(const std::shared_ptr<StreamSocket> &socket); + + struct Message : public std::vector<char> + { + Message(const char *msg, const size_t len, bool text) + { + const char *type = text ? "T" : "B"; + insert(end(), type, type + 1); + std::ostringstream os; + os << std::hex << "0x" << len << "\n"; + std::string str = os.str(); + insert(end(), str.c_str(), str.c_str() + str.size()); + insert(end(), msg, msg + len); + const char *terminator = "\n"; + insert(end(), terminator, terminator + 1); + } + }; + /// queue things when we have no socket to hand. + std::vector<std::shared_ptr<Message>> _writeQueue; + std::vector<std::weak_ptr<StreamSocket>> _writeSockets; }; /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits