loolwsd/LOOLSession.cpp | 32 +++++--- loolwsd/LOOLSession.hpp | 11 +- loolwsd/LOOLWSD.cpp | 149 ++++++++++++++++++++++++++++++-------- loolwsd/LoadTest.cpp | 21 +++-- loolwsd/Makefile.am | 5 - loolwsd/configure.ac | 2 loolwsd/loolwsd-systemplate-setup | 2 loolwsd/loolwsd.spec.in | 8 +- loolwsd/protocol.txt | 8 +- loolwsd/tsqueue.h | 65 ++++++++++++++++ 10 files changed, 242 insertions(+), 61 deletions(-)
New commits: commit 2df1ddc08599d0f4236ea11f537f8317bdc6f3f4 Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 19:33:43 2015 +0300 Bump version after tarball diff --git a/loolwsd/configure.ac b/loolwsd/configure.ac index 0137082..17d2364 100644 --- a/loolwsd/configure.ac +++ b/loolwsd/configure.ac @@ -3,7 +3,7 @@ AC_PREREQ([2.69]) -AC_INIT([loolwsd], [1.0.36], [libreoff...@collabora.com]) +AC_INIT([loolwsd], [1.0.37], [libreoff...@collabora.com]) AM_INIT_AUTOMAKE([1.11 silent-rules]) commit fc8ff92ba9d59d3bcd82e2c16d8af99e21e0c435 Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 18:41:43 2015 +0300 Bump version for tarball diff --git a/loolwsd/configure.ac b/loolwsd/configure.ac index 395cec2..0137082 100644 --- a/loolwsd/configure.ac +++ b/loolwsd/configure.ac @@ -3,7 +3,7 @@ AC_PREREQ([2.69]) -AC_INIT([loolwsd], [1.0.35], [libreoff...@collabora.com]) +AC_INIT([loolwsd], [1.0.36], [libreoff...@collabora.com]) AM_INIT_AUTOMAKE([1.11 silent-rules]) commit 9be670309fd107d29c4c54c58d2927968d777745 Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 19:32:34 2015 +0300 Adapt to the CP LO 5.0 beta1 diff --git a/loolwsd/loolwsd.spec.in b/loolwsd/loolwsd.spec.in index f3a857a..b9c6ef2 100644 --- a/loolwsd/loolwsd.spec.in +++ b/loolwsd/loolwsd.spec.in @@ -25,10 +25,10 @@ Source0: loolwsd-@package_vers...@.tar.gz Source1: loolwsd.service Source2: sysconfig.loolwsd BuildRequires: libcap-progs libcap-devel libpng-devel poco-devel >= 1.6.0 systemd-rpm-macros -# This works for now only with the TDF nightly builds of 5.0 or 5.1, I -# think. But the TDF packages include the version number in their +# This works for now only with a CP build of 5.0, I think. The TDF +# (and thus also CP) packages include the version number in their # names. How clever is that? So we need to specify one. 
-Requires: libcap libcap-progs libpng libPocoFoundation30 >= 1.6.0 libPocoNet30 >= 1.6.0 libreofficedev5.1 libreofficedev5.1-en-US libreofficedev5.1-ure lodevbasis5.1-core lodevbasis5.1-writer lodevbasis5.1-impress lodevbasis5.1-graphicfilter lodevbasis5.1-en-US lodevbasis5.1-calc lodevbasis5.1-en-US-res lodevbasis5.1-en-US-calc lodevbasis5.1-ooofonts lodevbasis5.1-images lodevbasis5.1-filter-data lodevbasis5.1-draw lodevbasis5.1-base lodevbasis5.1-en-US-writer lodevbasis5.1-en-US-math lodevbasis5.1-en-US-base Mesa-libEGL1 Mesa-libGL1 Mesa-libglapi0 cups-libs dbus-1-glib fontconfig libbz2-1 libcairo2 libdrm2 libexpat1 libfreetype6 libgbm1 libgio-2_0-0 libglib-2_0-0 libgmodule-2_0-0 libgobject-2_0-0 libgthread-2_0-0 liblzma5 libpcre1 libpixman-1-0 libpng16-16 libuuid1 libxml2-2 %{?systemd_requires} %{fillup_prereq} +Requires: libcap libcap-progs libpng libPocoFoundation30 >= 1.6.0 libPocoNet30 >= 1.6.0 libreoffice5.0 libreoffice5.0-en-US libreoffice5.0-ure libobasis5.0-core libobasis5.0-writer libobasis5.0-impress libobasis5.0-graphicfilter libobasis5.0-en-US libobasis5.0-calc libobasis5.0-en-US-res libobasis5.0-en-US-calc libobasis5.0-ooofonts libobasis5.0-images libobasis5.0-filter-data libobasis5.0-draw libobasis5.0-base libobasis5.0-en-US-writer libobasis5.0-en-US-math libobasis5.0-en-US-base Mesa-libEGL1 Mesa-libGL1 Mesa-libglapi0 cups-libs dbus-1-glib fontconfig libbz2-1 libcairo2 libdrm2 libexpat1 libfreetype6 libgbm1 libgio-2_0-0 libglib-2_0-0 libgmodule-2_0-0 libgobject-2_0-0 libgthread-2_0-0 liblzma5 libpcre1 libpixman-1-0 libpng16-16 libuuid1 libxml2-2 %{?systemd_requires} %{fillup_prereq} %define owner lool %define group lool @@ -73,7 +73,7 @@ mkdir -p /var/cache/loolwsd && chmod og+w /var/cache/loolwsd # Figure out where LO is installed, let's hope it is not a mount point # Create a directory for loolwsd on the same file system -loroot=`rpm -ql libreofficedev5.1 | grep '/soffice$' | sed -e 's-/program/soffice--'` +loroot=`rpm -ql libreoffice5.0 | grep 
'/soffice$' | sed -e 's-/program/soffice--'` loolparent=`cd ${loroot} && cd .. && /bin/pwd` rm -rf ${loolparent}/lool commit 4c5c0d2bf1b1daf2d7063cacf6c5a16ed780a37f Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 19:29:17 2015 +0300 Don't use TABs for variable assignment lines diff --git a/loolwsd/Makefile.am b/loolwsd/Makefile.am index 98ae42b..c3c5011 100644 --- a/loolwsd/Makefile.am +++ b/loolwsd/Makefile.am @@ -13,9 +13,9 @@ connect_SOURCES = Connect.cpp Util.cpp LOOLProtocol.cpp lokitclient_SOURCES = LOKitClient.cpp Util.cpp noinst_HEADERS = LOKitHelper.hpp LOOLProtocol.hpp LOOLSession.hpp LOOLWSD.hpp LoadTest.hpp TileCache.hpp Util.hpp \ - tsqueue.h \ - bundled/include/LibreOfficeKit/LibreOfficeKit.h bundled/include/LibreOfficeKit/LibreOfficeKitEnums.h \ - bundled/include/LibreOfficeKit/LibreOfficeKitInit.h bundled/include/LibreOfficeKit/LibreOfficeKitTypes.h + tsqueue.h \ + bundled/include/LibreOfficeKit/LibreOfficeKit.h bundled/include/LibreOfficeKit/LibreOfficeKitEnums.h \ + bundled/include/LibreOfficeKit/LibreOfficeKitInit.h bundled/include/LibreOfficeKit/LibreOfficeKitTypes.h clean-cache: # Intentionally don't use "*" below... 
Avoid risk of accidentally running rm -rf /* commit ee3e6e19ad07b338c0daeff83332c6526016ae83 Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 19:27:05 2015 +0300 Add tsqueue.h diff --git a/loolwsd/Makefile.am b/loolwsd/Makefile.am index 06d9ce3..98ae42b 100644 --- a/loolwsd/Makefile.am +++ b/loolwsd/Makefile.am @@ -13,6 +13,7 @@ connect_SOURCES = Connect.cpp Util.cpp LOOLProtocol.cpp lokitclient_SOURCES = LOKitClient.cpp Util.cpp noinst_HEADERS = LOKitHelper.hpp LOOLProtocol.hpp LOOLSession.hpp LOOLWSD.hpp LoadTest.hpp TileCache.hpp Util.hpp \ + tsqueue.h \ bundled/include/LibreOfficeKit/LibreOfficeKit.h bundled/include/LibreOfficeKit/LibreOfficeKitEnums.h \ bundled/include/LibreOfficeKit/LibreOfficeKitInit.h bundled/include/LibreOfficeKit/LibreOfficeKitTypes.h commit eafd5dbc6682c4f02f8409d19ea9da66dcc20bac Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 18:22:09 2015 +0300 Don't bother with storing a ref to Application::instance() in a variable in some places diff --git a/loolwsd/LOOLSession.cpp b/loolwsd/LOOLSession.cpp index 9da6244..2b2d9bb 100644 --- a/loolwsd/LOOLSession.cpp +++ b/loolwsd/LOOLSession.cpp @@ -663,12 +663,10 @@ ChildProcessSession::~ChildProcessSession() bool ChildProcessSession::handleInput(const char *buffer, int length) { - Application& app = Application::instance(); - std::string firstLine = getFirstLine(buffer, length); StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - app.logger().information(Util::logPrefix() + "Input: " + getAbbreviatedMessage(buffer, length)); + Application::instance().logger().information(Util::logPrefix() + "Input: " + getAbbreviatedMessage(buffer, length)); if (tokens[0] == "load") { diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 7bbe69c..f654612 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -169,8 +169,6 @@ public: return; } - Application& app = Application::instance(); - tsqueue<std::string> queue; Thread 
queueHandlerThread; QueueHandler handler(queue); @@ -258,7 +256,7 @@ public: } catch (WebSocketException& exc) { - app.logger().error(Util::logPrefix() + "WebSocketException: " + exc.message()); + Application::instance().logger().error(Util::logPrefix() + "WebSocketException: " + exc.message()); switch (exc.code()) { case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: @@ -276,7 +274,7 @@ public: } catch (IOException& exc) { - app.logger().error(Util::logPrefix() + "IOException: " + exc.message()); + Application::instance().logger().error(Util::logPrefix() + "IOException: " + exc.message()); } queue.clear(); queue.put("eof"); @@ -293,7 +291,6 @@ public: HTTPRequestHandler* createRequestHandler(const HTTPServerRequest& request) override { - Application& app = Application::instance(); std::string line = (Util::logPrefix() + "Request from " + request.clientAddress().toString() + ": " + request.getMethod() + " " + @@ -305,7 +302,7 @@ public: line += " / " + it->first + ": " + it->second; } - app.logger().information(line); + Application::instance().logger().information(line); return new WebSocketRequestHandler(); } }; @@ -322,7 +319,6 @@ public: { int flags; int n; - Application& app = Application::instance(); _ws.setReceiveTimeout(0); try { @@ -343,7 +339,7 @@ public: } catch (WebSocketException& exc) { - app.logger().error(Util::logPrefix() + "WebSocketException: " + exc.message()); + Application::instance().logger().error(Util::logPrefix() + "WebSocketException: " + exc.message()); _ws.close(); } } diff --git a/loolwsd/LoadTest.cpp b/loolwsd/LoadTest.cpp index c03d581..b146ed2 100644 --- a/loolwsd/LoadTest.cpp +++ b/loolwsd/LoadTest.cpp @@ -86,7 +86,6 @@ public: int flags; int n; int tileCount = 0; - Application& app = Application::instance(); try { do @@ -135,7 +134,7 @@ public: } catch (WebSocketException& exc) { - app.logger().error("WebSocketException: " + exc.message()); + Application::instance().logger().error("WebSocketException: " + exc.message()); 
_ws.close(); } std::cout << Util::logPrefix() << "Got " << tileCount << " tiles" << std::endl; commit 55db4c4e0b5b5c208cfe39877433638b7e39b70f Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 18:04:46 2015 +0300 Use a queue also in the child processes diff --git a/loolwsd/LOOLSession.cpp b/loolwsd/LOOLSession.cpp index 159955b..9da6244 100644 --- a/loolwsd/LOOLSession.cpp +++ b/loolwsd/LOOLSession.cpp @@ -239,7 +239,8 @@ bool MasterProcessSession::handleInput(const char *buffer, int length) } return loadDocument(buffer, length, tokens); } - else if (tokens[0] != "invalidatetiles" && + else if (tokens[0] != "canceltiles" && + tokens[0] != "invalidatetiles" && tokens[0] != "key" && tokens[0] != "mouse" && tokens[0] != "resetselection" && @@ -258,6 +259,11 @@ bool MasterProcessSession::handleInput(const char *buffer, int length) sendTextFrame("error: cmd=" + tokens[0] + " kind=nodocloaded"); return false; } + else if (tokens[0] == "canceltiles") + { + if (!_peer.expired()) + forwardToPeer(buffer, length); + } else if (tokens[0] == "invalidatetiles") { return invalidateTiles(buffer, length, tokens); diff --git a/loolwsd/LOOLSession.hpp b/loolwsd/LOOLSession.hpp index 9742e5f..4e7e3dd 100644 --- a/loolwsd/LOOLSession.hpp +++ b/loolwsd/LOOLSession.hpp @@ -45,6 +45,8 @@ public: virtual bool getStatus(const char *buffer, int length) = 0; + virtual bool handleInput(const char *buffer, int length) = 0; + protected: LOOLSession(std::shared_ptr<Poco::Net::WebSocket> ws, Kind kind); virtual ~LOOLSession(); @@ -53,8 +55,6 @@ protected: const Kind _kind; - virtual bool handleInput(const char *buffer, int length) = 0; - void sendBinaryFrame(const char *buffer, int length); virtual bool loadDocument(const char *buffer, int length, Poco::StringTokenizer& tokens) = 0; diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index cdced5f..7bbe69c 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -121,15 +121,15 @@ using Poco::Util::Option; using 
Poco::Util::OptionSet; using Poco::Util::ServerApplication; -class FromClientQueueHandler: public Runnable +class QueueHandler: public Runnable { public: - FromClientQueueHandler(tsqueue<std::string>& queue): + QueueHandler(tsqueue<std::string>& queue): _queue(queue) { } - void setSession(std::shared_ptr<MasterProcessSession> session) + void setSession(std::shared_ptr<LOOLSession> session) { _session = session; } @@ -147,7 +147,7 @@ public: } private: - std::shared_ptr<MasterProcessSession> _session; + std::shared_ptr<LOOLSession> _session; tsqueue<std::string>& _queue; }; @@ -173,7 +173,7 @@ public: tsqueue<std::string> queue; Thread queueHandlerThread; - FromClientQueueHandler handler(queue); + QueueHandler handler(queue); try { @@ -209,11 +209,11 @@ public: char buffer[100000]; n = ws->receiveFrame(buffer, sizeof(buffer), flags); - std::string firstLine = getFirstLine(buffer, n); - StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) { + std::string firstLine = getFirstLine(buffer, n); + StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + if (kind == LOOLSession::Kind::ToClient && firstLine.size() == static_cast<std::string::size_type>(n)) { // Check if it is a "canceltiles" and in that case remove outstanding @@ -221,6 +221,9 @@ public: if (tokens.count() == 1 && tokens[0] == "canceltiles") { queue.remove_if([](std::string& x){ return x.find("tile ") == 0;}); + + // Also forward the "canceltiles" to the child process, if any + session->handleInput(buffer, n); } else { @@ -747,12 +750,19 @@ int LOOLWSD::childMain() HTTPResponse response; std::shared_ptr<WebSocket> ws(new WebSocket(cs, request, response)); - ChildProcessSession session(ws, loKit); + std::shared_ptr<ChildProcessSession> session(new ChildProcessSession(ws, loKit)); ws->setReceiveTimeout(0); std::string hello("child 
" + std::to_string(_childId)); - session.sendTextFrame(hello); + session->sendTextFrame(hello); + + tsqueue<std::string> queue; + Thread queueHandlerThread; + QueueHandler handler(queue); + + handler.setSession(session); + queueHandlerThread.start(handler); int flags; int n; @@ -762,10 +772,30 @@ int LOOLWSD::childMain() n = ws->receiveFrame(buffer, sizeof(buffer), flags); if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) - if (!session.handleInput(buffer, n)) - n = 0; + { + std::string firstLine = getFirstLine(buffer, n); + StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + + // The only kind of messages a child process receives are the single-line ones (?) + assert(firstLine.size() == static_cast<std::string::size_type>(n)); + + // Check if it is a "canceltiles" and in that case remove outstanding + // "tile" messages from the queue. + if (tokens.count() == 1 && tokens[0] == "canceltiles") + { + queue.remove_if([](std::string& x){ return x.find("tile ") == 0;}); + } + else + { + queue.put(firstLine); + } + } } while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE); + + queue.clear(); + queue.put("eof"); + queueHandlerThread.join(); } catch (Exception& exc) { commit 33edf42291c08620534b7080ca10239d598d1f68 Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 17:13:13 2015 +0300 Log the number of tiles requests and replies diff --git a/loolwsd/LoadTest.cpp b/loolwsd/LoadTest.cpp index c0c2eed..c03d581 100644 --- a/loolwsd/LoadTest.cpp +++ b/loolwsd/LoadTest.cpp @@ -85,6 +85,7 @@ public: { int flags; int n; + int tileCount = 0; Application& app = Application::instance(); try { @@ -117,15 +118,17 @@ public: "Client got " << n << " bytes: " << getAbbreviatedMessage(largeBuffer, n) << std::endl; #endif - // We don't actually need to do anything with the buffer in this program. 
We - // only parse status: messages and they are not preceded by nextmessage: - // messages. + response = getFirstLine(buffer, n); } if (response.find("status:") == 0) { parseStatus(response, _type, _numParts, _currentPart, _width, _height); _cond.signal(); } + else if (response.find("tile:") == 0) + { + tileCount++; + } } } while (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE); @@ -135,6 +138,7 @@ public: app.logger().error("WebSocketException: " + exc.message()); _ws.close(); } + std::cout << Util::logPrefix() << "Got " << tileCount << " tiles" << std::endl; } WebSocket& _ws; @@ -224,6 +228,8 @@ private: std::uniform_int_distribution<> dis(0, 20); int extra = dis(_g); + int requestCount = 0; + // Exercise the server with this document for some minutes while (!documentStartTimestamp.isElapsed((20 + extra) * Timespan::SECONDS) && !clientDurationExceeded()) { @@ -236,6 +242,7 @@ private: "tileposy=" + std::to_string(y * DOCTILESIZE) + " " "tilewidth=" + std::to_string(DOCTILESIZE) + " " "tileheight=" + std::to_string(DOCTILESIZE)); + requestCount++; x = ((x + 1) % ((output._width-1)/DOCTILESIZE + 1)); if (x == 0) break; @@ -247,7 +254,7 @@ private: Thread::sleep(10000); - std::cout << Util::logPrefix() << "Shutting down client for '" << document << "'" << std::endl; + std::cout << Util::logPrefix() << "Sent " << requestCount << " tile requests, shutting down client for '" << document << "'" << std::endl; ws.shutdown(); thread.join(); commit c630a9c4c1075612d24d89dbf69ab5c536e9dd4a Author: Tor Lillqvist <t...@collabora.com> Date: Tue Jun 9 16:09:23 2015 +0300 Typo diff --git a/loolwsd/protocol.txt b/loolwsd/protocol.txt index 655dcba..74af6c3 100644 --- a/loolwsd/protocol.txt +++ b/loolwsd/protocol.txt @@ -14,7 +14,7 @@ client -> server canceltiles All outstanding tile messages from the client to the server are - dropped and will not be handled. There is no guarantee of exacely + dropped and will not be handled. 
There is no guarantee of exactly which tile: messages might still be sent back to the client. invalidatetiles part=<partNumber> tileposx=<xpos> tileposy=<ypos> tilewidth=<tileWidth> tileheight=<tileHeight> commit 88ac00e15b8c5c4c7a9cfcdbd453fd17a5f5cdcb Author: Tor Lillqvist <t...@collabora.com> Date: Fri Jun 5 16:12:06 2015 +0300 Add a "canceltiles" message to the protocol and handle it Implementing this was harder than I first expected. The basic idea is as follows: The master process puts each message arriving from a client that isn't "canceltiles" into a (client-specific) queue. A separate thread pulls messages from the queue at its own pace and handles them as before. Incoming "canceltiles" messages are handled specially, though: The queue is emptied of "tile" messages. The above sounds simple but there are several details that were a bit tricky to get right. diff --git a/loolwsd/LOOLSession.cpp b/loolwsd/LOOLSession.cpp index 5b30949..159955b 100644 --- a/loolwsd/LOOLSession.cpp +++ b/loolwsd/LOOLSession.cpp @@ -87,11 +87,21 @@ LOOLSession::~LOOLSession() void LOOLSession::sendTextFrame(const std::string& text) { + std::unique_lock<std::mutex> lock(_mutex); + _ws->sendFrame(text.data(), text.size()); } void LOOLSession::sendBinaryFrame(const char *buffer, int length) { + std::unique_lock<std::mutex> lock(_mutex); + + if (length > 1000) + { + std::string nextmessage = "nextmessage: size=" + std::to_string(length); + _ws->sendFrame(nextmessage.data(), nextmessage.size()); + } + _ws->sendFrame(buffer, length, WebSocket::FRAME_BINARY); } @@ -529,8 +539,6 @@ void MasterProcessSession::sendTile(const char *buffer, int length, StringTokeni cachedTile->read(output.data() + pos, size); cachedTile->close(); - sendTextFrame("nextmessage: size=" + std::to_string(output.size())); - sendBinaryFrame(output.data(), output.size()); return; @@ -628,7 +636,7 @@ void MasterProcessSession::forwardToPeer(const char *buffer, int length) auto peer = _peer.lock(); if (!peer) 
return; - peer->_ws->sendFrame(buffer, length, WebSocket::FRAME_BINARY); + peer->sendBinaryFrame(buffer, length); } ChildProcessSession::ChildProcessSession(std::shared_ptr<WebSocket> ws, LibreOfficeKit *loKit) : @@ -893,8 +901,6 @@ void ChildProcessSession::sendTile(const char *buffer, int length, StringTokeniz delete[] pixmap; - sendTextFrame("nextmessage: size=" + std::to_string(output.size())); - sendBinaryFrame(output.data(), output.size()); } diff --git a/loolwsd/LOOLSession.hpp b/loolwsd/LOOLSession.hpp index 8eef54e..9742e5f 100644 --- a/loolwsd/LOOLSession.hpp +++ b/loolwsd/LOOLSession.hpp @@ -69,6 +69,9 @@ protected: // The actual URL, also in the child, even if the child never accesses that. std::string _docURL; + +private: + std::mutex _mutex; }; template<typename charT, typename traits> diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 18c21b3..cdced5f 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -86,6 +86,7 @@ DEALINGS IN THE SOFTWARE. #include "LOOLProtocol.hpp" #include "LOOLSession.hpp" #include "LOOLWSD.hpp" +#include "tsqueue.h" #include "Util.hpp" using namespace LOOLProtocol; @@ -120,6 +121,36 @@ using Poco::Util::Option; using Poco::Util::OptionSet; using Poco::Util::ServerApplication; +class FromClientQueueHandler: public Runnable +{ +public: + FromClientQueueHandler(tsqueue<std::string>& queue): + _queue(queue) + { + } + + void setSession(std::shared_ptr<MasterProcessSession> session) + { + _session = session; + } + + void run() override + { + while (true) + { + std::string input = _queue.get(); + if (input == "eof") + break; + if (!_session->handleInput(input.c_str(), input.size())) + break; + } + } + +private: + std::shared_ptr<MasterProcessSession> _session; + tsqueue<std::string>& _queue; +}; + class WebSocketRequestHandler: public HTTPRequestHandler /// Handle a WebSocket connection. 
{ @@ -139,26 +170,37 @@ public: } Application& app = Application::instance(); + + tsqueue<std::string> queue; + Thread queueHandlerThread; + FromClientQueueHandler handler(queue); + try { try { std::shared_ptr<WebSocket> ws(new WebSocket(request, response)); - std::shared_ptr<MasterProcessSession> session; + LOOLSession::Kind kind; if (request.getURI() == LOOLWSD::CHILD_URI && request.serverAddress().port() == LOOLWSD::MASTER_PORT_NUMBER) - { - session.reset(new MasterProcessSession(ws, LOOLSession::Kind::ToPrisoner)); - } + kind = LOOLSession::Kind::ToPrisoner; else + kind = LOOLSession::Kind::ToClient; + + std::shared_ptr<MasterProcessSession> session(new MasterProcessSession(ws, kind)); + + // For ToClient sessions, we store incoming messages in a queue and have a separate + // thread that handles them. This is so that we can empty the queue when we get a + // "canceltiles" message. + if (kind == LOOLSession::Kind::ToClient) { - session.reset(new MasterProcessSession(ws, LOOLSession::Kind::ToClient)); + handler.setSession(session); + queueHandlerThread.start(handler); } - // Loop, receiving WebSocket messages either from the - // client, or from the child process (to be forwarded to - // the client). + // Loop, receiving WebSocket messages either from the client, or from the child + // process (to be forwarded to the client). 
int flags; int n; ws->setReceiveTimeout(0); @@ -167,25 +209,43 @@ public: char buffer[100000]; n = ws->receiveFrame(buffer, sizeof(buffer), flags); + std::string firstLine = getFirstLine(buffer, n); + StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); + if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) { - if (!session->handleInput(buffer, n)) - n = 0; - } - if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) - { - std::string firstLine = getFirstLine(buffer, n); - StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - - int size; - if (tokens.count() == 2 && tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0) + if (kind == LOOLSession::Kind::ToClient && firstLine.size() == static_cast<std::string::size_type>(n)) { - char largeBuffer[size]; - - n = ws->receiveFrame(largeBuffer, size, flags); - if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) + // Check if it is a "canceltiles" and in that case remove outstanding + // "tile" messages from the queue. + if (tokens.count() == 1 && tokens[0] == "canceltiles") + { + queue.remove_if([](std::string& x){ return x.find("tile ") == 0;}); + } + else + { + queue.put(firstLine); + } + } + else + { + // Check if it is a "nextmessage:" and in that case read the large + // follow-up message separately, and handle that only. 
+ int size; + if (tokens.count() == 2 && tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0) + { + char largeBuffer[size]; + + n = ws->receiveFrame(largeBuffer, size, flags); + if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) + { + if (!session->handleInput(largeBuffer, n)) + n = 0; + } + } + else { - if (!session->handleInput(largeBuffer, n)) + if (!session->handleInput(buffer, n)) n = 0; } } @@ -215,6 +275,9 @@ public: { app.logger().error(Util::logPrefix() + "IOException: " + exc.message()); } + queue.clear(); + queue.put("eof"); + queueHandlerThread.join(); } }; diff --git a/loolwsd/LoadTest.cpp b/loolwsd/LoadTest.cpp index 7b7ca38..c0c2eed 100644 --- a/loolwsd/LoadTest.cpp +++ b/loolwsd/LoadTest.cpp @@ -241,8 +241,9 @@ private: break; } y = ((y + 1) % ((output._height-1)/DOCTILESIZE + 1)); - Thread::sleep(200); + Thread::sleep(50); } + sendTextFrame(ws, "canceltiles"); Thread::sleep(10000); diff --git a/loolwsd/protocol.txt b/loolwsd/protocol.txt index 82d3b90..655dcba 100644 --- a/loolwsd/protocol.txt +++ b/loolwsd/protocol.txt @@ -11,6 +11,12 @@ tiles proactively (guessing what the client might need). Etc. client -> server ================ +canceltiles + + All outstanding tile messages from the client to the server are + dropped and will not be handled. There is no guarantee of exacely + which tile: messages might still be sent back to the client. + invalidatetiles part=<partNumber> tileposx=<xpos> tileposy=<ypos> tilewidth=<tileWidth> tileheight=<tileHeight> All parameters are numbers. Makes the server remove any cached @@ -88,7 +94,7 @@ nextmessage: size=<byteSize> message). Can be ignored by clients using an API that can read arbitrarily large buffers from a WebSocket (like JavaScript), but must be handled by clients that cannot (like those using Poco - 1.6.0). + 1.6.0, like the "loadtest" program in the loolwsd sources). 
status: type=<typeName> parts=<numberOfParts> current=<currentPartNumber> width=<width> height=<height> diff --git a/loolwsd/tsqueue.h b/loolwsd/tsqueue.h new file mode 100644 index 0000000..9511efb --- /dev/null +++ b/loolwsd/tsqueue.h @@ -0,0 +1,65 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef INCLUDED_TSQUEUE_H +#define INCLUDED_TSQUEUE_H + +#include "config.h" + +#include <condition_variable> +#include <mutex> +#include <deque> + +// Thread-safe queue + +template <class T> +class tsqueue +{ +public: + void put(const T& value) + { + std::unique_lock<std::mutex> lock(_mutex); + _queue.push_back(value); + lock.unlock(); + _cv.notify_one(); + } + + T get() + { + std::unique_lock<std::mutex> lock(_mutex); + _cv.wait(lock, [this] { return _queue.size() > 0; }); + T result = _queue.front(); + _queue.pop_front(); + return result; + } + + void clear() + { + std::unique_lock<std::mutex> lock(_mutex); + while (_queue.size()) + _queue.pop_front(); + } + + template<class UnaryPredicate> + void remove_if(UnaryPredicate p) + { + std::unique_lock<std::mutex> lock(_mutex); + _queue.erase(std::remove_if(_queue.begin(), _queue.end(), p), + _queue.end()); + } + +private: + std::mutex _mutex; + std::condition_variable _cv; + std::deque<T> _queue; +}; + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ commit a5aebc75cf4feb25b7ceaf214d5717996efe5873 Author: Tor Lillqvist <t...@collabora.com> Date: Mon Jun 8 16:58:34 2015 +0300 Clarify usage message diff --git a/loolwsd/loolwsd-systemplate-setup b/loolwsd/loolwsd-systemplate-setup index 8af0517..7e310fb 100755 --- a/loolwsd/loolwsd-systemplate-setup +++ b/loolwsd/loolwsd-systemplate-setup @@ 
-1,6 +1,6 @@ #!/bin/bash -test $# -eq 2 || { echo "Usage: $0 <chroot template directory> <LO installation directory>"; exit 1; } +test $# -eq 2 || { echo "Usage: $0 <chroot template directory for system libs to create> <LO installation directory>"; exit 1; } # No provision for spaces or other weird characters in pathnames. So sue me. commit 903dd9bca36508daff2620e6754c0383c144f519 Author: Tor Lillqvist <t...@collabora.com> Date: Mon Jun 8 16:35:52 2015 +0300 The buffer parameter to handleInput() can be const diff --git a/loolwsd/LOOLSession.cpp b/loolwsd/LOOLSession.cpp index 9843832..5b30949 100644 --- a/loolwsd/LOOLSession.cpp +++ b/loolwsd/LOOLSession.cpp @@ -123,7 +123,7 @@ MasterProcessSession::~MasterProcessSession() } } -bool MasterProcessSession::handleInput(char *buffer, int length) +bool MasterProcessSession::handleInput(const char *buffer, int length) { Application::instance().logger().information(Util::logPrefix() + "Input: " + getAbbreviatedMessage(buffer, length)); @@ -647,7 +647,7 @@ ChildProcessSession::~ChildProcessSession() Util::shutdownWebSocket(*_ws); } -bool ChildProcessSession::handleInput(char *buffer, int length) +bool ChildProcessSession::handleInput(const char *buffer, int length) { Application& app = Application::instance(); diff --git a/loolwsd/LOOLSession.hpp b/loolwsd/LOOLSession.hpp index 7cac320..8eef54e 100644 --- a/loolwsd/LOOLSession.hpp +++ b/loolwsd/LOOLSession.hpp @@ -53,7 +53,7 @@ protected: const Kind _kind; - virtual bool handleInput(char *buffer, int length) = 0; + virtual bool handleInput(const char *buffer, int length) = 0; void sendBinaryFrame(const char *buffer, int length); @@ -94,7 +94,7 @@ public: MasterProcessSession(std::shared_ptr<Poco::Net::WebSocket> ws, Kind kind); virtual ~MasterProcessSession(); - virtual bool handleInput(char *buffer, int length) override; + virtual bool handleInput(const char *buffer, int length) override; bool haveSeparateProcess(); @@ -152,7 +152,7 @@ public: 
ChildProcessSession(std::shared_ptr<Poco::Net::WebSocket> ws, LibreOfficeKit *loKit); virtual ~ChildProcessSession(); - virtual bool handleInput(char *buffer, int length) override; + virtual bool handleInput(const char *buffer, int length) override; virtual bool getStatus(const char *buffer, int length); _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/libreoffice-commits