loolwsd/IoUtil.cpp | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++- loolwsd/IoUtil.hpp | 9 ++ loolwsd/LOOLWSD.cpp | 158 +++---------------------------------------------- loolwsd/Makefile.am | 2 4 files changed, 186 insertions(+), 148 deletions(-)
New commits: commit b25fe9d88a86bc247f1f3aedbbf7a51c5193d259 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Sun Mar 27 16:06:22 2016 -0400 loolwsd: moved SocketProcessor to IoUtil and generalized more Change-Id: I527e57d2430e21249cf8cd4867f22fdbbd092b09 Reviewed-on: https://gerrit.libreoffice.org/23637 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp index 7284eeb..cf46e7e 100644 --- a/loolwsd/IoUtil.cpp +++ b/loolwsd/IoUtil.cpp @@ -18,16 +18,179 @@ #include <sstream> #include <string> -#include <Poco/Exception.h> +#include <Poco/StringTokenizer.h> +#include <Poco/Net/HTTPServerResponse.h> #include <Poco/Net/WebSocket.h> +#include <Poco/Net/NetException.h> #include <Poco/Thread.h> #include "Common.hpp" +#include "LOOLProtocol.hpp" #include "IoUtil.hpp" #include "Util.hpp" namespace IoUtil { +using Poco::Net::WebSocket; +using Poco::Net::WebSocketException; + +// Synchronously process WebSocket requests and dispatch to handler. +// Handler returns false to end. +void SocketProcessor(std::shared_ptr<WebSocket> ws, + Poco::Net::HTTPServerResponse& response, + std::function<bool(const std::vector<char>&)> handler, + std::function<bool()> stopPredicate, + std::string name, + const size_t pollTimeoutMs) +{ + if (!name.empty()) + { + name = "[" + name + "] "; + } + + Log::info(name + "Starting Socket Processor."); + + // Timeout given is in microseconds. + const Poco::Timespan waitTime(pollTimeoutMs * 1000); + try + { + ws->setReceiveTimeout(0); + + int flags = 0; + int n = 0; + bool stop = false; + std::vector<char> payload(READ_BUFFER_SIZE * 100); + + for (;;) + { + stop = stopPredicate(); + if (stop) + { + Log::info(name + "Termination flagged. Finishing."); + break; + } + + if (!ws->poll(waitTime, Poco::Net::Socket::SELECT_READ)) + { + // Wait some more. 
+ continue; + } + + payload.resize(payload.capacity()); + n = ws->receiveFrame(payload.data(), payload.capacity(), flags); + if (n >= 0) + { + payload.resize(n); + } + + if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) + { + // Echo back the ping payload as pong. + // Technically, we should send back a PONG control frame. + // However Firefox (probably) or Node.js (possibly) doesn't + // like that and closes the socket when we do. + // Echoing the payload as a normal frame works with Firefox. + ws->sendFrame(payload.data(), n /*, WebSocket::FRAME_OP_PONG*/); + continue; + } + else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) + { + // In case we do send pings in the future. + continue; + } + else if (n <= 0 || ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)) + { + Log::warn(name + "Connection closed."); + break; + } + + assert(n > 0); + + const std::string firstLine = LOOLProtocol::getFirstLine(payload); + if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN) + { + // One WS message split into multiple frames. + while (true) + { + char buffer[READ_BUFFER_SIZE * 10]; + n = ws->receiveFrame(buffer, sizeof(buffer), flags); + if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE) + { + break; + } + + payload.insert(payload.end(), buffer, buffer + n); + if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN) + { + // No more frames. + break; + } + } + } + else + { + int size = 0; + Poco::StringTokenizer tokens(firstLine, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM); + if (tokens.count() == 2 && + tokens[0] == "nextmessage:" && LOOLProtocol::getTokenInteger(tokens[1], "size", size) && size > 0) + { + // Check if it is a "nextmessage:" and in that case read the large + // follow-up message separately, and handle that only. 
+ payload.resize(size); + + n = ws->receiveFrame(payload.data(), size, flags); + } + } + + if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE) + { + Log::warn(name + "Connection closed."); + break; + } + + if (firstLine == "eof") + { + Log::info(name + "Received EOF. Finishing."); + break; + } + + // Call the handler. + if (!handler(payload)) + { + Log::info(name + "Socket handler flagged to finish."); + break; + } + } + + Log::debug() << name << "Finishing SocketProcessor. TerminationFlag: " << stop + << ", payload size: " << payload.size() + << ", flags: " << std::hex << flags << Log::end; + if (!payload.empty()) + { + Log::warn(name + "Last message will not be processed: [" + + LOOLProtocol::getAbbreviatedMessage(payload.data(), payload.size()) + "]."); + } + } + catch (const WebSocketException& exc) + { + Log::error("SocketProcessor: WebSocketException: " + exc.message()); + switch (exc.code()) + { + case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: + response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION); + // fallthrough + case WebSocket::WS_ERR_NO_HANDSHAKE: + case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION: + case WebSocket::WS_ERR_HANDSHAKE_NO_KEY: + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST); + response.setContentLength(0); + response.send(); + break; + } + } + + Log::info(name + "Finished Socket Processor."); +} void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws) { diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp index da3db34..1647e29 100644 --- a/loolwsd/IoUtil.hpp +++ b/loolwsd/IoUtil.hpp @@ -21,6 +21,15 @@ namespace IoUtil { + /// Synchronously process WebSocket requests and dispatch to handler. + /// Handler returns false to end. 
+ void SocketProcessor(std::shared_ptr<Poco::Net::WebSocket> ws, + Poco::Net::HTTPServerResponse& response, + std::function<bool(const std::vector<char>&)> handler, + std::function<bool()> stopPredicate, + std::string name = std::string(), + const size_t pollTimeoutMs = POLL_TIMEOUT_MS); + /// Call WebSocket::shutdown() ignoring Poco::IOException. void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws); diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 5003a65..650b1e1 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -158,7 +158,6 @@ using Poco::TemporaryFile; using Poco::Thread; using Poco::ThreadLocal; using Poco::ThreadPool; -using Poco::Timespan; using Poco::URI; using Poco::Util::Application; using Poco::Util::HelpFormatter; @@ -218,147 +217,6 @@ public: } }; -// Synchronously process WebSocket requests and dispatch to handler. -// Handler returns false to end. -void SocketProcessor(std::shared_ptr<WebSocket> ws, - HTTPServerResponse& response, - std::function<bool(const std::vector<char>&)> handler) -{ - Log::info("Starting Socket Processor."); - - const Timespan waitTime(POLL_TIMEOUT_MS * 1000); - try - { - ws->setReceiveTimeout(0); - - int flags = 0; - int n = 0; - std::vector<char> payload(READ_BUFFER_SIZE * 100); - - while (!TerminationFlag && - (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE) - { - if (!ws->poll(waitTime, Socket::SELECT_READ)) - { - // Wait some more. - continue; - } - - payload.resize(payload.capacity()); - n = ws->receiveFrame(payload.data(), payload.capacity(), flags); - if (n >= 0) - { - payload.resize(n); - } - - if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING) - { - // Echo back the ping payload as pong. - // Technically, we should send back a PONG control frame. - // However Firefox (probably) or Node.js (possibly) doesn't - // like that and closes the socket when we do. - // Echoing the payload as a normal frame works with Firefox. 
- ws->sendFrame(payload.data(), n /*, WebSocket::FRAME_OP_PONG*/); - } - else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG) - { - // In case we do send pings in the future. - } - else if (n <= 0 || ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)) - { - // Connection closed. - Log::warn() << "Received " << n - << " bytes. Connection closed. Flags: " - << std::hex << flags << Log::end; - break; - } - - assert(n > 0); - - const std::string firstLine = LOOLProtocol::getFirstLine(payload); - if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN) - { - // One WS message split into multiple frames. - while (true) - { - char buffer[READ_BUFFER_SIZE * 10]; - n = ws->receiveFrame(buffer, sizeof(buffer), flags); - if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE) - { - break; - } - - payload.insert(payload.end(), buffer, buffer + n); - if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN) - { - // No more frames. - break; - } - } - } - else - { - int size = 0; - StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM); - if (tokens.count() == 2 && - tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0) - { - // Check if it is a "nextmessage:" and in that case read the large - // follow-up message separately, and handle that only. - payload.resize(size); - - n = ws->receiveFrame(payload.data(), size, flags); - } - } - - if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE) - { - break; - } - - if (firstLine == "eof") - { - Log::info("Received EOF. Finishing."); - break; - } - - // Call the handler. - if (!handler(payload)) - { - Log::info("Socket handler flagged for finishing."); - } - } - - Log::debug() << "Finishing SocketProcessor. 
TerminationFlag: " << TerminationFlag - << ", payload size: " << payload.size() - << ", flags: " << std::hex << flags << Log::end; - if (!payload.empty()) - { - Log::warn("Last message will not be processed: [" + getAbbreviatedMessage(payload.data(), payload.size()) + "]."); - } - } - catch (const WebSocketException& exc) - { - Log::error("SocketProcessor: WebSocketException: " + exc.message()); - switch (exc.code()) - { - case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION: - response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION); - // fallthrough - case WebSocket::WS_ERR_NO_HANDSHAKE: - case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION: - case WebSocket::WS_ERR_HANDSHAKE_NO_KEY: - response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST); - response.setContentLength(0); - response.send(); - break; - } - } - - Log::info("Finished Socket Processor."); -} - - /// Handle a public connection from a client. class ClientRequestHandler: public HTTPRequestHandler { @@ -615,7 +473,8 @@ private: queueHandlerThread.start(handler); bool normalShutdown = false; - SocketProcessor(ws, response, [&session, &queue, &normalShutdown](const std::vector<char>& payload) + IoUtil::SocketProcessor(ws, response, + [&session, &queue, &normalShutdown](const std::vector<char>& payload) { time(&session->_lastMessageTime); const auto token = LOOLProtocol::getFirstToken(payload); @@ -629,7 +488,10 @@ private: } return true; - }); + }, + []() { return TerminationFlag; }, + "Client_ws_" + id + ); if (docBroker->getSessionsCount() == 1 && !normalShutdown) { @@ -826,10 +688,14 @@ public: lock.unlock(); MasterProcessSession::AvailableChildSessionCV.notify_one(); - SocketProcessor(ws, response, [&session](const std::vector<char>& payload) + IoUtil::SocketProcessor(ws, response, + [&session](const std::vector<char>& payload) { return session->handleInput(payload.data(), payload.size()); - }); + }, + []() { return TerminationFlag; }, + "Child_ws_" + sessionId + ); } catch (const Exception& 
exc) { diff --git a/loolwsd/Makefile.am b/loolwsd/Makefile.am index fe54269..458ea85 100644 --- a/loolwsd/Makefile.am +++ b/loolwsd/Makefile.am @@ -20,7 +20,7 @@ loadtest_SOURCES = LoadTest.cpp Util.cpp LOOLProtocol.cpp connect_SOURCES = Connect.cpp Util.cpp LOOLProtocol.cpp -lokitclient_SOURCES = LOKitClient.cpp IoUtil.cpp Util.cpp +lokitclient_SOURCES = LOKitClient.cpp LOOLProtocol.cpp IoUtil.cpp Util.cpp broker_shared_sources = ChildProcessSession.cpp $(shared_sources) _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits