Rebased ref, commits from common ancestor:
commit 4669e5980491bd65d149987ed408acd65def4871
Author:     Michael Meeks <michael.me...@collabora.com>
AuthorDate: Sat Apr 18 18:40:59 2020 +0100
Commit:     Michael Meeks <michael.me...@collabora.com>
CommitDate: Sat Apr 18 18:41:51 2020 +0100

Proxy: marshal message serial too. Change-Id: I23a28fe052062a0b98bbb2828b71ab8de6f1459c diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js index 17fe03c9a..bf37b6cdf 100644 --- a/loleaflet/js/global.js +++ b/loleaflet/js/global.js @@ -208,6 +208,8 @@ this.id = window.proxySocketCounter++; this.sendCounter = 0; this.readWaiting = 0; + this.inSerial = 0; + this.outSerial = 0; this.onclose = function() { }; this.onerror = function() { @@ -231,17 +233,35 @@ console.debug('wrong data type: ' + type); break; } - if (arr[i+1] !== 48 && arr[i+2] !== 120) // '0x' + i++; + + // Serial + if (arr[i] !== 48 && arr[i+1] !== 120) // '0x' { console.debug('missing hex preamble'); break; } - i += 3; + i += 2; var numStr = ''; var start = i; while (arr[i] != 10) // '\n' i++; numStr = decoder.decode(arr.slice(start, i)); // FIXME: IE11 + var serial = parseInt(numStr, 16); + + i++; // skip \n + + // Size: + if (arr[i] !== 48 && arr[i+1] !== 120) // '0x' + { + console.debug('missing hex preamble'); + break; + } + i += 2; + start = i; + while (arr[i] != 10) // '\n' + i++; + numStr = decoder.decode(arr.slice(start, i)); // FIXME: IE11 var size = parseInt(numStr, 16); i++; // skip \n @@ -252,6 +272,10 @@ else data = arr.slice(i, i + size); + if (serial !== that.inSerial + 1) { + console.debug("Error: serial mismatch " + serial + " vs. 
" + (that.inSerial + 1)); + } + that.inSerial = serial; this.onmessage({ data: data }); i += size; // skip trailing '\n' in loop-increment @@ -295,7 +319,9 @@ }; this.send = function(msg) { this.sendQueue = this.sendQueue.concat( - 'B0x' + msg.length.toString(16) + '\n' + msg + '\n'); + 'B0x' + this.outSerial.toString(16) + '\n' + + '0x' + msg.length.toString(16) + '\n' + msg + '\n'); + this.outSerial++; if (this.sessionId !== 'fetchsession' && this.sendTimeout === undefined) this.sendTimeout = setTimeout(this.doSend, 2 /* ms */); }; diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp index c8a259abe..973d9f3c5 100644 --- a/wsd/ProxyProtocol.cpp +++ b/wsd/ProxyProtocol.cpp @@ -97,21 +97,37 @@ bool ProxyProtocolHandler::parseEmitIncoming( while (in.size() > 0) { - if (in[0] != 'T' && in[0] != 'B') + // Type + if ((in[0] != 'T' && in[0] != 'B') || in.size() < 2) { LOG_ERR("Invalid message type " << in[0]); return false; } auto it = in.begin() + 1; + + // Serial for (; it != in.end() && *it != '\n'; ++it); *it = '\0'; - uint64_t len = strtoll( &in[1], nullptr, 16 ); + uint64_t serial = strtoll( &in[1], nullptr, 16 ); + in.erase(in.begin(), it + 1); + if (in.size() < 2) + { + LOG_ERR("Invalid message framing size " << in.size()); + return false; + } + + // Length + it = in.begin(); + for (; it != in.end() && *it != '\n'; ++it); + *it = '\0'; + uint64_t len = strtoll( &in[0], nullptr, 16 ); in.erase(in.begin(), it + 1); if (len > in.size()) { LOG_ERR("Invalid message length " << len << " vs " << in.size()); return false; } + // far from efficient: std::vector<char> data; data.insert(data.begin(), in.begin(), in.begin() + len + 1); @@ -124,6 +140,9 @@ bool ProxyProtocolHandler::parseEmitIncoming( } in.erase(in.begin(), in.begin() + 1); + if (serial != _inSerial + 1) + LOG_ERR("Serial mismatch " << serial << " vs. 
" << (_inSerial + 1)); + _inSerial = serial; _msgHandler->handleMessage(data); } return true; @@ -180,7 +199,7 @@ void ProxyProtocolHandler::handleIncomingMessage(SocketDisposition &disposition) int ProxyProtocolHandler::sendMessage(const char *msg, const size_t len, bool text, bool flush) { - _writeQueue.push_back(std::make_shared<Message>(msg, len, text)); + _writeQueue.push_back(std::make_shared<Message>(msg, len, text, _outSerial++)); if (flush) { auto sock = popOutSocket(); diff --git a/wsd/ProxyProtocol.hpp b/wsd/ProxyProtocol.hpp index 692fd903c..30ef2dacf 100644 --- a/wsd/ProxyProtocol.hpp +++ b/wsd/ProxyProtocol.hpp @@ -16,12 +16,16 @@ * Implementation that builds a websocket like protocol from many * individual proxied HTTP requests back to back. * - * we use a trivial framing: <hex-length>\r\n<content>\r\n + * we use a trivial framing: [T(ext)|B(inary)]<hex-serial->\n<hex-length>\n<content>\n */ class ProxyProtocolHandler : public ProtocolHandlerInterface { public: - ProxyProtocolHandler() { } + ProxyProtocolHandler() : + _inSerial(0), + _outSerial(0) + { + } virtual ~ProxyProtocolHandler() { } @@ -67,12 +71,12 @@ private: struct Message : public std::vector<char> { - Message(const char *msg, const size_t len, bool text) + Message(const char *msg, const size_t len, bool text, uint64_t serial) { const char *type = text ? "T" : "B"; insert(end(), type, type + 1); std::ostringstream os; - os << std::hex << "0x" << len << "\n"; + os << std::hex << "0x" << serial << "\n" << "0x" << len << "\n"; std::string str = os.str(); insert(end(), str.c_str(), str.c_str() + str.size()); insert(end(), msg, msg + len); @@ -83,6 +87,8 @@ private: /// queue things when we have no socket to hand. 
std::vector<std::shared_ptr<Message>> _writeQueue; std::vector<std::weak_ptr<StreamSocket>> _outSockets; + uint64_t _inSerial; + uint64_t _outSerial; }; /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ commit 54970bf9608e8fa252779d051e34e53a98a3d32d Author: Michael Meeks <michael.me...@collabora.com> AuthorDate: Tue Apr 14 17:01:41 2020 +0100 Commit: Michael Meeks <michael.me...@collabora.com> CommitDate: Sat Apr 18 18:41:44 2020 +0100 Proxy: improve debugging and connection handling. Change-Id: I1d48c4ec7fb80eaab1aabc83b0c210b7cf138ef2 diff --git a/loleaflet/js/global.js b/loleaflet/js/global.js index 2268af83f..17fe03c9a 100644 --- a/loleaflet/js/global.js +++ b/loleaflet/js/global.js @@ -260,7 +260,6 @@ this.sendQueue = ''; this.sendTimeout = undefined; this.doSend = function () { - that.sendTimeout = undefined; console.debug('send msg "' + that.sendQueue + '"'); var req = new XMLHttpRequest(); req.open('POST', that.getEndPoint('write')); @@ -279,6 +278,7 @@ } req.send(that.sendQueue); that.sendQueue = ''; + that.sendTimeout = undefined; }; this.getSessionId = function() { var req = new XMLHttpRequest(); @@ -318,7 +318,7 @@ // horrors ... this.waitConnect = function() { console.debug('proxy: waiting - ' + that.readWaiting + ' on session ' + that.sessionId); - if (that.readWaiting > 4) // max 4 waiting connections concurrently. + if (that.readWaiting >= 4) // max 4 waiting connections concurrently. return; if (that.sessionId == 'fetchsession') return; // waiting for our session id. 
diff --git a/net/Socket.cpp b/net/Socket.cpp index 24041bc4b..0b7a2f2ff 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -219,7 +219,6 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS) timeout.tv_sec = timeoutMaxMicroS / (1000 * 1000); timeout.tv_nsec = (timeoutMaxMicroS % (1000 * 1000)) * 1000; rc = ::ppoll(&_pollFds[0], size + 1, &timeout, nullptr); - LOG_TRC("ppoll result " << rc << " errno " << strerror(errno)); # else int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000; LOG_TRC("Legacy Poll start, timeoutMs: " << timeoutMaxMs); diff --git a/wsd/ProxyProtocol.cpp b/wsd/ProxyProtocol.cpp index 3ed43e979..c8a259abe 100644 --- a/wsd/ProxyProtocol.cpp +++ b/wsd/ProxyProtocol.cpp @@ -219,6 +219,13 @@ void ProxyProtocolHandler::getIOStats(uint64_t &sent, uint64_t &recv) void ProxyProtocolHandler::dumpState(std::ostream& os) { os << "proxy protocol sockets: " << _outSockets.size() << " writeQueue: " << _writeQueue.size() << ":\n"; + os << "\t"; + for (auto &it : _outSockets) + { + auto sock = it.lock(); + os << "#" << (sock ? sock->getFD() : -2) << " "; + } + os << "\n"; for (auto it : _writeQueue) Util::dumpHex(os, "\twrite queue entry:", "\t\t", *it); if (_msgHandler) _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits